1use std::collections::{HashMap, HashSet};
16use std::error::Error;
17use std::fmt::{Debug, Formatter};
18use std::future::poll_fn;
19use std::task::{Context, Poll};
20use std::time::Duration;
21
22use anyhow::anyhow;
23use fail::fail_point;
24use futures::StreamExt;
25use futures::future::join_all;
26use itertools::Itertools;
27use risingwave_common::catalog::{DatabaseId, TableId};
28use risingwave_common::util::epoch::Epoch;
29use risingwave_common::util::tracing::TracingContext;
30use risingwave_connector::source::SplitImpl;
31use risingwave_meta_model::WorkerId;
32use risingwave_pb::common::{ActorInfo, WorkerNode};
33use risingwave_pb::hummock::HummockVersionStats;
34use risingwave_pb::stream_plan::barrier_mutation::Mutation;
35use risingwave_pb::stream_plan::{AddMutation, Barrier, BarrierMutation, SubscriptionUpstreamInfo};
36use risingwave_pb::stream_service::inject_barrier_request::build_actor_info::UpstreamActors;
37use risingwave_pb::stream_service::inject_barrier_request::{
38 BuildActorInfo, FragmentBuildActorInfo,
39};
40use risingwave_pb::stream_service::streaming_control_stream_request::{
41 CreatePartialGraphRequest, PbDatabaseInitialPartialGraph, PbInitRequest, PbInitialPartialGraph,
42 RemovePartialGraphRequest, ResetDatabaseRequest,
43};
44use risingwave_pb::stream_service::{
45 BarrierCompleteResponse, InjectBarrierRequest, StreamingControlStreamRequest,
46 streaming_control_stream_request, streaming_control_stream_response,
47};
48use risingwave_rpc_client::StreamingControlHandle;
49use thiserror_ext::AsReport;
50use tokio::time::sleep;
51use tokio_retry::strategy::ExponentialBackoff;
52use tracing::{debug, error, info, warn};
53use uuid::Uuid;
54
55use super::{BarrierKind, Command, InflightSubscriptionInfo, TracedEpoch};
56use crate::barrier::checkpoint::{BarrierWorkerState, DatabaseCheckpointControl};
57use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerContextImpl};
58use crate::barrier::edge_builder::FragmentEdgeBuildResult;
59use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo};
60use crate::barrier::progress::CreateMviewProgressTracker;
61use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
62use crate::controller::fragment::InflightFragmentInfo;
63use crate::manager::MetaSrvEnv;
64use crate::model::{ActorId, StreamActor, StreamJobActorsToCreate, StreamJobFragments};
65use crate::stream::build_actor_connector_splits;
66use crate::{MetaError, MetaResult};
67
68fn to_partial_graph_id(job_id: Option<TableId>) -> u32 {
69 job_id
70 .map(|table| {
71 assert_ne!(table.table_id, u32::MAX);
72 table.table_id
73 })
74 .unwrap_or(u32::MAX)
75}
76
77pub(super) fn from_partial_graph_id(partial_graph_id: u32) -> Option<TableId> {
78 if partial_graph_id == u32::MAX {
79 None
80 } else {
81 Some(TableId::new(partial_graph_id))
82 }
83}
84
/// A connected compute worker paired with the control-stream handle used to
/// send streaming control requests to it and receive its responses.
struct ControlStreamNode {
    worker: WorkerNode,
    handle: StreamingControlHandle,
}
89
/// Manages the control streams to all connected compute workers and
/// multiplexes barrier injection / response collection over them.
pub(super) struct ControlStreamManager {
    /// One entry per connected worker. An entry is removed when its response
    /// stream yields an error or ends (see `poll_next_response`).
    nodes: HashMap<WorkerId, ControlStreamNode>,
    pub(crate) env: MetaSrvEnv,
}
94
impl ControlStreamManager {
    /// Create a manager with no connected workers.
    pub(super) fn new(env: MetaSrvEnv) -> Self {
        Self {
            nodes: Default::default(),
            env,
        }
    }

    /// Whether a control stream to `worker_id` is currently established.
    pub(super) fn contains_worker(&self, worker_id: WorkerId) -> bool {
        self.nodes.contains_key(&worker_id)
    }

