risingwave_meta/barrier/rpc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

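/// Map an optional creating-job id to the partial graph id used on compute nodes: a creating
/// job uses its table id directly, while `None` (the database-level graph) is encoded as
/// `u32::MAX`, which is therefore asserted to never be a real table id.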
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

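/// Inverse of [`to_partial_graph_id`]: `u32::MAX` maps back to `None` (the database-level
/// partial graph), and any other value is interpreted as the `TableId` of a creating job.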
pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

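/// A single connected worker node together with the handle of its streaming control stream.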
struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

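/// Manages the streaming control streams between the meta node and compute workers.
/// `workers` tracks every known worker node, while `connected_nodes` only contains the
/// workers with a currently established control stream.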
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

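    /// Try to re-establish the control stream to a known but currently disconnected worker.
    /// A single attempt is made: on failure the error is logged and the worker stays
    /// disconnected.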
    #[await_tree::instrument("try_reconnect_worker({worker_id})")]
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }

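    /// Register a newly added worker node and establish a control stream to it, retrying a few
    /// times with exponential backoff because a freshly registered node may not be reachable
    /// immediately. Gives up with an error log after the retries are exhausted.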
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    // The DNS record of a newly registered worker node may not have been
                    // propagated to the meta node yet, which causes the connection to fail.
                    // Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

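    /// Reset the manager for a new term: remember the given set of workers and (re)establish a
    /// control stream to each of them concurrently. Returns the ids of the workers that could
    /// not be connected.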
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

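    /// Poll all connected control streams for the next response. Returns `Poll::Pending` when
    /// no stream has a response ready. If a stream yields an error (including end-of-stream or
    /// a `Shutdown` response), the corresponding node is removed from `connected_nodes` and the
    /// error is returned to the caller.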
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

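    /// Build the `PbInitRequest` used when (re)establishing a control stream: for every
    /// database, one database-level partial graph carrying the subscription info, plus one
    /// partial graph per in-flight creating job.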
    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

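/// Tracks the collection of the initial barrier injected for a single database during recovery.
/// Once the barrier has been collected from all workers and all recovered creating jobs are
/// drained, it can be turned into a `DatabaseCheckpointControl` via [`Self::finish`].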
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery are not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            // TODO(kwannoel): recover using backfill order plan
            backfill_nodes_to_pause: Default::default(),
        });

        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    fragment
                        .fragment_type_mask
                        .contains(FragmentTypeFlag::SnapshotBackfillStreamScan)
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (
                        *table_id,
                        (definition.clone(), stream_job_fragments, Default::default()),
                    )
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let mut database = InflightDatabaseInfo::empty();
        database_jobs
            .into_values()
            .for_each(|job| database.extend(job));
        let database_state =
            BarrierWorkerState::recovery(new_epoch, database, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

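    /// Inject a barrier for a command into the database-level partial graph, extracting the
    /// mutation and the subscription changes from the command before delegating to
    /// [`Self::inject_barrier`].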
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

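    /// Broadcast an `InjectBarrierRequest` for the given partial graph to every connected
    /// worker. Returns a map recording, for each worker, whether it has no actors to collect
    /// for this barrier, or an error if a worker that must collect actors is not connected or
    /// the request could not be sent.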
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

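    /// Ask every connected worker to create the partial graph identified by `database_id` and
    /// `creating_job_id`. A failure to send the request is logged but not propagated.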
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

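    /// Ask every connected worker to remove the partial graphs of the given creating jobs in
    /// the database. No-op if `creating_job_ids` is empty; send failures are only logged.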
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

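    /// Send a `ResetDatabase` request to every connected worker and return the set of workers
    /// the request was successfully sent to; workers that fail to receive it are logged and
    /// skipped.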
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

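/// Merge per-worker RPC errors into a single `MetaError`. If any error carries a `Score`, the
/// highest-scored error is returned on its own; otherwise all errors are concatenated into one
/// message.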
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::fmt::Write;

    use risingwave_common::error::error_request_copy;
    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Create the error from a single `(worker_id, error)` pair.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| error_request_copy::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| {
                error_request_copy::<Score>(e) == Some(max_score)
            })
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}