risingwave_meta/barrier/
rpc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{
    AddMutation, Barrier, BarrierMutation, FragmentTypeFlag, SubscriptionUpstreamInfo,
};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

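/// Encode an optional creating job id as a partial graph id. The database's root partial graph
/// (no creating job) is represented by `u32::MAX`, so a job's table id must never be `u32::MAX`.
/// [`from_partial_graph_id`] is the inverse mapping.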
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

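/// Manages the bidirectional control streams between the meta node and the worker nodes.
/// `workers` tracks every registered worker, while `connected_nodes` contains only those with a
/// live control stream.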
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

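    /// Try to re-establish the control stream to an already-registered worker. A no-op if the
    /// worker is still connected; unlike `add_worker`, only a single attempt is made and a
    /// failure is merely logged.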
    #[await_tree::instrument("try_reconnect_worker({worker_id})")]
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }

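    /// Register a worker node and establish its control stream, retrying up to `MAX_RETRY` times
    /// with exponential backoff, since the address of a newly registered worker may not be
    /// resolvable immediately.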
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    // The DNS information of a newly registered worker node may not have been
                    // propagated to the meta node yet, which causes an error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

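    /// Replace the tracked worker set with `nodes` and concurrently (re)create a control stream
    /// to each of them, returning the ids of the workers that could not be connected.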
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

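    /// Poll the response streams of all connected workers and return the first ready response.
    /// Returns `Poll::Pending` when no worker is connected. On any error (stream end, RPC error,
    /// or a `Shutdown` response), the worker is removed from the connected set before the error
    /// is surfaced.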
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    #[await_tree::instrument("control_stream_next_response")]
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

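    /// Build the `PbInitRequest` used to initialize a new control stream: for each database, the
    /// root partial graph with its subscriptions plus one partial graph per creating job.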
    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

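/// Tracks the collection of the initial (recovery) barrier of a single database. Once every
/// worker and every recovered creating job has collected it, [`Self::finish`] converts the
/// collector into a [`DatabaseCheckpointControl`].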
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<(Vec<u64>, u64)>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery are not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
            // TODO(kwannoel): recover using backfill order plan
            backfill_nodes_to_pause: Default::default(),
        });

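        // All state tables of the given jobs must share the same committed epoch; return it.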
        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    (fragment.fragment_type_mask
                        & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32))
                        != 0
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} has already caught up with upstream",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

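        // Inject the initial barrier into the database's root partial graph, rebuilding all
        // recovered actors on their workers as part of the same request.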
        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (*table_id, (definition.clone(), stream_job_fragments))
                }),
            hummock_version_stats,
        );

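        // Recover the control state of each ongoing snapshot backfill job via
        // `CreatingStreamingJobControl::recover`, each in its own partial graph.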
        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let mut database = InflightDatabaseInfo::empty();
        database_jobs
            .into_values()
            .for_each(|job| database.extend(job));
        let database_state =
            BarrierWorkerState::recovery(new_epoch, database, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

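    /// Derive the barrier mutation and subscription changes from `command` (if any) and inject
    /// the barrier into the database's root partial graph.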
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

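    /// Send an `InjectBarrierRequest` for the given partial graph to every connected worker.
    ///
    /// Returns an error if a worker that hosts actors to collect is not connected. The returned
    /// `NodeToCollect` maps each worker id to whether the barrier was sent with an empty set of
    /// actors to collect on that worker.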
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

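        // Send the barrier to every connected worker, even those hosting no actors for this
        // graph, attaching the actors to build and the actor ids to collect on that worker.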
        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

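    /// Broadcast a `CreatePartialGraphRequest` for the given graph to all connected workers.
    /// Send failures are logged and otherwise ignored.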
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                }).is_err() {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

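    /// Broadcast a `RemovePartialGraphRequest` covering the partial graphs of the given creating
    /// jobs to all connected workers; a no-op when the list is empty.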
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

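    /// Send a `ResetDatabaseRequest` to every connected worker and return the ids of the workers
    /// that the request was successfully sent to.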
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

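/// Merge per-worker RPC errors into a single `MetaError`: a lone error is wrapped directly, an
/// error carrying the highest `Score` takes precedence, and unscored errors are concatenated
/// into one message.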
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::error::request_value;
    use std::fmt::Write;

    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Create the error from the single error.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| request_value::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| request_value::<Score>(e) == Some(max_score))
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}