risingwave_meta/barrier/rpc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use std::future::poll_fn;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::StreamExt;
use futures::future::join_all;
use itertools::Itertools;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::tracing::TracingContext;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::WorkerId;
use risingwave_pb::common::{HostAddress, WorkerNode};
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{
    AddMutation, Barrier, BarrierMutation, FragmentTypeFlag, SubscriptionUpstreamInfo,
};
use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
use risingwave_pb::stream_service::inject_barrier_request::{
    BuildActorInfo, FragmentBuildActorInfo,
};
use risingwave_pb::stream_service::streaming_control_stream_request::{
    CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
    RemovePartialGraphRequest, ResetDatabaseRequest,
};
use risingwave_pb::stream_service::{
    BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
    streaming_control_stream_request, streaming_control_stream_response,
};
use risingwave_rpc_client::StreamingControlHandle;
use thiserror_ext::AsReport;
use tokio::time::sleep;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
use crate::barrier::checkpoint::{
    BarrierWorkerState, CreatingStreamingJobControl, DatabaseCheckpointControl,
};
use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::MetaSrvEnv;
use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
use crate::stream::{StreamFragmentGraph, build_actor_connector_splits};
use crate::{MetaError, MetaResult};

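/// Encode an optional creating-job id into a partial graph id.
///
/// The database-level (non-creating-job) partial graph is represented by `None` and encoded as
/// `u32::MAX`, so a real job id must never be `u32::MAX`. For example, a creating job with table
/// id 42 maps to partial graph id 42, and `from_partial_graph_id(u32::MAX)` maps back to `None`.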
fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
    job_id
        .map(|table| {
            assert_ne!(table.table_id, u32::MAX);
            table.table_id
        })
        .unwrap_or(u32::MAX)
}

pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
    if partial_graph_id == u32::MAX {
        None
    } else {
        Some(TableId::new(partial_graph_id))
    }
}

struct ControlStreamNode {
    worker_id: WorkerId,
    host: HostAddress,
    handle: StreamingControlHandle,
}

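/// Manages the bidirectional streaming control RPCs between the meta node and the connected
/// compute workers. Barrier injection and partial graph requests are sent through these streams,
/// and barrier collection responses are polled back from them.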
pub(super) struct ControlStreamManager {
    connected_nodes: HashMap<WorkerId, ControlStreamNode>,
    workers: HashMap<WorkerId, WorkerNode>,
    env: MetaSrvEnv,
}

impl ControlStreamManager {
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            connected_nodes: Default::default(),
            workers: Default::default(),
            env,
        }
    }

    pub(super) fn is_connected(&self, worker_id: WorkerId) -> bool {
        self.connected_nodes.contains_key(&worker_id)
    }

    pub(super) fn host_addr(&self, worker_id: WorkerId) -> HostAddress {
        self.workers[&worker_id].host.clone().unwrap()
    }

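    /// Try to re-establish the control stream to an already-registered worker. Unlike
    /// [`Self::add_worker`], this makes a single attempt and only logs an error on failure.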
    pub(super) async fn try_reconnect_worker(
        &mut self,
        worker_id: WorkerId,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        if self.connected_nodes.contains_key(&worker_id) {
            warn!(worker_id, "node already connected");
            return;
        }
        let node = &self.workers[&worker_id];
        let node_host = node.host.as_ref().unwrap();

        let init_request = Self::collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(node, &init_request).await {
            Ok(handle) => {
                assert!(
                    self.connected_nodes
                        .insert(
                            worker_id,
                            ControlStreamNode {
                                worker_id,
                                host: node.host.clone().unwrap(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }

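    /// Register a newly added worker node and establish a control stream to it, retrying with
    /// exponential backoff (up to `MAX_RETRY` attempts), since the node's address may not be
    /// resolvable immediately after registration.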
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        let node = match self.workers.entry(node_id) {
            Entry::Occupied(entry) => {
                let entry = entry.into_mut();
                assert_eq!(entry.host, node.host);
                warn!(id = node.id, host = ?node.host, "node already exists");
                &*entry
            }
            Entry::Vacant(entry) => &*entry.insert(node),
        };
        if self.connected_nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "new node already connected");
            return;
        }
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = Self::collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    // The DNS information of a newly registered worker node may not have been
                    // propagated to the meta node yet, causing the error. Wait for a while and retry.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

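    /// Drop all existing control streams and reconnect to the given set of workers. Returns the
    /// ids of the workers that could not be connected to.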
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        self.workers = nodes.clone();
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.connected_nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    assert!(
                        self.connected_nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker_id: node.id as _,
                                    host: node.host.clone().unwrap(),
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    /// Clear all nodes and response streams in the manager.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

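    /// Poll all connected control streams for the next response. Returns `Poll::Pending` when no
    /// node is connected. A stream error, end of stream, or an unexpected `Shutdown`/`Init`
    /// response removes the corresponding node from the connected set before the error is
    /// returned.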
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.connected_nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.connected_nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(|| anyhow!("empty response"))?
                                        {
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            streaming_control_stream_response::Response::Init(_) => {
                                                // This arm should be unreachable.
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            let node = self
                .connected_nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(worker_id = node.worker_id, host = ?node.host, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

    fn collect_init_request(
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                impl Iterator<Item = TableId>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(|(database_id, subscriptions, creating_job_ids)| {
                    let mut graphs = vec![PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(None),
                        subscriptions: subscriptions.into_iter().collect_vec(),
                    }];
                    graphs.extend(creating_job_ids.map(|job_id| PbInitialPartialGraph {
                        partial_graph_id: to_partial_graph_id(Some(job_id)),
                        subscriptions: vec![],
                    }));
                    PbDatabaseInitialPartialGraph {
                        database_id: database_id.database_id,
                        graphs,
                    }
                })
                .collect(),
            term_id,
        }
    }
}

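/// Tracks the collection of the initial barrier injected for a single database during recovery.
/// Once the barrier has been collected from all involved workers and all recovered creating jobs,
/// it can be converted into a [`DatabaseCheckpointControl`] via [`Self::finish`].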
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
    committed_epoch: u64,
}

impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}

impl DatabaseInitialBarrierCollector {
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
            && self
                .creating_streaming_job_controls
                .values()
                .all(|job| job.is_empty())
    }

    pub(super) fn database_state(
        &self,
    ) -> (
        &BarrierWorkerState,
        &HashMap<TableId, CreatingStreamingJobControl>,
    ) {
        (&self.database_state, &self.creating_streaming_job_controls)
    }

    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        if let Some(creating_job_id) = from_partial_graph_id(resp.partial_graph_id) {
            self.creating_streaming_job_controls
                .get_mut(&creating_job_id)
                .expect("should exist")
                .collect(resp);
        } else {
            assert_eq!(resp.epoch, self.committed_epoch);
            assert!(
                self.node_to_collect
                    .remove(&(resp.worker_id as _))
                    .is_some()
            );
        }
    }

    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
            self.creating_streaming_job_controls,
        )
    }

    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
            && self
                .creating_streaming_job_controls
                .values_mut()
                .all(|job| job.is_valid_after_worker_err(worker_id))
    }
}

impl ControlStreamManager {
    /// Extract information from the loaded runtime barrier worker snapshot info, and inject the initial barrier.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        jobs: HashMap<TableId, InflightStreamingJobInfo>,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        state_table_log_epochs: &mut HashMap<TableId, Vec<Vec<u64>>>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        mut subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        let source_split_assignments = jobs
            .values()
            .flat_map(|job| job.fragment_infos())
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            // Actors built during recovery are not treated as newly added actors.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
        });

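        // All state tables of the given jobs must have been committed at the same epoch: resolve
        // that shared epoch and remove the corresponding entries from `state_table_committed_epochs`.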
        fn resolve_jobs_committed_epoch(
            state_table_committed_epochs: &mut HashMap<TableId, u64>,
            jobs: impl IntoIterator<Item = &InflightStreamingJobInfo>,
        ) -> u64 {
            let mut epochs = jobs
                .into_iter()
                .flat_map(InflightStreamingJobInfo::existing_table_ids)
                .map(|table_id| {
                    (
                        table_id,
                        state_table_committed_epochs
                            .remove(&table_id)
                            .expect("should exist"),
                    )
                });
            let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
            for (table_id, epoch) in epochs {
                assert_eq!(
                    prev_epoch, epoch,
                    "{} has different committed epoch to {}",
                    first_table_id, table_id
                );
            }
            prev_epoch
        }

        let mut database_jobs = HashMap::new();
        let mut snapshot_backfill_jobs = HashMap::new();
        let mut background_mviews = HashMap::new();

        for (job_id, job) in jobs {
            if let Some((definition, stream_job_fragments)) = background_jobs.remove(&job_id) {
                if stream_job_fragments.fragments().any(|fragment| {
                    (fragment.fragment_type_mask
                        & (FragmentTypeFlag::SnapshotBackfillStreamScan as u32))
                        != 0
                }) {
                    debug!(%job_id, definition, "recovered snapshot backfill job");
                    snapshot_backfill_jobs.insert(job_id, (job, definition, stream_job_fragments));
                } else {
                    database_jobs.insert(job_id, job);
                    background_mviews.insert(job_id, (definition, stream_job_fragments));
                }
            } else {
                database_jobs.insert(job_id, job);
            }
        }

        let database_job_log_epochs: HashMap<_, _> = database_jobs
            .keys()
            .filter_map(|job_id| {
                state_table_log_epochs
                    .remove(job_id)
                    .map(|epochs| (*job_id, epochs))
            })
            .collect();

        let prev_epoch =
            resolve_jobs_committed_epoch(state_table_committed_epochs, database_jobs.values());
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        // Use a different `curr_epoch` for each recovery attempt.
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        let mut ongoing_snapshot_backfill_jobs: HashMap<TableId, _> = HashMap::new();
        for (job_id, (info, definition, stream_job_fragments)) in snapshot_backfill_jobs {
            let committed_epoch =
                resolve_jobs_committed_epoch(state_table_committed_epochs, [&info]);
            if committed_epoch == barrier_info.prev_epoch() {
                info!(
                    "recovered creating snapshot backfill job {} catch up with upstream already",
                    job_id
                );
                background_mviews
                    .try_insert(job_id, (definition, stream_job_fragments))
                    .expect("non-duplicate");
                database_jobs
                    .try_insert(job_id, info)
                    .expect("non-duplicate");
                continue;
            }
            let snapshot_backfill_info = StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                stream_job_fragments
                    .fragments()
                    .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
            )?
            .0
            .ok_or_else(|| {
                anyhow!(
                    "recovered snapshot backfill job {} has no snapshot backfill info",
                    job_id
                )
            })?;
            let mut snapshot_epoch = None;
            let upstream_table_ids: HashSet<_> = snapshot_backfill_info
                .upstream_mv_table_id_to_backfill_epoch
                .keys()
                .cloned()
                .collect();
            for (upstream_table_id, epoch) in
                snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
            {
                let epoch = epoch.ok_or_else(|| anyhow!("recovered snapshot backfill job {} to upstream {} has not set snapshot epoch", job_id, upstream_table_id))?;
                let snapshot_epoch = snapshot_epoch.get_or_insert(epoch);
                if *snapshot_epoch != epoch {
                    return Err(anyhow!("snapshot epoch {} to upstream {} different to snapshot epoch {} to previous upstream", epoch, upstream_table_id, snapshot_epoch).into());
                }
            }
            let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                anyhow!(
                    "snapshot backfill job {} has not set snapshot epoch",
                    job_id
                )
            })?;
            for upstream_table_id in &upstream_table_ids {
                subscription_info
                    .mv_depended_subscriptions
                    .entry(*upstream_table_id)
                    .or_default()
                    .try_insert(job_id.into(), max(snapshot_epoch, committed_epoch))
                    .expect("non-duplicate");
            }
            ongoing_snapshot_backfill_jobs
                .try_insert(
                    job_id,
                    (
                        info,
                        definition,
                        stream_job_fragments,
                        upstream_table_ids,
                        committed_epoch,
                        snapshot_epoch,
                    ),
                )
                .expect("non-duplicated");
        }

        let node_to_collect = {
            let node_actors = edges.collect_actors_to_create(database_jobs.values().flatten().map(
                move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                },
            ));

            let node_to_collect = self.inject_barrier(
                database_id,
                None,
                Some(mutation.clone()),
                &barrier_info,
                database_jobs.values().flatten(),
                database_jobs.values().flatten(),
                Some(node_actors),
                (&subscription_info).into_iter().collect(),
                vec![],
            )?;
            debug!(
                ?node_to_collect,
                database_id = database_id.database_id,
                "inject initial barrier"
            );
            node_to_collect
        };

        let tracker = CreateMviewProgressTracker::recover(
            background_mviews
                .iter()
                .map(|(table_id, (definition, stream_job_fragments))| {
                    (*table_id, (definition.clone(), stream_job_fragments))
                }),
            hummock_version_stats,
        );

        let mut creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl> =
            HashMap::new();
        for (
            job_id,
            (
                info,
                definition,
                stream_job_fragments,
                upstream_table_ids,
                committed_epoch,
                snapshot_epoch,
            ),
        ) in ongoing_snapshot_backfill_jobs
        {
            let node_actors =
                edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                    (
                        fragment_info.fragment_id,
                        &fragment_info.nodes,
                        fragment_info.actors.iter().map(move |(actor_id, actor)| {
                            (
                                stream_actors.get(actor_id).expect("should exist"),
                                actor.worker_id,
                            )
                        }),
                    )
                }));

            creating_streaming_job_controls.insert(
                job_id,
                CreatingStreamingJobControl::recover(
                    database_id,
                    job_id,
                    definition,
                    upstream_table_ids,
                    &database_job_log_epochs,
                    snapshot_epoch,
                    committed_epoch,
                    barrier_info.curr_epoch.value().0,
                    info,
                    stream_job_fragments,
                    hummock_version_stats,
                    node_actors,
                    mutation.clone(),
                    self,
                )?,
            );
        }

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let mut database = InflightDatabaseInfo::empty();
        database_jobs
            .into_values()
            .for_each(|job| database.extend(job));
        let database_state =
            BarrierWorkerState::recovery(new_epoch, database, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            creating_streaming_job_controls,
            committed_epoch,
        })
    }

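    /// Inject a barrier carrying the mutation derived from `command` (if any) into the
    /// database-level partial graph, together with the subscription changes implied by that
    /// mutation.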
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges, self));
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges, self))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

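    /// Build and send an `InjectBarrierRequest` to every connected worker for the given partial
    /// graph. Returns a [`NodeToCollect`] map from worker id to whether the barrier is trivial
    /// on that worker, i.e. whether it has no actors to collect there.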
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        for worker_id in node_actors.keys() {
            if !self.connected_nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();

        self.connected_nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_values()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker_id,
                                node.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record failure in event log.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

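    /// Ask every connected worker to create a new partial graph for the given database and,
    /// optionally, a creating streaming job. A failure to send the request is only logged.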
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker_id, "fail to add partial graph to worker")
            }
        });
    }

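    /// Ask every connected worker to drop the partial graphs of the given creating jobs. A
    /// failure to send the request is only logged.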
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.connected_nodes.iter().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker_id, node = ?node.host, "failed to send remove partial graph request");
            }
        })
    }

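    /// Ask every connected worker to reset the given database. Returns the ids of the workers to
    /// which the reset request was successfully sent.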
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.connected_nodes
            .iter()
            .filter_map(|(worker_id, node)| {
                if node
                    .handle
                    .request_sender
                    .send(StreamingControlStreamRequest {
                        request: Some(streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        )),
                    })
                    .is_err()
                {
                    warn!(worker_id, node = ?node.host, "failed to send reset database request");
                    None
                } else {
                    Some(*worker_id)
                }
            })
            .collect()
    }
}

impl GlobalBarrierWorkerContextImpl {
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}

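/// Merge per-worker RPC errors into a single `MetaError`: with no errors, return `message` alone;
/// with a single error, attach it as the cause; if any error carries a `Score`, keep only the
/// highest-scored one; otherwise concatenate all error reports into one message.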
pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
    message: &str,
    errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
    use std::error::request_value;
    use std::fmt::Write;

    use risingwave_common::error::tonic::extra::Score;

    let errors = errors.into_iter().collect_vec();

    if errors.is_empty() {
        return anyhow!(message.to_owned()).into();
    }

    // Create the error from a single `(worker_id, error)` pair.
    let single_error = |(worker_id, e)| {
        anyhow::Error::from(e)
            .context(format!("{message}, in worker node {worker_id}"))
            .into()
    };

    if errors.len() == 1 {
        return single_error(errors.into_iter().next().unwrap());
    }

    // Find the error with the highest score.
    let max_score = errors
        .iter()
        .filter_map(|(_, e)| request_value::<Score>(e))
        .max();

    if let Some(max_score) = max_score {
        let mut errors = errors;
        let max_scored = errors
            .extract_if(.., |(_, e)| request_value::<Score>(e) == Some(max_score))
            .next()
            .unwrap();

        return single_error(max_scored);
    }

    // The errors do not have scores, so simply concatenate them.
    let concat: String = errors
        .into_iter()
        .fold(format!("{message}: "), |mut s, (w, e)| {
            write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
            s
        });
    anyhow!(concat).into()
}