    /// Try once to establish a control stream to `node`.
    ///
    /// No-op (with a warning) if the worker is already connected. On failure
    /// the error is logged and the worker is simply not added — contrast with
    /// `add_worker`, which retries with backoff.
    pub(super) async fn try_add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                &InflightDatabaseInfo,
                impl Iterator<Item = &InflightStreamingJobInfo>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        if self.nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "node already exists");
            return;
        }
        // NOTE(review): assumes `host` is always populated for a worker node.
        let node_host = node.host.clone().unwrap();

        let init_request = self.collect_init_request(inflight_infos, term_id);
        match context.new_control_stream(&node, &init_request).await {
            Ok(handle) => {
                // The contains_key check above guarantees this insert is fresh.
                assert!(
                    self.nodes
                        .insert(
                            node_id,
                            ControlStreamNode {
                                worker: node.clone(),
                                handle,
                            }
                        )
                        .is_none()
                );
                info!(?node_host, "add control stream worker");
            }
            Err(e) => {
                error!(err = %e.as_report(), ?node_host, "fail to create worker node");
            }
        }
    }

    /// Establish a control stream to `node`, retrying up to `MAX_RETRY`
    /// times with exponential backoff (capped at 3s per delay).
    ///
    /// No-op (with a warning) if the worker is already connected; logs an
    /// error and gives up after the final attempt fails.
    pub(super) async fn add_worker(
        &mut self,
        node: WorkerNode,
        inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                &InflightDatabaseInfo,
                impl Iterator<Item = &InflightStreamingJobInfo>,
            ),
        >,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) {
        let node_id = node.id as WorkerId;
        if self.nodes.contains_key(&node_id) {
            warn!(id = node.id, host = ?node.host, "node already exists");
            return;
        }
        // NOTE(review): assumes `host` is always populated for a worker node.
        let node_host = node.host.clone().unwrap();
        let mut backoff = ExponentialBackoff::from_millis(100)
            .max_delay(Duration::from_secs(3))
            .factor(5);
        let init_request = self.collect_init_request(inflight_infos, term_id);
        const MAX_RETRY: usize = 5;
        for i in 1..=MAX_RETRY {
            match context.new_control_stream(&node, &init_request).await {
                Ok(handle) => {
                    assert!(
                        self.nodes
                            .insert(
                                node_id,
                                ControlStreamNode {
                                    worker: node.clone(),
                                    handle,
                                }
                            )
                            .is_none()
                    );
                    info!(?node_host, "add control stream worker");
                    return;
                }
                Err(e) => {
                    // The backoff iterator is unbounded, so `next` cannot fail.
                    let delay = backoff.next().unwrap();
                    error!(attempt = i, backoff_delay = ?delay, err = %e.as_report(), ?node_host, "fail to resolve worker node address");
                    sleep(delay).await;
                }
            }
        }
        error!(?node_host, "fail to create worker node after retry");
    }

    /// Drop all existing control streams and reconnect to `nodes` with an
    /// empty init request (no databases), connecting to all workers
    /// concurrently. Returns the set of workers that could not be connected.
    pub(super) async fn reset(
        &mut self,
        nodes: &HashMap<WorkerId, WorkerNode>,
        term_id: String,
        context: &impl GlobalBarrierWorkerContext,
    ) -> HashSet<WorkerId> {
        let init_request = PbInitRequest {
            databases: vec![],
            term_id,
        };
        let init_request = &init_request;
        let nodes = join_all(nodes.iter().map(|(worker_id, node)| async move {
            let result = context.new_control_stream(node, init_request).await;
            (*worker_id, node.clone(), result)
        }))
        .await;
        self.nodes.clear();
        let mut failed_workers = HashSet::new();
        for (worker_id, node, result) in nodes {
            match result {
                Ok(handle) => {
                    // `self.nodes` was just cleared and worker ids are unique,
                    // so each insert must be fresh.
                    assert!(
                        self.nodes
                            .insert(
                                worker_id,
                                ControlStreamNode {
                                    worker: node,
                                    handle
                                }
                            )
                            .is_none()
                    );
                }
                Err(e) => {
                    failed_workers.insert(worker_id);
                    warn!(
                        e = %e.as_report(),
                        worker_id,
                        ?node,
                        "failed to connect to node"
                    )
                }
            }
        }

        failed_workers
    }

    /// Drop all control streams, returning the manager to its initial state
    /// while keeping the shared `env`.
    pub(super) fn clear(&mut self) {
        *self = Self::new(self.env.clone());
    }

    /// Poll every worker's response stream once and return the first ready
    /// response (worker iteration order is the arbitrary `HashMap` order).
    ///
    /// Returns `Poll::Pending` when there are no connected workers, so an
    /// awaiting caller simply keeps waiting. End-of-stream, RPC errors,
    /// empty responses, `Shutdown`, and unexpected `Init` responses are all
    /// normalized into `Err`; the erroring worker is removed from `nodes`
    /// before the result is returned.
    fn poll_next_response(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<(
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    )> {
        if self.nodes.is_empty() {
            return Poll::Pending;
        }
        let mut poll_result: Poll<(WorkerId, MetaResult<_>)> = Poll::Pending;
        {
            for (worker_id, node) in &mut self.nodes {
                match node.handle.response_stream.poll_next_unpin(cx) {
                    Poll::Ready(result) => {
                        poll_result = Poll::Ready((
                            *worker_id,
                            result
                                // stream ended without a final response
                                .ok_or_else(|| anyhow!("end of stream").into())
                                .and_then(|result| {
                                    result.map_err(Into::<MetaError>::into).and_then(|resp| {
                                        match resp
                                            .response
                                            .ok_or_else(||anyhow!("empty response"))?
                                        {
                                            // worker-initiated shutdown surfaces as an error
                                            // so the worker gets removed below
                                            streaming_control_stream_response::Response::Shutdown(_) => Err(anyhow!(
                                                "worker node {worker_id} is shutting down"
                                            )
                                            .into()),
                                            // Init is consumed when the stream is set up;
                                            // seeing one here is a protocol violation
                                            streaming_control_stream_response::Response::Init(_) => {
                                                Err(anyhow!("get unexpected init response").into())
                                            }
                                            resp => Ok(resp),
                                        }
                                    })
                                })
                        ));
                        break;
                    }
                    Poll::Pending => {
                        continue;
                    }
                }
            }
        };

        if let Poll::Ready((worker_id, Err(err))) = &poll_result {
            // Any error permanently disconnects the worker.
            let node = self
                .nodes
                .remove(worker_id)
                .expect("should exist when get shutdown resp");
            warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
        }

        poll_result
    }

    /// Await the next response (or normalized error) from any worker.
    pub(super) async fn next_response(
        &mut self,
    ) -> (
        WorkerId,
        MetaResult<streaming_control_stream_response::Response>,
    ) {
        poll_fn(|cx| self.poll_next_response(cx)).await
    }

    /// Build the control-stream init request: for every database, the
    /// database-level partial graph (carrying its subscriptions and the
    /// host address of every inflight actor) plus one partial graph per
    /// creating streaming job.
    fn collect_init_request(
        &self,
        initial_inflight_infos: impl Iterator<
            Item = (
                DatabaseId,
                &InflightSubscriptionInfo,
                &InflightDatabaseInfo,
                impl Iterator<Item = &InflightStreamingJobInfo>,
            ),
        >,
        term_id: String,
    ) -> PbInitRequest {
        PbInitRequest {
            databases: initial_inflight_infos
                .map(
                    |(
                        database_id,
                        subscriptions,
                        database_inflight_info,
                        creating_job_inflight_info,
                    )| {
                        /// Resolve each inflight actor to the host address of the
                        /// worker it runs on; panics if that worker is not connected.
                        fn actor_infos(
                            nodes: &HashMap<WorkerId, ControlStreamNode>,
                            fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
                        ) -> Vec<ActorInfo> {
                            {
                                fragment_infos
                                    .flat_map(|fragment| {
                                        fragment.actors.iter().map(|(actor_id, actor)| {
                                            let host_addr = nodes
                                                .get(&actor.worker_id)
                                                .expect("worker should exist for inflight actor")
                                                .worker
                                                .host
                                                .clone()
                                                .expect("should exist");
                                            ActorInfo {
                                                actor_id: *actor_id,
                                                host: Some(host_addr),
                                            }
                                        })
                                    })
                                    .collect()
                            }
                        }
                        // Database-level graph first (partial graph id = sentinel).
                        let mut graphs = vec![PbInitialPartialGraph {
                            partial_graph_id: to_partial_graph_id(None),
                            subscriptions: subscriptions.into_iter().collect_vec(),
                            actor_infos: actor_infos(
                                &self.nodes,
                                database_inflight_info.fragment_infos(),
                            ),
                        }];
                        // Then one graph per creating job; jobs carry no subscriptions.
                        graphs.extend(creating_job_inflight_info.map(|job| {
                            PbInitialPartialGraph {
                                partial_graph_id: to_partial_graph_id(Some(job.job_id)),
                                subscriptions: vec![],
                                actor_infos: actor_infos(&self.nodes, job.fragment_infos()),
                            }
                        }));
                        PbDatabaseInitialPartialGraph {
                            database_id: database_id.database_id,
                            graphs,
                        }
                    },
                )
                .collect(),
            term_id,
        }
    }
}
395
/// Tracks collection of the initial (recovery) barrier of one database.
///
/// Produced by `ControlStreamManager::inject_database_initial_barrier`; once
/// every pending worker has responded (`is_collected`), `finish` converts it
/// into a `DatabaseCheckpointControl`.
pub(super) struct DatabaseInitialBarrierCollector {
    database_id: DatabaseId,
    /// Workers that still need to report collection of the initial barrier.
    node_to_collect: NodeToCollect,
    database_state: BarrierWorkerState,
    create_mview_tracker: CreateMviewProgressTracker,
    /// The epoch at which the initial barrier was injected (the previously
    /// committed epoch); responses are asserted against it.
    committed_epoch: u64,
}
403
// Manual `Debug`: only the id and pending workers are shown; the state,
// tracker, and epoch fields are deliberately omitted.
impl Debug for DatabaseInitialBarrierCollector {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseInitialBarrierCollector")
            .field("database_id", &self.database_id)
            .field("node_to_collect", &self.node_to_collect)
            .finish()
    }
}
412
impl DatabaseInitialBarrierCollector {
    /// Whether every pending worker has reported the initial barrier.
    pub(super) fn is_collected(&self) -> bool {
        self.node_to_collect.is_empty()
    }

    /// The recovered per-database barrier worker state.
    pub(super) fn database_state(&self) -> &BarrierWorkerState {
        &self.database_state
    }

    /// Record a barrier-complete response for the initial barrier.
    ///
    /// Panics if the response belongs to another database, carries an epoch
    /// other than the injected one, or comes from a worker that is not
    /// pending collection.
    pub(super) fn collect_resp(&mut self, resp: BarrierCompleteResponse) {
        assert_eq!(self.database_id.database_id, resp.database_id);
        assert_eq!(resp.epoch, self.committed_epoch);
        assert!(
            self.node_to_collect
                .remove(&(resp.worker_id as _))
                .is_some()
        );
    }

    /// Consume the collector and build the database's checkpoint control.
    /// Must only be called once `is_collected` returns true.
    pub(super) fn finish(self) -> DatabaseCheckpointControl {
        assert!(self.is_collected());
        DatabaseCheckpointControl::recovery(
            self.database_id,
            self.create_mview_tracker,
            self.database_state,
            self.committed_epoch,
        )
    }

    /// Whether initial-barrier collection can still complete after
    /// `worker_id` has failed.
    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
        is_valid_after_worker_err(&mut self.node_to_collect, worker_id)
    }
}
446
impl ControlStreamManager {
    /// Inject the first barrier of a recovering database and return a
    /// collector tracking its completion.
    ///
    /// Creates the database-level partial graph, builds an `Add` mutation
    /// carrying the recovered source-split assignments and pause state,
    /// asserts all state tables of the database share one committed epoch,
    /// and injects an `Initial`-kind barrier from that epoch to its
    /// successor, instructing workers to (re)build all inflight actors.
    ///
    /// `state_table_committed_epochs`, `source_splits` and entries consumed
    /// from `background_jobs` are drained via `remove`/`get` as a side effect.
    #[expect(clippy::too_many_arguments)]
    pub(super) fn inject_database_initial_barrier(
        &mut self,
        database_id: DatabaseId,
        info: InflightDatabaseInfo,
        state_table_committed_epochs: &mut HashMap<TableId, u64>,
        edges: &mut FragmentEdgeBuildResult,
        stream_actors: &HashMap<ActorId, StreamActor>,
        source_splits: &mut HashMap<ActorId, Vec<SplitImpl>>,
        background_jobs: &mut HashMap<TableId, (String, StreamJobFragments)>,
        subscription_info: InflightSubscriptionInfo,
        is_paused: bool,
        hummock_version_stats: &HummockVersionStats,
    ) -> MetaResult<DatabaseInitialBarrierCollector> {
        self.add_partial_graph(database_id, None);
        // Take the split assignment of every actor in this database out of
        // the shared `source_splits` map.
        let source_split_assignments = info
            .fragment_infos()
            .flat_map(|info| info.actors.keys())
            .filter_map(|actor_id| {
                let actor_id = *actor_id as ActorId;
                source_splits
                    .remove(&actor_id)
                    .map(|splits| (actor_id, splits))
            })
            .collect();
        let mutation = Mutation::Add(AddMutation {
            // Actors to be added have been inferred before and will be
            // included in the proto `actors_to_build` of the inject request.
            actor_dispatchers: Default::default(),
            added_actors: Default::default(),
            actor_splits: build_actor_connector_splits(&source_split_assignments),
            pause: is_paused,
            subscriptions_to_add: Default::default(),
        });

        // All state tables of one database must have been committed at the
        // same epoch; pick it as the barrier's prev epoch.
        let mut epochs = info.existing_table_ids().map(|table_id| {
            (
                table_id,
                state_table_committed_epochs
                    .remove(&table_id)
                    .expect("should exist"),
            )
        });
        let (first_table_id, prev_epoch) = epochs.next().expect("non-empty");
        for (table_id, epoch) in epochs {
            assert_eq!(
                prev_epoch, epoch,
                "{} has different committed epoch to {}",
                first_table_id, table_id
            );
        }
        let prev_epoch = TracedEpoch::new(Epoch(prev_epoch));
        let curr_epoch = prev_epoch.next();
        let barrier_info = BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind: BarrierKind::Initial,
        };

        // Every inflight actor is rebuilt on recovery; gather the build
        // descriptors grouped by worker.
        let node_actors =
            edges.collect_actors_to_create(info.fragment_infos().map(move |fragment_info| {
                (
                    fragment_info.fragment_id,
                    &fragment_info.nodes,
                    fragment_info.actors.iter().map(move |(actor_id, actor)| {
                        (
                            stream_actors.get(actor_id).expect("should exist"),
                            actor.worker_id,
                        )
                    }),
                )
            }));

        // Restore create-mview progress for background jobs of this database.
        let background_mviews = info.job_ids().filter_map(|job_id| {
            background_jobs
                .get(&job_id)
                .map(|(definition, stream_job_fragments)| {
                    (job_id, (definition.clone(), stream_job_fragments))
                })
        });
        let tracker = CreateMviewProgressTracker::recover(background_mviews, hummock_version_stats);

        let node_to_collect = self.inject_barrier(
            database_id,
            None,
            Some(mutation),
            &barrier_info,
            info.fragment_infos(),
            info.fragment_infos(),
            Some(node_actors),
            (&subscription_info).into_iter().collect(),
            vec![],
        )?;
        debug!(
            ?node_to_collect,
            database_id = database_id.database_id,
            "inject initial barrier"
        );

        let committed_epoch = barrier_info.prev_epoch();
        let new_epoch = barrier_info.curr_epoch;
        let database_state =
            BarrierWorkerState::recovery(new_epoch, info, subscription_info, is_paused);
        Ok(DatabaseInitialBarrierCollector {
            database_id,
            node_to_collect,
            database_state,
            create_mview_tracker: tracker,
            committed_epoch,
        })
    }

    /// Inject a barrier for an (optional) command on the database-level
    /// partial graph, deriving the mutation and the subscription changes
    /// from the command itself.
    pub(super) fn inject_command_ctx_barrier(
        &mut self,
        database_id: DatabaseId,
        command: Option<&Command>,
        barrier_info: &BarrierInfo,
        is_paused: bool,
        pre_applied_graph_info: &InflightDatabaseInfo,
        applied_graph_info: &InflightDatabaseInfo,
        edges: &mut Option<FragmentEdgeBuildResult>,
    ) -> MetaResult<NodeToCollect> {
        let mutation = command.and_then(|c| c.to_mutation(is_paused, edges));
        // Subscription deltas are mirrored from the mutation so workers can
        // update partial-graph metadata alongside applying the barrier.
        let subscriptions_to_add = if let Some(Mutation::Add(add)) = &mutation {
            add.subscriptions_to_add.clone()
        } else {
            vec![]
        };
        let subscriptions_to_remove = if let Some(Mutation::DropSubscriptions(drop)) = &mutation {
            drop.info.clone()
        } else {
            vec![]
        };
        self.inject_barrier(
            database_id,
            None,
            mutation,
            barrier_info,
            pre_applied_graph_info.fragment_infos(),
            applied_graph_info.fragment_infos(),
            command
                .as_ref()
                .map(|command| command.actors_to_create(pre_applied_graph_info, edges))
                .unwrap_or_default(),
            subscriptions_to_add,
            subscriptions_to_remove,
        )
    }

    /// Send an `InjectBarrier` request to every connected worker for the
    /// partial graph identified by `creating_table_id` (`None` = the
    /// database-level graph).
    ///
    /// The barrier is sent to all workers, including those with no actors
    /// in the graph. Returns a map from worker id to whether that worker
    /// has zero actors to collect for this barrier. Fails fast if a worker
    /// hosting an actor of `pre_applied_graph_info` is not connected;
    /// send failures are also reported to the event log.
    pub(super) fn inject_barrier<'a>(
        &mut self,
        database_id: DatabaseId,
        creating_table_id: Option<TableId>,
        mutation: Option<Mutation>,
        barrier_info: &BarrierInfo,
        pre_applied_graph_info: impl IntoIterator<Item = &InflightFragmentInfo>,
        applied_graph_info: impl IntoIterator<Item = &'a InflightFragmentInfo> + 'a,
        mut new_actors: Option<StreamJobActorsToCreate>,
        subscriptions_to_add: Vec<SubscriptionUpstreamInfo>,
        subscriptions_to_remove: Vec<SubscriptionUpstreamInfo>,
    ) -> MetaResult<NodeToCollect> {
        // Failure-injection hook for tests.
        fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
            "inject_barrier_err"
        ));

        let partial_graph_id = to_partial_graph_id(creating_table_id);

        let node_actors = InflightFragmentInfo::actor_ids_to_collect(pre_applied_graph_info);

        // Every worker hosting an actor to collect must be connected.
        for worker_id in node_actors.keys() {
            if !self.nodes.contains_key(worker_id) {
                return Err(anyhow!("unconnected worker node {}", worker_id).into());
            }
        }

        let table_ids_to_sync: HashSet<_> =
            InflightFragmentInfo::existing_table_ids(applied_graph_info)
                .map(|table_id| table_id.table_id)
                .collect();

        let mut node_need_collect = HashMap::new();
        // Pre-compute the actor-location broadcast once; it is cloned into
        // each per-worker request below.
        let new_actors_location_to_broadcast = new_actors
            .iter()
            .flatten()
            .flat_map(|(worker_id, actor_infos)| {
                actor_infos
                    .iter()
                    .flat_map(|(_, (_, actors))| actors.iter())
                    .map(|actor_info| ActorInfo {
                        actor_id: actor_info.0.actor_id,
                        host: self
                            .nodes
                            .get(worker_id)
                            .expect("have checked exist previously")
                            .worker
                            .host
                            .clone(),
                    })
            })
            .collect_vec();

        self.nodes
            .iter()
            .try_for_each(|(node_id, node)| {
                let actor_ids_to_collect = node_actors
                    .get(node_id)
                    .map(|actors| actors.iter().cloned())
                    .into_iter()
                    .flatten()
                    .collect_vec();
                let is_empty = actor_ids_to_collect.is_empty();
                {
                    let mutation = mutation.clone();
                    let barrier = Barrier {
                        epoch: Some(risingwave_pb::data::Epoch {
                            curr: barrier_info.curr_epoch.value().0,
                            prev: barrier_info.prev_epoch(),
                        }),
                        // The outer clone only tests for Some; the shadowed
                        // `mutation` clone is moved into the message.
                        mutation: mutation.clone().map(|_| BarrierMutation { mutation }),
                        tracing_context: TracingContext::from_span(barrier_info.curr_epoch.span())
                            .to_protobuf(),
                        kind: barrier_info.kind.to_protobuf() as i32,
                        passed_actors: vec![],
                    };

                    node.handle
                        .request_sender
                        .send(StreamingControlStreamRequest {
                            request: Some(
                                streaming_control_stream_request::Request::InjectBarrier(
                                    InjectBarrierRequest {
                                        request_id: Uuid::new_v4().to_string(),
                                        barrier: Some(barrier),
                                        database_id: database_id.database_id,
                                        actor_ids_to_collect,
                                        table_ids_to_sync: table_ids_to_sync
                                            .iter()
                                            .cloned()
                                            .collect(),
                                        partial_graph_id,
                                        broadcast_info: new_actors_location_to_broadcast.clone(),
                                        // Each worker's build list is removed from
                                        // `new_actors` so it is sent exactly once.
                                        actors_to_build: new_actors
                                            .as_mut()
                                            .map(|new_actors| new_actors.remove(&(*node_id as _)))
                                            .into_iter()
                                            .flatten()
                                            .flatten()
                                            .map(|(fragment_id, (node, actors))| {
                                                FragmentBuildActorInfo {
                                                    fragment_id,
                                                    node: Some(node),
                                                    actors: actors
                                                        .into_iter()
                                                        .map(|(actor, upstreams, dispatchers)| {
                                                            BuildActorInfo {
                                                                actor_id: actor.actor_id,
                                                                fragment_upstreams: upstreams
                                                                    .into_iter()
                                                                    .map(|(fragment_id, upstreams)| {
                                                                        (
                                                                            fragment_id,
                                                                            UpstreamActors {
                                                                                actors: upstreams
                                                                                    .into_iter()
                                                                                    .collect(),
                                                                            },
                                                                        )
                                                                    })
                                                                    .collect(),
                                                                dispatchers,
                                                                vnode_bitmap: actor.vnode_bitmap.map(|bitmap| bitmap.to_protobuf()),
                                                                mview_definition: actor.mview_definition,
                                                                expr_context: actor.expr_context,
                                                            }
                                                        })
                                                        .collect(),
                                                }
                                            })
                                            .collect(),
                                        subscriptions_to_add: subscriptions_to_add.clone(),
                                        subscriptions_to_remove: subscriptions_to_remove.clone(),
                                    },
                                ),
                            ),
                        })
                        .map_err(|_| {
                            MetaError::from(anyhow!(
                                "failed to send request to {} {:?}",
                                node.worker.id,
                                node.worker.host
                            ))
                        })?;

                    node_need_collect.insert(*node_id as WorkerId, is_empty);
                    Result::<_, MetaError>::Ok(())
                }
            })
            .inspect_err(|e| {
                // Record inject failures in the event log for diagnosis.
                use risingwave_pb::meta::event_log;
                let event = event_log::EventInjectBarrierFail {
                    prev_epoch: barrier_info.prev_epoch(),
                    cur_epoch: barrier_info.curr_epoch.value().0,
                    error: e.to_report_string(),
                };
                self.env
                    .event_log_manager_ref()
                    .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
            })?;
        Ok(node_need_collect)
    }

    /// Broadcast a `CreatePartialGraph` request for `creating_job_id`
    /// (`None` = the database-level graph) to every connected worker.
    /// Send failures are logged but not propagated.
    pub(super) fn add_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_id: Option<TableId>,
    ) {
        let partial_graph_id = to_partial_graph_id(creating_job_id);
        self.nodes.iter().for_each(|(_, node)| {
            if node
                .handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::CreatePartialGraph(
                            CreatePartialGraphRequest {
                                database_id: database_id.database_id,
                                partial_graph_id,
                            },
                        ),
                    ),
                }).is_err() {
                warn!(%database_id, ?creating_job_id, worker_id = node.worker.id, "fail to add partial graph to worker")
            }
        });
    }

    /// Broadcast a `RemovePartialGraph` request for the given creating jobs
    /// to every connected worker. No-op for an empty job list; send failures
    /// are logged but not propagated.
    pub(super) fn remove_partial_graph(
        &mut self,
        database_id: DatabaseId,
        creating_job_ids: Vec<TableId>,
    ) {
        if creating_job_ids.is_empty() {
            return;
        }
        let partial_graph_ids = creating_job_ids
            .into_iter()
            .map(|job_id| to_partial_graph_id(Some(job_id)))
            .collect_vec();
        self.nodes.iter().for_each(|(_, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::RemovePartialGraph(
                            RemovePartialGraphRequest {
                                partial_graph_ids: partial_graph_ids.clone(),
                                database_id: database_id.database_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id = node.worker.id,node = ?node.worker.host,"failed to send remove partial graph request");
            }
        })
    }

    /// Broadcast a `ResetDatabase` request to every connected worker.
    /// Returns the workers the request was successfully sent to; failed
    /// sends are logged and excluded.
    pub(super) fn reset_database(
        &mut self,
        database_id: DatabaseId,
        reset_request_id: u32,
    ) -> HashSet<WorkerId> {
        self.nodes.iter().filter_map(|(worker_id, node)| {
            if node.handle
                .request_sender
                .send(StreamingControlStreamRequest {
                    request: Some(
                        streaming_control_stream_request::Request::ResetDatabase(
                            ResetDatabaseRequest {
                                database_id: database_id.database_id,
                                reset_request_id,
                            },
                        ),
                    ),
                })
                .is_err()
            {
                warn!(worker_id, node = ?node.worker.host,"failed to send reset database request");
                None
            } else {
                Some(*worker_id)
            }
        }).collect()
    }
}
846
impl GlobalBarrierWorkerContextImpl {
    /// Open a control stream to `node`: obtain a stream client from the
    /// pooled client set and start the streaming control channel with the
    /// given init request.
    pub(super) async fn new_control_stream_impl(
        &self,
        node: &WorkerNode,
        init_request: &PbInitRequest,
    ) -> MetaResult<StreamingControlHandle> {
        let handle = self
            .env
            .stream_client_pool()
            .get(node)
            .await?
            .start_streaming_control(init_request.clone())
            .await?;
        Ok(handle)
    }
}
863
864pub(super) fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
865 message: &str,
866 errors: impl IntoIterator<Item = (WorkerId, E)>,
867) -> MetaError {
868 use std::error::request_value;
869 use std::fmt::Write;
870
871 use risingwave_common::error::tonic::extra::Score;
872
873 let errors = errors.into_iter().collect_vec();
874
875 if errors.is_empty() {
876 return anyhow!(message.to_owned()).into();
877 }
878
879 let single_error = |(worker_id, e)| {
881 anyhow::Error::from(e)
882 .context(format!("{message}, in worker node {worker_id}"))
883 .into()
884 };
885
886 if errors.len() == 1 {
887 return single_error(errors.into_iter().next().unwrap());
888 }
889
890 let max_score = errors
892 .iter()
893 .filter_map(|(_, e)| request_value::<Score>(e))
894 .max();
895
896 if let Some(max_score) = max_score {
897 let mut errors = errors;
898 let max_scored = errors
899 .extract_if(.., |(_, e)| request_value::<Score>(e) == Some(max_score))
900 .next()
901 .unwrap();
902
903 return single_error(max_scored);
904 }
905
906 let concat: String = errors
908 .into_iter()
909 .fold(format!("{message}: "), |mut s, (w, e)| {
910 write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
911 s
912 });
913 anyhow!(concat).into()
914}