1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
17
18use anyhow::{Context, anyhow};
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
25use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId};
26use risingwave_common::util::stream_graph_visitor::{
27 visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_connector::source::SplitImpl;
30use risingwave_meta_model::actor::ActorStatus;
31use risingwave_meta_model::fragment::DistributionType;
32use risingwave_meta_model::object::ObjectType;
33use risingwave_meta_model::prelude::{
34 Actor, Fragment as FragmentModel, FragmentRelation, Sink, SourceSplits, StreamingJob,
35};
36use risingwave_meta_model::{
37 ActorId, ConnectorSplits, DatabaseId, DispatcherType, ExprContext, FragmentId, I32Array,
38 JobStatus, ObjectId, SchemaId, SinkId, SourceId, StreamNode, StreamingParallelism, TableId,
39 VnodeBitmap, WorkerId, actor, database, fragment, fragment_relation, object, sink, source,
40 source_splits, streaming_job, table,
41};
42use risingwave_meta_model_migration::{Alias, ExprTrait, OnConflict, SelectStatement, SimpleExpr};
43use risingwave_pb::catalog::PbTable;
44use risingwave_pb::common::PbActorLocation;
45use risingwave_pb::meta::subscribe_response::{
46 Info as NotificationInfo, Operation as NotificationOperation,
47};
48use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
49use risingwave_pb::meta::table_fragments::fragment::{
50 FragmentDistributionType, PbFragmentDistributionType,
51};
52use risingwave_pb::meta::table_fragments::{PbActorStatus, PbState};
53use risingwave_pb::meta::{FragmentWorkerSlotMapping, PbFragmentWorkerSlotMapping};
54use risingwave_pb::source::{ConnectorSplit, PbConnectorSplits};
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57 PbDispatchOutputMapping, PbDispatcherType, PbStreamContext, PbStreamNode, PbStreamScanType,
58 StreamScanType,
59};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::Expr;
62use sea_orm::{
63 ColumnTrait, DbErr, EntityTrait, FromQueryResult, JoinType, ModelTrait, PaginatorTrait,
64 QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
65};
66use serde::{Deserialize, Serialize};
67use tracing::debug;
68
69use crate::barrier::SnapshotBackfillInfo;
70use crate::controller::catalog::{CatalogController, CatalogControllerInner};
71use crate::controller::scale::resolve_streaming_job_definition;
72use crate::controller::utils::{
73 FragmentDesc, PartialActorLocation, PartialFragmentStateTables, get_fragment_actor_dispatchers,
74 get_fragment_mappings, get_sink_fragment_by_ids, has_table_been_migrated,
75 resolve_no_shuffle_actor_dispatcher,
76};
77use crate::manager::{LocalNotification, NotificationManager};
78use crate::model::{
79 DownstreamFragmentRelation, Fragment, FragmentActorDispatchers, FragmentDownstreamRelation,
80 StreamActor, StreamContext, StreamJobFragments, TableParallelism,
81};
82use crate::rpc::ddl_controller::build_upstream_sink_info;
83use crate::stream::{SplitAssignment, UpstreamSinkInfo};
84use crate::{MetaError, MetaResult};
85
/// Runtime ("in-flight") information about a single actor: where it is
/// scheduled, which vnodes it owns, and its currently assigned source splits.
#[derive(Clone, Debug)]
pub struct InflightActorInfo {
    /// Worker node the actor is scheduled on.
    pub worker_id: WorkerId,
    /// Vnode ownership bitmap; `None` when the actor carries no bitmap
    /// (presumably single-actor fragments — confirm against scheduler).
    pub vnode_bitmap: Option<Bitmap>,
    /// Source splits currently assigned to this actor; empty for non-source actors.
    pub splits: Vec<SplitImpl>,
}
93
/// Runtime ("in-flight") information about a fragment and all of its actors.
#[derive(Clone, Debug)]
pub struct InflightFragmentInfo {
    pub fragment_id: crate::model::FragmentId,
    /// Hash or Single distribution of the fragment.
    pub distribution_type: DistributionType,
    /// The fragment's stream plan (protobuf form).
    pub nodes: PbStreamNode,
    /// Per-actor runtime info, keyed by actor id.
    pub actors: HashMap<crate::model::ActorId, InflightActorInfo>,
    /// State tables owned by this fragment.
    pub state_table_ids: HashSet<risingwave_common::catalog::TableId>,
}
102
/// Parallelism-related facts about a fragment, as reported by
/// `running_fragment_parallelisms`.
#[derive(Clone, Debug)]
pub struct FragmentParallelismInfo {
    pub distribution_type: FragmentDistributionType,
    /// Number of actors currently instantiating the fragment.
    pub actor_count: usize,
    /// Total vnode count of the fragment.
    pub vnode_count: usize,
}
109
/// SQL-expression builders for filtering rows on the
/// `fragment.fragment_type_mask` bitmask column, exposed as extension
/// methods on [`FragmentTypeMask`].
#[easy_ext::ext(FragmentTypeMaskExt)]
pub impl FragmentTypeMask {
    /// `fragment_type_mask & flag != 0` — the mask contains `flag`.
    fn intersects(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .ne(0)
    }

    /// `fragment_type_mask & (flags OR-ed together) != 0` — the mask contains
    /// at least one of `flags`.
    fn intersects_any(flags: impl IntoIterator<Item = FragmentTypeFlag>) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(FragmentTypeFlag::raw_flag(flags) as i32))
            .ne(0)
    }

    /// `fragment_type_mask & flag == 0` — the mask does not contain `flag`.
    fn disjoint(flag: FragmentTypeFlag) -> SimpleExpr {
        Expr::col(fragment::Column::FragmentTypeMask)
            .bit_and(Expr::value(flag as i32))
            .eq(0)
    }
}
130
/// Query-result row describing one streaming job joined with its object /
/// database metadata; produced by `list_streaming_job_infos`.
#[derive(Clone, Debug, FromQueryResult, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingJobInfo {
    pub job_id: ObjectId,
    /// Catalog object type of the job (table / source / sink / ...).
    pub obj_type: ObjectType,
    /// Display name, resolved from the matching table/source/sink row.
    pub name: String,
    pub job_status: JobStatus,
    pub parallelism: StreamingParallelism,
    pub max_parallelism: i32,
    /// Job-specific resource group, falling back to the database's.
    pub resource_group: String,
    pub database_id: DatabaseId,
    pub schema_id: SchemaId,
}
144
145impl CatalogControllerInner {
146 pub async fn all_running_fragment_mappings(
148 &self,
149 ) -> MetaResult<impl Iterator<Item = FragmentWorkerSlotMapping> + '_> {
150 let txn = self.db.begin().await?;
151
152 let job_ids: Vec<ObjectId> = StreamingJob::find()
153 .select_only()
154 .column(streaming_job::Column::JobId)
155 .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
156 .into_tuple()
157 .all(&txn)
158 .await?;
159
160 let mut result = vec![];
161 for job_id in job_ids {
162 let mappings = get_fragment_mappings(&txn, job_id).await?;
163
164 result.extend(mappings.into_iter());
165 }
166
167 Ok(result.into_iter())
168 }
169}
170
171impl NotificationManager {
172 pub(crate) fn notify_fragment_mapping(
173 &self,
174 operation: NotificationOperation,
175 fragment_mappings: Vec<PbFragmentWorkerSlotMapping>,
176 ) {
177 let fragment_ids = fragment_mappings
178 .iter()
179 .map(|mapping| mapping.fragment_id)
180 .collect_vec();
181 if fragment_ids.is_empty() {
182 return;
183 }
184 for fragment_mapping in fragment_mappings {
186 self.notify_frontend_without_version(
187 operation,
188 NotificationInfo::StreamingWorkerSlotMapping(fragment_mapping),
189 );
190 }
191
192 match operation {
194 NotificationOperation::Add | NotificationOperation::Update => {
195 self.notify_local_subscribers(LocalNotification::FragmentMappingsUpsert(
196 fragment_ids,
197 ));
198 }
199 NotificationOperation::Delete => {
200 self.notify_local_subscribers(LocalNotification::FragmentMappingsDelete(
201 fragment_ids,
202 ));
203 }
204 op => {
205 tracing::warn!("unexpected fragment mapping op: {}", op.as_str_name());
206 }
207 }
208 }
209}
210
211impl CatalogController {
212 pub fn extract_fragment_and_actors_from_fragments(
213 job_id: ObjectId,
214 fragments: impl Iterator<Item = &Fragment>,
215 actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
216 actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
217 ) -> MetaResult<Vec<(fragment::Model, Vec<actor::Model>)>> {
218 fragments
219 .map(|fragment| {
220 Self::extract_fragment_and_actors_for_new_job(
221 job_id,
222 fragment,
223 actor_status,
224 actor_splits,
225 )
226 })
227 .try_collect()
228 }
229
    /// Convert a single in-memory [`Fragment`] of a newly created job into DB
    /// models: the `fragment::Model` plus one `actor::Model` per actor.
    ///
    /// `actor_status` must contain an entry for every actor of the fragment,
    /// otherwise an error is returned; `actor_splits` entries are optional.
    pub fn extract_fragment_and_actors_for_new_job(
        job_id: ObjectId,
        fragment: &Fragment,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
    ) -> MetaResult<(fragment::Model, Vec<actor::Model>)> {
        let vnode_count = fragment.vnode_count();
        let Fragment {
            fragment_id: pb_fragment_id,
            fragment_type_mask: pb_fragment_type_mask,
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            nodes,
            ..
        } = fragment;

        let state_table_ids = pb_state_table_ids.clone().into();

        // A fragment must be instantiated by at least one actor.
        assert!(!pb_actors.is_empty());

        // Clear the deprecated per-actor upstream list of every Merge node
        // before persisting the stream plan.
        let stream_node = {
            let mut stream_node = nodes.clone();
            visit_stream_node_mut(&mut stream_node, |body| {
                #[expect(deprecated)]
                if let NodeBody::Merge(m) = body {
                    m.upstream_actor_id = vec![];
                }
            });

            stream_node
        };

        let mut actors = vec![];

        for actor in pb_actors {
            let StreamActor {
                actor_id,
                fragment_id,
                vnode_bitmap,
                mview_definition: _,
                expr_context: pb_expr_context,
                ..
            } = actor;

            // Persist the actor's split assignment (if any) in protobuf form.
            let splits = actor_splits.get(actor_id).map(|splits| {
                ConnectorSplits::from(&PbConnectorSplits {
                    splits: splits.iter().map(ConnectorSplit::from).collect(),
                })
            });
            let status = actor_status.get(actor_id).cloned().ok_or_else(|| {
                anyhow::anyhow!(
                    "actor {} in fragment {} has no actor_status",
                    actor_id,
                    fragment_id
                )
            })?;

            let worker_id = status.worker_id() as _;

            let pb_expr_context = pb_expr_context
                .as_ref()
                .expect("no expression context found");

            #[expect(deprecated)]
            actors.push(actor::Model {
                actor_id: *actor_id as _,
                fragment_id: *fragment_id as _,
                status: status.get_state().unwrap().into(),
                splits,
                worker_id,
                // Deprecated column, left at its default for new rows.
                upstream_actor_ids: Default::default(),
                vnode_bitmap: vnode_bitmap
                    .as_ref()
                    .map(|bitmap| VnodeBitmap::from(&bitmap.to_protobuf())),
                expr_context: ExprContext::from(pb_expr_context),
            });
        }

        let stream_node = StreamNode::from(&stream_node);

        let distribution_type = PbFragmentDistributionType::try_from(*pb_distribution_type)
            .unwrap()
            .into();

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: *pb_fragment_id as _,
            job_id,
            fragment_type_mask: (*pb_fragment_type_mask).into(),
            distribution_type,
            stream_node,
            state_table_ids,
            // Deprecated column, left at its default for new rows.
            upstream_fragment_id: Default::default(),
            vnode_count: vnode_count as _,
        };

        Ok((fragment, actors))
    }
329
330 pub fn compose_table_fragments(
331 table_id: u32,
332 state: PbState,
333 ctx: Option<PbStreamContext>,
334 fragments: Vec<(fragment::Model, Vec<actor::Model>)>,
335 parallelism: StreamingParallelism,
336 max_parallelism: usize,
337 job_definition: Option<String>,
338 ) -> MetaResult<StreamJobFragments> {
339 let mut pb_fragments = BTreeMap::new();
340 let mut pb_actor_status = BTreeMap::new();
341
342 for (fragment, actors) in fragments {
343 let (fragment, fragment_actor_status, _) =
344 Self::compose_fragment(fragment, actors, job_definition.clone())?;
345
346 pb_fragments.insert(fragment.fragment_id, fragment);
347 pb_actor_status.extend(fragment_actor_status.into_iter());
348 }
349
350 let table_fragments = StreamJobFragments {
351 stream_job_id: table_id.into(),
352 state: state as _,
353 fragments: pb_fragments,
354 actor_status: pb_actor_status,
355 ctx: ctx
356 .as_ref()
357 .map(StreamContext::from_protobuf)
358 .unwrap_or_default(),
359 assigned_parallelism: match parallelism {
360 StreamingParallelism::Custom => TableParallelism::Custom,
361 StreamingParallelism::Adaptive => TableParallelism::Adaptive,
362 StreamingParallelism::Fixed(n) => TableParallelism::Fixed(n as _),
363 },
364 max_parallelism,
365 };
366
367 Ok(table_fragments)
368 }
369
    /// Convert a fragment's DB models back into the in-memory [`Fragment`]
    /// representation, together with per-actor status and split maps.
    ///
    /// `job_definition` becomes each actor's `mview_definition` (empty string
    /// when `None`). Errors if any actor row belongs to a different fragment.
    #[allow(clippy::type_complexity)]
    pub(crate) fn compose_fragment(
        fragment: fragment::Model,
        actors: Vec<actor::Model>,
        job_definition: Option<String>,
    ) -> MetaResult<(
        Fragment,
        HashMap<crate::model::ActorId, PbActorStatus>,
        HashMap<crate::model::ActorId, PbConnectorSplits>,
    )> {
        let fragment::Model {
            fragment_id,
            fragment_type_mask,
            distribution_type,
            stream_node,
            state_table_ids,
            vnode_count,
            ..
        } = fragment;

        let stream_node = stream_node.to_protobuf();
        // Sanity check: each upstream fragment id appears in exactly one Merge node.
        let mut upstream_fragments = HashSet::new();
        visit_stream_node_body(&stream_node, |body| {
            if let NodeBody::Merge(m) = body {
                assert!(
                    upstream_fragments.insert(m.upstream_fragment_id),
                    "non-duplicate upstream fragment"
                );
            }
        });

        let mut pb_actors = vec![];

        let mut pb_actor_status = HashMap::new();
        let mut pb_actor_splits = HashMap::new();

        for actor in actors {
            if actor.fragment_id != fragment_id {
                bail!(
                    "fragment id {} from actor {} is different from fragment {}",
                    actor.fragment_id,
                    actor.actor_id,
                    fragment_id
                )
            }

            let actor::Model {
                actor_id,
                fragment_id,
                status,
                worker_id,
                splits,
                vnode_bitmap,
                expr_context,
                ..
            } = actor;

            let vnode_bitmap =
                vnode_bitmap.map(|vnode_bitmap| Bitmap::from(vnode_bitmap.to_protobuf()));
            let pb_expr_context = Some(expr_context.to_protobuf());

            pb_actor_status.insert(
                actor_id as _,
                PbActorStatus {
                    location: PbActorLocation::from_worker(worker_id as u32),
                    state: PbActorState::from(status) as _,
                },
            );

            // Only actors holding splits contribute to the splits map.
            if let Some(splits) = splits {
                pb_actor_splits.insert(actor_id as _, splits.to_protobuf());
            }

            pb_actors.push(StreamActor {
                actor_id: actor_id as _,
                fragment_id: fragment_id as _,
                vnode_bitmap,
                mview_definition: job_definition.clone().unwrap_or("".to_owned()),
                expr_context: pb_expr_context,
            })
        }

        let pb_state_table_ids = state_table_ids.into_u32_array();
        let pb_distribution_type = PbFragmentDistributionType::from(distribution_type) as _;
        let pb_fragment = Fragment {
            fragment_id: fragment_id as _,
            fragment_type_mask: fragment_type_mask.into(),
            distribution_type: pb_distribution_type,
            actors: pb_actors,
            state_table_ids: pb_state_table_ids,
            maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(),
            nodes: stream_node,
        };

        Ok((pb_fragment, pb_actor_status, pb_actor_splits))
    }
466
    /// Compute, for each fragment (optionally restricted to `id_filter`), its
    /// current actor count plus distribution type and vnode count.
    ///
    /// Built as `SELECT ... FROM (per-fragment actor counts) JOIN fragment`.
    pub async fn running_fragment_parallelisms(
        &self,
        id_filter: Option<HashSet<FragmentId>>,
    ) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
        let inner = self.inner.read().await;

        let query_alias = Alias::new("fragment_actor_count");
        let count_alias = Alias::new("count");

        // Inner query: COUNT(actor_id) per fragment_id.
        let mut query = SelectStatement::new()
            .column(actor::Column::FragmentId)
            .expr_as(actor::Column::ActorId.count(), count_alias.clone())
            .from(Actor)
            .group_by_col(actor::Column::FragmentId)
            .to_owned();

        if let Some(id_filter) = id_filter {
            // Filter on the grouped fragment_id via HAVING (post-GROUP BY).
            query.cond_having(actor::Column::FragmentId.is_in(id_filter));
        }

        // Outer query: join the counts back to the fragment table to pick up
        // distribution type and vnode count.
        let outer = SelectStatement::new()
            .column((FragmentModel, fragment::Column::FragmentId))
            .column(count_alias)
            .column(fragment::Column::DistributionType)
            .column(fragment::Column::VnodeCount)
            .from_subquery(query.to_owned(), query_alias.clone())
            .inner_join(
                FragmentModel,
                Expr::col((query_alias, actor::Column::FragmentId))
                    .equals((FragmentModel, fragment::Column::FragmentId)),
            )
            .to_owned();

        let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
            Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
                outer.to_owned(),
            )
            .all(&inner.db)
            .await?;

        Ok(fragment_parallelisms
            .into_iter()
            .map(|(fragment_id, count, distribution_type, vnode_count)| {
                (
                    fragment_id,
                    FragmentParallelismInfo {
                        distribution_type: distribution_type.into(),
                        actor_count: count as usize,
                        vnode_count: vnode_count as usize,
                    },
                )
            })
            .collect())
    }
521
522 pub async fn fragment_job_mapping(&self) -> MetaResult<HashMap<FragmentId, ObjectId>> {
523 let inner = self.inner.read().await;
524 let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
525 .select_only()
526 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
527 .into_tuple()
528 .all(&inner.db)
529 .await?;
530 Ok(fragment_jobs.into_iter().collect())
531 }
532
533 pub async fn get_fragment_job_id(
534 &self,
535 fragment_ids: Vec<FragmentId>,
536 ) -> MetaResult<Vec<ObjectId>> {
537 let inner = self.inner.read().await;
538
539 let object_ids: Vec<ObjectId> = FragmentModel::find()
540 .select_only()
541 .column(fragment::Column::JobId)
542 .filter(fragment::Column::FragmentId.is_in(fragment_ids))
543 .into_tuple()
544 .all(&inner.db)
545 .await?;
546
547 Ok(object_ids)
548 }
549
    /// Fetch the descriptor of one fragment (actor count exposed as
    /// `parallelism`) plus the ids of its direct upstream fragments.
    ///
    /// Returns `Ok(None)` if the fragment does not exist.
    pub async fn get_fragment_desc_by_id(
        &self,
        fragment_id: FragmentId,
    ) -> MetaResult<Option<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;

        // LEFT JOIN on actors so a fragment without actors still yields a row.
        let fragment_opt = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .filter(fragment::Column::FragmentId.eq(fragment_id))
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .one(&inner.db)
            .await?;

        let Some(fragment) = fragment_opt else {
            return Ok(None);
        };

        // Upstreams = source sides of relations targeting this fragment.
        let upstreams: Vec<FragmentId> = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::SourceFragmentId)
            .filter(fragment_relation::Column::TargetFragmentId.eq(fragment_id))
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(Some((fragment, upstreams)))
    }
591
592 pub async fn list_fragment_database_ids(
593 &self,
594 select_fragment_ids: Option<Vec<FragmentId>>,
595 ) -> MetaResult<Vec<(FragmentId, DatabaseId)>> {
596 let inner = self.inner.read().await;
597 let select = FragmentModel::find()
598 .select_only()
599 .column(fragment::Column::FragmentId)
600 .column(object::Column::DatabaseId)
601 .join(JoinType::InnerJoin, fragment::Relation::Object.def());
602 let select = if let Some(select_fragment_ids) = select_fragment_ids {
603 select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids))
604 } else {
605 select
606 };
607 Ok(select.into_tuple().all(&inner.db).await?)
608 }
609
    /// Load all fragments (with their actors) of one streaming job and compose
    /// them into a [`StreamJobFragments`].
    ///
    /// Errors if the job does not exist.
    pub async fn get_job_fragments_by_id(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<StreamJobFragments> {
        let inner = self.inner.read().await;
        let fragment_actors = FragmentModel::find()
            .find_with_related(Actor)
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let job_info = StreamingJob::find_by_id(job_id)
            .one(&inner.db)
            .await?
            .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;

        // The job's SQL definition is resolved separately and may be absent.
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        Self::compose_table_fragments(
            job_id as _,
            job_info.job_status.into(),
            job_info.timezone.map(|tz| PbStreamContext { timezone: tz }),
            fragment_actors,
            job_info.parallelism.clone(),
            job_info.max_parallelism as _,
            job_definition,
        )
    }
640
641 pub async fn get_fragment_actor_dispatchers(
642 &self,
643 fragment_ids: Vec<FragmentId>,
644 ) -> MetaResult<FragmentActorDispatchers> {
645 let inner = self.inner.read().await;
646 get_fragment_actor_dispatchers(&inner.db, fragment_ids).await
647 }
648
    /// Stream all `FragmentRelation` rows whose source is one of `fragment_ids`
    /// and group them into a source-fragment → downstream-relations map.
    pub async fn get_fragment_downstream_relations(
        &self,
        fragment_ids: Vec<FragmentId>,
    ) -> MetaResult<FragmentDownstreamRelation> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .filter(fragment_relation::Column::SourceFragmentId.is_in(fragment_ids))
            .stream(&inner.db)
            .await?;
        let mut relations = FragmentDownstreamRelation::new();
        while let Some(relation) = stream.try_next().await? {
            relations
                .entry(relation.source_fragment_id as _)
                .or_default()
                .push(DownstreamFragmentRelation {
                    downstream_fragment_id: relation.target_fragment_id as _,
                    dispatcher_type: relation.dispatcher_type,
                    dist_key_indices: relation.dist_key_indices.into_u32_array(),
                    output_mapping: PbDispatchOutputMapping {
                        indices: relation.output_indices.into_u32_array(),
                        // A missing type mapping is treated as the default.
                        types: relation
                            .output_type_mapping
                            .unwrap_or_default()
                            .to_protobuf(),
                    },
                });
        }
        Ok(relations)
    }
678
    /// For each fragment of the job whose plan contains a `StreamScan` node
    /// with a specified scan type, report that backfill scan type.
    pub async fn get_job_fragment_backfill_scan_type(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<HashMap<crate::model::FragmentId, PbStreamScanType>> {
        let inner = self.inner.read().await;
        let fragments: Vec<_> = FragmentModel::find()
            .filter(fragment::Column::JobId.eq(job_id))
            .all(&inner.db)
            .await?;

        let mut result = HashMap::new();

        for fragment::Model {
            fragment_id,
            stream_node,
            ..
        } in fragments
        {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |body| {
                if let NodeBody::StreamScan(node) = body {
                    match node.stream_scan_type() {
                        // Unspecified scan types are skipped, not reported.
                        StreamScanType::Unspecified => {}
                        scan_type => {
                            result.insert(fragment_id as crate::model::FragmentId, scan_type);
                        }
                    }
                }
            });
        }

        Ok(result)
    }
712
    /// List one [`StreamingJobInfo`] per streaming job, joining the job with
    /// its object / database rows and with the table / source / sink catalog
    /// rows to resolve the display name and the effective resource group.
    pub async fn list_streaming_job_infos(&self) -> MetaResult<Vec<StreamingJobInfo>> {
        let inner = self.inner.read().await;
        let job_states = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .join(JoinType::InnerJoin, streaming_job::Relation::Object.def())
            .join(JoinType::InnerJoin, object::Relation::Database2.def())
            .column(object::Column::ObjType)
            // Left-join all three candidate catalog tables; at most one matches
            // a given job's object.
            .join(JoinType::LeftJoin, table::Relation::Object1.def().rev())
            .join(JoinType::LeftJoin, source::Relation::Object.def().rev())
            .join(JoinType::LeftJoin, sink::Relation::Object.def().rev())
            // name = COALESCE(table.name, source.name, sink.name, '<unknown>')
            .column_as(
                Expr::if_null(
                    Expr::col((table::Entity, table::Column::Name)),
                    Expr::if_null(
                        Expr::col((source::Entity, source::Column::Name)),
                        Expr::if_null(
                            Expr::col((sink::Entity, sink::Column::Name)),
                            Expr::val("<unknown>"),
                        ),
                    ),
                ),
                "name",
            )
            .columns([
                streaming_job::Column::JobStatus,
                streaming_job::Column::Parallelism,
                streaming_job::Column::MaxParallelism,
            ])
            // resource_group = COALESCE(job-specific group, database's group)
            .column_as(
                Expr::if_null(
                    Expr::col((
                        streaming_job::Entity,
                        streaming_job::Column::SpecificResourceGroup,
                    )),
                    Expr::col((database::Entity, database::Column::ResourceGroup)),
                ),
                "resource_group",
            )
            .column(object::Column::DatabaseId)
            .column(object::Column::SchemaId)
            .into_model()
            .all(&inner.db)
            .await?;
        Ok(job_states)
    }
759
760 pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult<usize> {
761 let inner = self.inner.read().await;
762 let max_parallelism: i32 = StreamingJob::find_by_id(job_id)
763 .select_only()
764 .column(streaming_job::Column::MaxParallelism)
765 .into_tuple()
766 .one(&inner.db)
767 .await?
768 .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?;
769 Ok(max_parallelism as usize)
770 }
771
772 pub async fn get_job_actor_mapping(
774 &self,
775 job_ids: Vec<ObjectId>,
776 ) -> MetaResult<HashMap<ObjectId, Vec<ActorId>>> {
777 let inner = self.inner.read().await;
778 let job_actors: Vec<(ObjectId, ActorId)> = Actor::find()
779 .select_only()
780 .column(fragment::Column::JobId)
781 .column(actor::Column::ActorId)
782 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
783 .filter(fragment::Column::JobId.is_in(job_ids))
784 .into_tuple()
785 .all(&inner.db)
786 .await?;
787 Ok(job_actors.into_iter().into_group_map())
788 }
789
790 pub async fn get_job_internal_table_ids(&self) -> Option<Vec<(ObjectId, Vec<TableId>)>> {
792 if let Ok(inner) = self.inner.try_read()
793 && let Ok(job_state_tables) = FragmentModel::find()
794 .select_only()
795 .columns([fragment::Column::JobId, fragment::Column::StateTableIds])
796 .into_tuple::<(ObjectId, I32Array)>()
797 .all(&inner.db)
798 .await
799 {
800 let mut job_internal_table_ids = HashMap::new();
801 for (job_id, state_table_ids) in job_state_tables {
802 job_internal_table_ids
803 .entry(job_id)
804 .or_insert_with(Vec::new)
805 .extend(state_table_ids.into_inner());
806 }
807 return Some(job_internal_table_ids.into_iter().collect());
808 }
809 None
810 }
811
812 pub async fn has_any_running_jobs(&self) -> MetaResult<bool> {
813 let inner = self.inner.read().await;
814 let count = FragmentModel::find().count(&inner.db).await?;
815 Ok(count > 0)
816 }
817
818 pub async fn worker_actor_count(&self) -> MetaResult<HashMap<WorkerId, usize>> {
819 let inner = self.inner.read().await;
820 let actor_cnt: Vec<(WorkerId, i64)> = Actor::find()
821 .select_only()
822 .column(actor::Column::WorkerId)
823 .column_as(actor::Column::ActorId.count(), "count")
824 .group_by(actor::Column::WorkerId)
825 .into_tuple()
826 .all(&inner.db)
827 .await?;
828
829 Ok(actor_cnt
830 .into_iter()
831 .map(|(worker_id, count)| (worker_id, count as usize))
832 .collect())
833 }
834
    /// Load every streaming job together with all of its fragments/actors and
    /// compose them into `StreamJobFragments`, keyed by job id.
    pub async fn table_fragments(&self) -> MetaResult<BTreeMap<ObjectId, StreamJobFragments>> {
        let inner = self.inner.read().await;
        let jobs = StreamingJob::find().all(&inner.db).await?;

        // Resolve all job definitions up-front; entries are removed as consumed.
        let mut job_definition = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(jobs.iter().map(|job| job.job_id)),
        )
        .await?;

        let mut table_fragments = BTreeMap::new();
        for job in jobs {
            // One fragments-with-actors query per job.
            let fragment_actors = FragmentModel::find()
                .find_with_related(Actor)
                .filter(fragment::Column::JobId.eq(job.job_id))
                .all(&inner.db)
                .await?;

            table_fragments.insert(
                job.job_id as ObjectId,
                Self::compose_table_fragments(
                    job.job_id as _,
                    job.job_status.into(),
                    job.timezone.map(|tz| PbStreamContext { timezone: tz }),
                    fragment_actors,
                    job.parallelism.clone(),
                    job.max_parallelism as _,
                    job_definition.remove(&job.job_id),
                )?,
            );
        }

        Ok(table_fragments)
    }
870
871 pub async fn cdc_table_backfill_actor_ids(&self) -> MetaResult<HashMap<u32, HashSet<u32>>> {
873 let inner = self.inner.read().await;
874 let mut job_id_actor_ids = HashMap::default();
875 let stream_cdc_scan_flag = FragmentTypeFlag::StreamCdcScan as i32;
876 let fragment_type_mask = stream_cdc_scan_flag;
877 let fragment_actors: Vec<(fragment::Model, Vec<actor::Model>)> = FragmentModel::find()
878 .find_with_related(Actor)
879 .filter(fragment::Column::FragmentTypeMask.eq(fragment_type_mask))
880 .all(&inner.db)
881 .await?;
882 for (fragment, actors) in fragment_actors {
883 let e: &mut HashSet<u32> = job_id_actor_ids.entry(fragment.job_id as _).or_default();
884 e.extend(actors.iter().map(|a| a.actor_id as u32));
885 }
886 Ok(job_id_actor_ids)
887 }
888
    /// For each given fragment id, collect the ids of its direct upstream
    /// fragments (the source sides of relations targeting it).
    pub async fn upstream_fragments(
        &self,
        fragment_ids: impl Iterator<Item = crate::model::FragmentId>,
    ) -> MetaResult<HashMap<crate::model::FragmentId, HashSet<crate::model::FragmentId>>> {
        let inner = self.inner.read().await;
        let mut stream = FragmentRelation::find()
            .select_only()
            .columns([
                fragment_relation::Column::SourceFragmentId,
                fragment_relation::Column::TargetFragmentId,
            ])
            .filter(
                fragment_relation::Column::TargetFragmentId
                    .is_in(fragment_ids.map(|id| id as FragmentId)),
            )
            .into_tuple::<(FragmentId, FragmentId)>()
            .stream(&inner.db)
            .await?;
        let mut upstream_fragments: HashMap<_, HashSet<_>> = HashMap::new();
        // Tuple order follows the selected columns: (source, target).
        while let Some((upstream_fragment_id, downstream_fragment_id)) = stream.try_next().await? {
            upstream_fragments
                .entry(downstream_fragment_id as crate::model::FragmentId)
                .or_default()
                .insert(upstream_fragment_id as crate::model::FragmentId);
        }
        Ok(upstream_fragments)
    }
916
917 pub async fn list_actor_locations(&self) -> MetaResult<Vec<PartialActorLocation>> {
918 let inner = self.inner.read().await;
919 let actor_locations: Vec<PartialActorLocation> =
920 Actor::find().into_partial_model().all(&inner.db).await?;
921 Ok(actor_locations)
922 }
923
924 pub async fn list_actor_info(
925 &self,
926 ) -> MetaResult<Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)>> {
927 let inner = self.inner.read().await;
928 let actor_locations: Vec<(ActorId, FragmentId, ObjectId, SchemaId, ObjectType)> =
929 Actor::find()
930 .join(JoinType::LeftJoin, actor::Relation::Fragment.def())
931 .join(JoinType::LeftJoin, fragment::Relation::Object.def())
932 .select_only()
933 .columns([actor::Column::ActorId, actor::Column::FragmentId])
934 .column_as(object::Column::Oid, "job_id")
935 .column_as(object::Column::SchemaId, "schema_id")
936 .column_as(object::Column::ObjType, "type")
937 .into_tuple()
938 .all(&inner.db)
939 .await?;
940 Ok(actor_locations)
941 }
942
943 pub async fn list_source_actors(&self) -> MetaResult<Vec<(ActorId, FragmentId)>> {
944 let inner = self.inner.read().await;
945
946 let source_actors: Vec<(ActorId, FragmentId)> = Actor::find()
947 .select_only()
948 .filter(actor::Column::Splits.is_not_null())
949 .columns([actor::Column::ActorId, actor::Column::FragmentId])
950 .into_tuple()
951 .all(&inner.db)
952 .await?;
953
954 Ok(source_actors)
955 }
956
    /// List descriptors (plus upstream fragment ids) of all fragments that
    /// belong to jobs still being created (`Initial` or `Creating` status).
    pub async fn list_creating_fragment_descs(
        &self,
    ) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        // `parallelism` is the per-fragment actor count; LEFT JOINs keep
        // fragments even when related rows are missing.
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .join(JoinType::LeftJoin, fragment::Relation::Object.def())
            .join(JoinType::LeftJoin, object::Relation::StreamingJob.def())
            .filter(
                streaming_job::Column::JobStatus
                    .eq(JobStatus::Initial)
                    .or(streaming_job::Column::JobStatus.eq(JobStatus::Creating)),
            )
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        // One additional query per fragment to fetch its upstream fragment ids.
        for fragment in fragments {
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }
998
    /// List descriptors (plus upstream fragment ids) of every fragment in the
    /// catalog, regardless of job status.
    pub async fn list_fragment_descs(&self) -> MetaResult<Vec<(FragmentDesc, Vec<FragmentId>)>> {
        let inner = self.inner.read().await;
        let mut result = Vec::new();
        // `parallelism` is the per-fragment actor count; LEFT JOIN keeps
        // fragments that currently have no actors.
        let fragments = FragmentModel::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::DistributionType,
                fragment::Column::StateTableIds,
                fragment::Column::VnodeCount,
                fragment::Column::StreamNode,
            ])
            .column_as(Expr::col(actor::Column::ActorId).count(), "parallelism")
            .join(JoinType::LeftJoin, fragment::Relation::Actor.def())
            .group_by(fragment::Column::FragmentId)
            .into_model::<FragmentDesc>()
            .all(&inner.db)
            .await?;
        // One additional query per fragment to fetch its upstream fragment ids.
        for fragment in fragments {
            let upstreams: Vec<FragmentId> = FragmentRelation::find()
                .select_only()
                .column(fragment_relation::Column::SourceFragmentId)
                .filter(fragment_relation::Column::TargetFragmentId.eq(fragment.fragment_id))
                .into_tuple()
                .all(&inner.db)
                .await?;
            result.push((fragment, upstreams));
        }
        Ok(result)
    }
1031
1032 pub async fn list_sink_actor_mapping(
1033 &self,
1034 ) -> MetaResult<HashMap<SinkId, (String, Vec<ActorId>)>> {
1035 let inner = self.inner.read().await;
1036 let sink_id_names: Vec<(SinkId, String)> = Sink::find()
1037 .select_only()
1038 .columns([sink::Column::SinkId, sink::Column::Name])
1039 .into_tuple()
1040 .all(&inner.db)
1041 .await?;
1042 let (sink_ids, _): (Vec<_>, Vec<_>) = sink_id_names.iter().cloned().unzip();
1043 let sink_name_mapping: HashMap<SinkId, String> = sink_id_names.into_iter().collect();
1044
1045 let actor_with_type: Vec<(ActorId, SinkId)> = Actor::find()
1046 .select_only()
1047 .column(actor::Column::ActorId)
1048 .column(fragment::Column::JobId)
1049 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1050 .filter(
1051 fragment::Column::JobId
1052 .is_in(sink_ids)
1053 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
1054 )
1055 .into_tuple()
1056 .all(&inner.db)
1057 .await?;
1058
1059 let mut sink_actor_mapping = HashMap::new();
1060 for (actor_id, sink_id) in actor_with_type {
1061 sink_actor_mapping
1062 .entry(sink_id)
1063 .or_insert_with(|| (sink_name_mapping.get(&sink_id).unwrap().clone(), vec![]))
1064 .1
1065 .push(actor_id);
1066 }
1067
1068 Ok(sink_actor_mapping)
1069 }
1070
1071 pub async fn list_fragment_state_tables(&self) -> MetaResult<Vec<PartialFragmentStateTables>> {
1072 let inner = self.inner.read().await;
1073 let fragment_state_tables: Vec<PartialFragmentStateTables> = FragmentModel::find()
1074 .select_only()
1075 .columns([
1076 fragment::Column::FragmentId,
1077 fragment::Column::JobId,
1078 fragment::Column::StateTableIds,
1079 ])
1080 .into_partial_model()
1081 .all(&inner.db)
1082 .await?;
1083 Ok(fragment_state_tables)
1084 }
1085
    /// Load every `Running` actor (optionally restricted to one database) and
    /// assemble them into `InflightFragmentInfo`s, grouped as
    /// `database -> job -> fragment`.
    ///
    /// NOTE(review): the middle map is keyed by the fragment's owning object id
    /// (`object.oid`); the `TableId` key type presumably aliases that job id —
    /// confirm against the model type definitions.
    pub async fn load_all_actors(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let inner = self.inner.read().await;
        // Always require running actors; additionally pin the database if given.
        let filter_condition = actor::Column::Status.eq(ActorStatus::Running);
        let filter_condition = if let Some(database_id) = database_id {
            filter_condition.and(object::Column::DatabaseId.eq(database_id))
        } else {
            filter_condition
        };
        // Stream one row per actor (joined with its fragment and the fragment's
        // object) instead of materializing the whole result set.
        #[expect(clippy::type_complexity)]
        let mut actor_info_stream: BoxStream<
            '_,
            Result<
                (
                    ActorId,
                    WorkerId,
                    Option<VnodeBitmap>,
                    Option<ConnectorSplits>,
                    FragmentId,
                    StreamNode,
                    I32Array,
                    DistributionType,
                    DatabaseId,
                    ObjectId,
                ),
                _,
            >,
        > = Actor::find()
            .select_only()
            .column(actor::Column::ActorId)
            .column(actor::Column::WorkerId)
            .column(actor::Column::VnodeBitmap)
            .column(actor::Column::Splits)
            .column(fragment::Column::FragmentId)
            .column(fragment::Column::StreamNode)
            .column(fragment::Column::StateTableIds)
            .column(fragment::Column::DistributionType)
            .column(object::Column::DatabaseId)
            .column(object::Column::Oid)
            .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
            .join(JoinType::InnerJoin, fragment::Relation::Object.def())
            .filter(filter_condition)
            .into_tuple()
            .stream(&inner.db)
            .await?;

        let mut database_fragment_infos: HashMap<_, HashMap<_, HashMap<_, InflightFragmentInfo>>> =
            HashMap::new();

        while let Some((
            actor_id,
            worker_id,
            vnode_bitmap,
            splits,
            fragment_id,
            node,
            state_table_ids,
            distribution_type,
            database_id,
            job_id,
        )) = actor_info_stream.try_next().await?
        {
            let fragment_infos = database_fragment_infos
                .entry(database_id)
                .or_default()
                .entry(job_id)
                .or_default();
            let state_table_ids = state_table_ids.into_inner();
            let state_table_ids = state_table_ids
                .into_iter()
                .map(|table_id| risingwave_common::catalog::TableId::new(table_id as _))
                .collect();

            let actor_info = InflightActorInfo {
                worker_id,
                vnode_bitmap: vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
                splits: splits
                    .map(|connector_splits| {
                        connector_splits
                            .to_protobuf()
                            .splits
                            .iter()
                            .map(|connector_split| SplitImpl::try_from(connector_split).unwrap())
                            .collect_vec()
                    })
                    .unwrap_or_default(),
            };
            match fragment_infos.entry(fragment_id) {
                Entry::Occupied(mut entry) => {
                    // Fragment already seen via another actor: sanity-check the
                    // fragment-level data agrees, then register this actor.
                    let info: &mut InflightFragmentInfo = entry.get_mut();
                    assert_eq!(info.state_table_ids, state_table_ids);
                    assert!(info.actors.insert(actor_id as _, actor_info).is_none());
                }
                Entry::Vacant(entry) => {
                    // First actor of this fragment: build the fragment info.
                    entry.insert(InflightFragmentInfo {
                        fragment_id: fragment_id as _,
                        distribution_type,
                        nodes: node.to_protobuf(),
                        actors: HashMap::from_iter([(actor_id as _, actor_info)]),
                        state_table_ids,
                    });
                }
            }
        }

        debug!(?database_fragment_infos, "reload all actors");

        Ok(database_fragment_infos)
    }
1200
1201 pub async fn migrate_actors(
1202 &self,
1203 plan: HashMap<WorkerSlotId, WorkerSlotId>,
1204 ) -> MetaResult<()> {
1205 let inner = self.inner.read().await;
1206 let txn = inner.db.begin().await?;
1207
1208 let actors: Vec<(
1209 FragmentId,
1210 DistributionType,
1211 ActorId,
1212 Option<VnodeBitmap>,
1213 WorkerId,
1214 ActorStatus,
1215 )> = Actor::find()
1216 .select_only()
1217 .columns([
1218 fragment::Column::FragmentId,
1219 fragment::Column::DistributionType,
1220 ])
1221 .columns([
1222 actor::Column::ActorId,
1223 actor::Column::VnodeBitmap,
1224 actor::Column::WorkerId,
1225 actor::Column::Status,
1226 ])
1227 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1228 .into_tuple()
1229 .all(&txn)
1230 .await?;
1231
1232 let mut actor_locations = HashMap::new();
1233
1234 for (fragment_id, _, actor_id, _, worker_id, status) in &actors {
1235 if *status != ActorStatus::Running {
1236 tracing::warn!(
1237 "skipping actor {} in fragment {} with status {:?}",
1238 actor_id,
1239 fragment_id,
1240 status
1241 );
1242 continue;
1243 }
1244
1245 actor_locations
1246 .entry(*worker_id)
1247 .or_insert(HashMap::new())
1248 .entry(*fragment_id)
1249 .or_insert(BTreeSet::new())
1250 .insert(*actor_id);
1251 }
1252
1253 let expired_or_changed_workers: HashSet<_> =
1254 plan.keys().map(|k| k.worker_id() as WorkerId).collect();
1255
1256 let mut actor_migration_plan = HashMap::new();
1257 for (worker, fragment) in actor_locations {
1258 if expired_or_changed_workers.contains(&worker) {
1259 for (fragment_id, actors) in fragment {
1260 debug!(
1261 "worker {} expired or changed, migrating fragment {}",
1262 worker, fragment_id
1263 );
1264 let worker_slot_to_actor: HashMap<_, _> = actors
1265 .iter()
1266 .enumerate()
1267 .map(|(idx, actor_id)| {
1268 (WorkerSlotId::new(worker as _, idx as _), *actor_id)
1269 })
1270 .collect();
1271
1272 for (worker_slot, actor) in worker_slot_to_actor {
1273 if let Some(target) = plan.get(&worker_slot) {
1274 actor_migration_plan.insert(actor, target.worker_id() as WorkerId);
1275 }
1276 }
1277 }
1278 }
1279 }
1280
1281 for (actor, worker) in actor_migration_plan {
1282 Actor::update_many()
1283 .col_expr(
1284 actor::Column::WorkerId,
1285 Expr::value(Value::Int(Some(worker))),
1286 )
1287 .filter(actor::Column::ActorId.eq(actor))
1288 .exec(&txn)
1289 .await?;
1290 }
1291
1292 txn.commit().await?;
1293
1294 Ok(())
1295 }
1296
1297 pub async fn all_inuse_worker_slots(&self) -> MetaResult<HashSet<WorkerSlotId>> {
1298 let inner = self.inner.read().await;
1299
1300 let actors: Vec<(FragmentId, ActorId, WorkerId)> = Actor::find()
1301 .select_only()
1302 .columns([fragment::Column::FragmentId])
1303 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1304 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1305 .into_tuple()
1306 .all(&inner.db)
1307 .await?;
1308
1309 let mut actor_locations = HashMap::new();
1310
1311 for (fragment_id, _, worker_id) in actors {
1312 *actor_locations
1313 .entry(worker_id)
1314 .or_insert(HashMap::new())
1315 .entry(fragment_id)
1316 .or_insert(0_usize) += 1;
1317 }
1318
1319 let mut result = HashSet::new();
1320 for (worker_id, mapping) in actor_locations {
1321 let max_fragment_len = mapping.values().max().unwrap();
1322
1323 result
1324 .extend((0..*max_fragment_len).map(|idx| WorkerSlotId::new(worker_id as u32, idx)))
1325 }
1326
1327 Ok(result)
1328 }
1329
1330 pub async fn all_node_actors(
1331 &self,
1332 include_inactive: bool,
1333 ) -> MetaResult<HashMap<WorkerId, Vec<StreamActor>>> {
1334 let inner = self.inner.read().await;
1335 let fragment_actors = if include_inactive {
1336 FragmentModel::find()
1337 .find_with_related(Actor)
1338 .all(&inner.db)
1339 .await?
1340 } else {
1341 FragmentModel::find()
1342 .find_with_related(Actor)
1343 .filter(actor::Column::Status.eq(ActorStatus::Running))
1344 .all(&inner.db)
1345 .await?
1346 };
1347
1348 let job_definitions = resolve_streaming_job_definition(
1349 &inner.db,
1350 &HashSet::from_iter(fragment_actors.iter().map(|(fragment, _)| fragment.job_id)),
1351 )
1352 .await?;
1353
1354 let mut node_actors = HashMap::new();
1355 for (fragment, actors) in fragment_actors {
1356 let job_id = fragment.job_id;
1357 let (table_fragments, actor_status, _) = Self::compose_fragment(
1358 fragment,
1359 actors,
1360 job_definitions.get(&(job_id as _)).cloned(),
1361 )?;
1362 for actor in table_fragments.actors {
1363 let node_id = actor_status[&actor.actor_id].worker_id() as WorkerId;
1364 node_actors
1365 .entry(node_id)
1366 .or_insert_with(Vec::new)
1367 .push(actor);
1368 }
1369 }
1370
1371 Ok(node_actors)
1372 }
1373
1374 pub async fn get_worker_actor_ids(
1375 &self,
1376 job_ids: Vec<ObjectId>,
1377 ) -> MetaResult<BTreeMap<WorkerId, Vec<ActorId>>> {
1378 let inner = self.inner.read().await;
1379 let actor_workers: Vec<(ActorId, WorkerId)> = Actor::find()
1380 .select_only()
1381 .columns([actor::Column::ActorId, actor::Column::WorkerId])
1382 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1383 .filter(fragment::Column::JobId.is_in(job_ids))
1384 .into_tuple()
1385 .all(&inner.db)
1386 .await?;
1387
1388 let mut worker_actors = BTreeMap::new();
1389 for (actor_id, worker_id) in actor_workers {
1390 worker_actors
1391 .entry(worker_id)
1392 .or_insert_with(Vec::new)
1393 .push(actor_id);
1394 }
1395
1396 Ok(worker_actors)
1397 }
1398
1399 pub async fn update_actor_splits(&self, split_assignment: &SplitAssignment) -> MetaResult<()> {
1400 let inner = self.inner.read().await;
1401 let txn = inner.db.begin().await?;
1402 for assignments in split_assignment.values() {
1403 for (actor_id, splits) in assignments {
1404 let actor_splits = splits.iter().map(Into::into).collect_vec();
1405 Actor::update(actor::ActiveModel {
1406 actor_id: Set(*actor_id as _),
1407 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1408 splits: actor_splits,
1409 }))),
1410 ..Default::default()
1411 })
1412 .exec(&txn)
1413 .await
1414 .map_err(|err| {
1415 if err == DbErr::RecordNotUpdated {
1416 MetaError::catalog_id_not_found("actor_id", actor_id)
1417 } else {
1418 err.into()
1419 }
1420 })?;
1421 }
1422 }
1423 txn.commit().await?;
1424
1425 Ok(())
1426 }
1427
    /// Rewrite the stored stream node of each given fragment, filling in the
    /// snapshot-backfill epoch via `crate::stream::fill_snapshot_backfill_epoch`.
    ///
    /// Fragments whose stream node is left unchanged by the helper are not
    /// written back. All updates happen in one transaction.
    #[await_tree::instrument]
    pub async fn fill_snapshot_backfill_epoch(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
        snapshot_backfill_info: Option<&SnapshotBackfillInfo>,
        cross_db_snapshot_backfill_info: &SnapshotBackfillInfo,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        for fragment_id in fragment_ids {
            let fragment = FragmentModel::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .context(format!("fragment {} not found", fragment_id))?;
            let mut node = fragment.stream_node.to_protobuf();
            // The helper returns true iff `node` was actually modified.
            if crate::stream::fill_snapshot_backfill_epoch(
                &mut node,
                snapshot_backfill_info,
                cross_db_snapshot_backfill_info,
            )? {
                let node = StreamNode::from(&node);
                FragmentModel::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(node),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
1461
1462 pub async fn get_running_actors_of_fragment(
1464 &self,
1465 fragment_id: FragmentId,
1466 ) -> MetaResult<Vec<ActorId>> {
1467 let inner = self.inner.read().await;
1468 let actors: Vec<ActorId> = Actor::find()
1469 .select_only()
1470 .column(actor::Column::ActorId)
1471 .filter(actor::Column::FragmentId.eq(fragment_id))
1472 .filter(actor::Column::Status.eq(ActorStatus::Running))
1473 .into_tuple()
1474 .all(&inner.db)
1475 .await?;
1476 Ok(actors)
1477 }
1478
    /// Pair each source-backfill actor with the source actor it reads from, for
    /// the NoShuffle edge `source_fragment_id -> source_backfill_fragment_id`.
    ///
    /// Returns `(source_backfill_actor_id, source_actor_id)` pairs.
    ///
    /// # Errors
    /// Fails when no relation exists between the two fragments, or when the
    /// relation's dispatcher type is not `NoShuffle`.
    pub async fn get_running_actors_for_source_backfill(
        &self,
        source_backfill_fragment_id: FragmentId,
        source_fragment_id: FragmentId,
    ) -> MetaResult<Vec<(ActorId, ActorId)>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Dispatcher type of the edge between the two fragments.
        let fragment_relation: DispatcherType = FragmentRelation::find()
            .select_only()
            .column(fragment_relation::Column::DispatcherType)
            .filter(fragment_relation::Column::SourceFragmentId.eq(source_fragment_id))
            .filter(fragment_relation::Column::TargetFragmentId.eq(source_backfill_fragment_id))
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "no fragment connection from source fragment {} to source backfill fragment {}",
                    source_fragment_id,
                    source_backfill_fragment_id
                )
            })?;

        if fragment_relation != DispatcherType::NoShuffle {
            return Err(anyhow!("expect NoShuffle but get {:?}", fragment_relation).into());
        }

        // Helper: fetch a fragment's distribution type, erroring if it is gone.
        let load_fragment_distribution_type = |txn, fragment_id: FragmentId| async move {
            let result: MetaResult<DistributionType> = try {
                FragmentModel::find_by_id(fragment_id)
                    .select_only()
                    .column(fragment::Column::DistributionType)
                    .into_tuple()
                    .one(txn)
                    .await?
                    .ok_or_else(|| anyhow!("failed to find fragment: {}", fragment_id))?
            };
            result
        };

        let source_backfill_distribution_type =
            load_fragment_distribution_type(&txn, source_backfill_fragment_id).await?;
        let source_distribution_type =
            load_fragment_distribution_type(&txn, source_fragment_id).await?;

        // Helper: stream `(actor_id, vnode bitmap)` for all actors of a fragment.
        let load_fragment_actor_distribution = |txn, fragment_id: FragmentId| async move {
            Actor::find()
                .select_only()
                .column(actor::Column::ActorId)
                .column(actor::Column::VnodeBitmap)
                .filter(actor::Column::FragmentId.eq(fragment_id))
                .into_tuple()
                .stream(txn)
                .await?
                .map(|result| {
                    result.map(|(actor_id, vnode): (ActorId, Option<VnodeBitmap>)| {
                        (
                            actor_id as _,
                            vnode.map(|bitmap| Bitmap::from(bitmap.to_protobuf())),
                        )
                    })
                })
                .try_collect()
                .await
        };

        let source_backfill_actors: HashMap<crate::model::ActorId, Option<Bitmap>> =
            load_fragment_actor_distribution(&txn, source_backfill_fragment_id).await?;

        let source_actors = load_fragment_actor_distribution(&txn, source_fragment_id).await?;

        // Resolve the one-to-one NoShuffle pairing, then flip each pair into
        // `(backfill actor, source actor)` order for the caller.
        Ok(resolve_no_shuffle_actor_dispatcher(
            source_distribution_type,
            &source_actors,
            source_backfill_distribution_type,
            &source_backfill_actors,
        )
        .into_iter()
        .map(|(source_actor, source_backfill_actor)| {
            (source_backfill_actor as _, source_actor as _)
        })
        .collect())
    }
1565
1566 pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
1567 let inner = self.inner.read().await;
1568 let actors: Vec<ActorId> = Actor::find()
1569 .select_only()
1570 .column(actor::Column::ActorId)
1571 .join(JoinType::InnerJoin, actor::Relation::Fragment.def())
1572 .filter(fragment::Column::JobId.is_in(job_ids))
1573 .into_tuple()
1574 .all(&inner.db)
1575 .await?;
1576 Ok(actors)
1577 }
1578
    /// For each given job, find its "root" fragment, plus the worker placement
    /// of actors.
    ///
    /// Root selection: an `Mview`/`Sink` fragment always wins (plain `insert`
    /// overwrites); otherwise the first `Source` fragment seen is kept
    /// (`try_insert` never overwrites an existing entry).
    ///
    /// NOTE(review): the trailing actor query is unfiltered, so the returned
    /// `(actor, worker)` pairs cover ALL actors in the cluster, not only those
    /// of `job_ids` — confirm callers rely on that.
    pub async fn get_root_fragments(
        &self,
        job_ids: Vec<ObjectId>,
    ) -> MetaResult<(HashMap<ObjectId, Fragment>, Vec<(ActorId, WorkerId)>)> {
        let inner = self.inner.read().await;

        let job_definitions = resolve_streaming_job_definition(
            &inner.db,
            &HashSet::from_iter(job_ids.iter().copied()),
        )
        .await?;

        let all_fragments = FragmentModel::find()
            .filter(fragment::Column::JobId.is_in(job_ids))
            .all(&inner.db)
            .await?;
        let mut root_fragments = HashMap::<ObjectId, fragment::Model>::new();
        for fragment in all_fragments {
            let mask = FragmentTypeMask::from(fragment.fragment_type_mask);
            if mask.contains_any([FragmentTypeFlag::Mview, FragmentTypeFlag::Sink]) {
                // Mview/Sink takes priority as the root fragment.
                _ = root_fragments.insert(fragment.job_id, fragment);
            } else if mask.contains(FragmentTypeFlag::Source) {
                // Source is only a fallback; keep whatever is already there.
                _ = root_fragments.try_insert(fragment.job_id, fragment);
            }
        }

        let mut root_fragments_pb = HashMap::new();
        for (_, fragment) in root_fragments {
            let actors = fragment.find_related(Actor).all(&inner.db).await?;

            let job_id = fragment.job_id;
            root_fragments_pb.insert(
                fragment.job_id,
                Self::compose_fragment(
                    fragment,
                    actors,
                    job_definitions.get(&(job_id as _)).cloned(),
                )?
                .0,
            );
        }

        let actors: Vec<(ActorId, WorkerId)> = Actor::find()
            .select_only()
            .columns([actor::Column::ActorId, actor::Column::WorkerId])
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok((root_fragments_pb, actors))
    }
1645
1646 pub async fn get_root_fragment(
1647 &self,
1648 job_id: ObjectId,
1649 ) -> MetaResult<(Fragment, Vec<(ActorId, WorkerId)>)> {
1650 let (mut root_fragments, actors) = self.get_root_fragments(vec![job_id]).await?;
1651 let root_fragment = root_fragments
1652 .remove(&job_id)
1653 .context(format!("root fragment for job {} not found", job_id))?;
1654 Ok((root_fragment, actors))
1655 }
1656
    /// Fetch the fragments directly downstream of the given job's root fragment,
    /// each paired with the dispatcher type of its edge, plus the worker
    /// placement of actors (as returned by `get_root_fragment`).
    pub async fn get_downstream_fragments(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<(Vec<(PbDispatcherType, Fragment)>, Vec<(ActorId, WorkerId)>)> {
        let (root_fragment, actors) = self.get_root_fragment(job_id).await?;

        let inner = self.inner.read().await;
        // All relation edges whose source is the root fragment.
        let downstream_fragment_relations: Vec<fragment_relation::Model> = FragmentRelation::find()
            .filter(
                fragment_relation::Column::SourceFragmentId
                    .eq(root_fragment.fragment_id as FragmentId),
            )
            .all(&inner.db)
            .await?;
        let job_definition = resolve_streaming_job_definition(&inner.db, &HashSet::from([job_id]))
            .await?
            .remove(&job_id);

        let mut downstream_fragments = vec![];
        for fragment_relation::Model {
            target_fragment_id: fragment_id,
            dispatcher_type,
            ..
        } in downstream_fragment_relations
        {
            // `find_by_id` with related actors must yield exactly one row.
            let mut fragment_actors = FragmentModel::find_by_id(fragment_id)
                .find_with_related(Actor)
                .all(&inner.db)
                .await?;
            if fragment_actors.is_empty() {
                bail!("No fragment found for fragment id {}", fragment_id);
            }
            assert_eq!(fragment_actors.len(), 1);
            let (fragment, actors) = fragment_actors.pop().unwrap();
            let dispatch_type = PbDispatcherType::from(dispatcher_type);
            let fragment = Self::compose_fragment(fragment, actors, job_definition.clone())?.0;
            downstream_fragments.push((dispatch_type, fragment));
        }

        Ok((downstream_fragments, actors))
    }
1699
1700 pub async fn load_source_fragment_ids(
1701 &self,
1702 ) -> MetaResult<HashMap<SourceId, BTreeSet<FragmentId>>> {
1703 let inner = self.inner.read().await;
1704 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1705 .select_only()
1706 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1707 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::Source))
1708 .into_tuple()
1709 .all(&inner.db)
1710 .await?;
1711
1712 let mut source_fragment_ids = HashMap::new();
1713 for (fragment_id, stream_node) in fragments {
1714 if let Some(source_id) = stream_node.to_protobuf().find_stream_source() {
1715 source_fragment_ids
1716 .entry(source_id as SourceId)
1717 .or_insert_with(BTreeSet::new)
1718 .insert(fragment_id);
1719 }
1720 }
1721 Ok(source_fragment_ids)
1722 }
1723
1724 pub async fn load_backfill_fragment_ids(
1725 &self,
1726 ) -> MetaResult<HashMap<SourceId, BTreeSet<(FragmentId, u32)>>> {
1727 let inner = self.inner.read().await;
1728 let fragments: Vec<(FragmentId, StreamNode)> = FragmentModel::find()
1729 .select_only()
1730 .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1731 .filter(FragmentTypeMask::intersects(FragmentTypeFlag::SourceScan))
1732 .into_tuple()
1733 .all(&inner.db)
1734 .await?;
1735
1736 let mut source_fragment_ids = HashMap::new();
1737 for (fragment_id, stream_node) in fragments {
1738 if let Some((source_id, upstream_source_fragment_id)) =
1739 stream_node.to_protobuf().find_source_backfill()
1740 {
1741 source_fragment_ids
1742 .entry(source_id as SourceId)
1743 .or_insert_with(BTreeSet::new)
1744 .insert((fragment_id, upstream_source_fragment_id));
1745 }
1746 }
1747 Ok(source_fragment_ids)
1748 }
1749
1750 pub async fn load_actor_splits(&self) -> MetaResult<HashMap<ActorId, ConnectorSplits>> {
1751 let inner = self.inner.read().await;
1752 let splits: Vec<(ActorId, ConnectorSplits)> = Actor::find()
1753 .select_only()
1754 .columns([actor::Column::ActorId, actor::Column::Splits])
1755 .filter(actor::Column::Splits.is_not_null())
1756 .into_tuple()
1757 .all(&inner.db)
1758 .await?;
1759 Ok(splits.into_iter().collect())
1760 }
1761
    /// Actual parallelism (actor count) of the job's root fragment, i.e. the
    /// fragment flagged `Mview` or `Sink`.
    ///
    /// NOTE(review): `at_most_one().ok().flatten()` yields `None` both when the
    /// job has no such fragment and when it unexpectedly has several — the two
    /// cases are silently collapsed; confirm that is intended.
    pub async fn get_actual_job_fragment_parallelism(
        &self,
        job_id: ObjectId,
    ) -> MetaResult<Option<usize>> {
        let inner = self.inner.read().await;
        // One row per matching fragment: (fragment_id, COUNT(actor_id)).
        let fragments: Vec<(FragmentId, i64)> = FragmentModel::find()
            .join(JoinType::InnerJoin, fragment::Relation::Actor.def())
            .select_only()
            .columns([fragment::Column::FragmentId])
            .column_as(actor::Column::ActorId.count(), "count")
            .filter(
                fragment::Column::JobId
                    .eq(job_id)
                    .and(FragmentTypeMask::intersects_any([
                        FragmentTypeFlag::Mview,
                        FragmentTypeFlag::Sink,
                    ])),
            )
            .group_by(fragment::Column::FragmentId)
            .into_tuple()
            .all(&inner.db)
            .await?;

        Ok(fragments
            .into_iter()
            .at_most_one()
            .ok()
            .flatten()
            .map(|(_, count)| count as usize))
    }
1793
1794 pub async fn get_all_upstream_sink_infos(
1795 &self,
1796 target_table: &PbTable,
1797 target_fragment_id: FragmentId,
1798 ) -> MetaResult<Vec<UpstreamSinkInfo>> {
1799 let incoming_sinks = self.get_table_incoming_sinks(target_table.id as _).await?;
1800
1801 let inner = self.inner.read().await;
1802 let txn = inner.db.begin().await?;
1803
1804 let sink_ids = incoming_sinks.iter().map(|s| s.id as SinkId).collect_vec();
1805 let sink_fragment_ids = get_sink_fragment_by_ids(&txn, sink_ids).await?;
1806
1807 let mut upstream_sink_infos = Vec::with_capacity(incoming_sinks.len());
1808 for pb_sink in &incoming_sinks {
1809 let sink_fragment_id =
1810 sink_fragment_ids
1811 .get(&(pb_sink.id as _))
1812 .cloned()
1813 .ok_or(anyhow::anyhow!(
1814 "sink fragment not found for sink id {}",
1815 pb_sink.id
1816 ))?;
1817 let upstream_info = build_upstream_sink_info(
1818 pb_sink,
1819 sink_fragment_id,
1820 target_table,
1821 target_fragment_id,
1822 )?;
1823 upstream_sink_infos.push(upstream_info);
1824 }
1825
1826 Ok(upstream_sink_infos)
1827 }
1828
1829 pub async fn get_mview_fragment_by_id(&self, table_id: TableId) -> MetaResult<FragmentId> {
1830 let inner = self.inner.read().await;
1831 let txn = inner.db.begin().await?;
1832
1833 let mview_fragment: Vec<FragmentId> = FragmentModel::find()
1834 .select_only()
1835 .column(fragment::Column::FragmentId)
1836 .filter(
1837 fragment::Column::JobId
1838 .eq(table_id)
1839 .and(FragmentTypeMask::intersects(FragmentTypeFlag::Mview)),
1840 )
1841 .into_tuple()
1842 .all(&txn)
1843 .await?;
1844
1845 if mview_fragment.len() != 1 {
1846 return Err(anyhow::anyhow!(
1847 "expected exactly one mview fragment for table {}, found {}",
1848 table_id,
1849 mview_fragment.len()
1850 )
1851 .into());
1852 }
1853
1854 Ok(mview_fragment.into_iter().next().unwrap())
1855 }
1856
1857 pub async fn has_table_been_migrated(&self, table_id: TableId) -> MetaResult<bool> {
1858 let inner = self.inner.read().await;
1859 let txn = inner.db.begin().await?;
1860 has_table_been_migrated(&txn, table_id).await
1861 }
1862
1863 pub async fn update_source_splits(
1864 &self,
1865 source_splits: &HashMap<SourceId, Vec<SplitImpl>>,
1866 ) -> MetaResult<()> {
1867 let inner = self.inner.read().await;
1868 let txn = inner.db.begin().await?;
1869
1870 let models: Vec<source_splits::ActiveModel> = source_splits
1871 .iter()
1872 .map(|(source_id, splits)| source_splits::ActiveModel {
1873 source_id: Set(*source_id as _),
1874 splits: Set(Some(ConnectorSplits::from(&PbConnectorSplits {
1875 splits: splits.iter().map(Into::into).collect_vec(),
1876 }))),
1877 })
1878 .collect();
1879
1880 SourceSplits::insert_many(models)
1881 .on_conflict(
1882 OnConflict::column(source_splits::Column::SourceId)
1883 .update_column(source_splits::Column::Splits)
1884 .to_owned(),
1885 )
1886 .exec(&txn)
1887 .await?;
1888
1889 txn.commit().await?;
1890
1891 Ok(())
1892 }
1893}
1894
1895#[cfg(test)]
1896mod tests {
1897 use std::collections::{BTreeMap, HashMap, HashSet};
1898
1899 use itertools::Itertools;
1900 use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
1901 use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount};
1902 use risingwave_common::util::iter_util::ZipEqDebug;
1903 use risingwave_common::util::stream_graph_visitor::visit_stream_node_body;
1904 use risingwave_meta_model::actor::ActorStatus;
1905 use risingwave_meta_model::fragment::DistributionType;
1906 use risingwave_meta_model::{
1907 ActorId, ConnectorSplits, ExprContext, FragmentId, I32Array, ObjectId, StreamNode, TableId,
1908 VnodeBitmap, actor, fragment,
1909 };
1910 use risingwave_pb::common::PbActorLocation;
1911 use risingwave_pb::meta::table_fragments::PbActorStatus;
1912 use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
1913 use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType;
1914 use risingwave_pb::plan_common::PbExprContext;
1915 use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
1916 use risingwave_pb::stream_plan::stream_node::PbNodeBody;
1917 use risingwave_pb::stream_plan::{MergeNode, PbStreamNode, PbUnionNode};
1918
1919 use crate::MetaResult;
1920 use crate::controller::catalog::CatalogController;
1921 use crate::model::{Fragment, StreamActor};
1922
    /// Upstream actors of one actor, grouped by upstream fragment id.
    type ActorUpstreams = BTreeMap<crate::model::FragmentId, HashSet<crate::model::ActorId>>;

    /// Upstream map for every actor of a fragment, keyed by actor id.
    type FragmentActorUpstreams = HashMap<crate::model::ActorId, ActorUpstreams>;

    // Fixture ids shared by the tests below.
    const TEST_FRAGMENT_ID: FragmentId = 1;

    const TEST_UPSTREAM_FRAGMENT_ID: FragmentId = 2;

    const TEST_JOB_ID: ObjectId = 1;

    const TEST_STATE_TABLE_ID: TableId = 1000;
1934
1935 fn generate_upstream_actor_ids_for_actor(actor_id: u32) -> ActorUpstreams {
1936 let mut upstream_actor_ids = BTreeMap::new();
1937 upstream_actor_ids.insert(
1938 TEST_UPSTREAM_FRAGMENT_ID as crate::model::FragmentId,
1939 HashSet::from_iter([(actor_id + 100)]),
1940 );
1941 upstream_actor_ids.insert(
1942 (TEST_UPSTREAM_FRAGMENT_ID + 1) as _,
1943 HashSet::from_iter([(actor_id + 200)]),
1944 );
1945 upstream_actor_ids
1946 }
1947
1948 fn generate_merger_stream_node(actor_upstream_actor_ids: &ActorUpstreams) -> PbStreamNode {
1949 let mut input = vec![];
1950 for upstream_fragment_id in actor_upstream_actor_ids.keys() {
1951 input.push(PbStreamNode {
1952 node_body: Some(PbNodeBody::Merge(Box::new(MergeNode {
1953 upstream_fragment_id: *upstream_fragment_id as _,
1954 ..Default::default()
1955 }))),
1956 ..Default::default()
1957 });
1958 }
1959
1960 PbStreamNode {
1961 input,
1962 node_body: Some(PbNodeBody::Union(PbUnionNode {})),
1963 ..Default::default()
1964 }
1965 }
1966
    /// Exercises `extract_fragment_and_actors_for_new_job`: builds a 3-actor
    /// hash-distributed source fragment and checks the conversion into catalog
    /// models via `check_fragment` / `check_actors`.
    #[tokio::test]
    async fn test_extract_fragment() -> MetaResult<()> {
        let actor_count = 3u32;
        // actor id -> (upstream fragment -> upstream actor ids).
        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        // All actors share the same merger topology, so any entry will do here.
        let stream_node = generate_merger_stream_node(upstream_actor_ids.values().next().unwrap());

        let pb_actors = (0..actor_count)
            .map(|actor_id| StreamActor {
                actor_id: actor_id as _,
                fragment_id: TEST_FRAGMENT_ID as _,
                vnode_bitmap: actor_bitmaps.get(&actor_id).cloned(),
                mview_definition: "".to_owned(),
                expr_context: Some(PbExprContext {
                    time_zone: String::from("America/New_York"),
                    strict_mode: false,
                }),
            })
            .collect_vec();

        let pb_fragment = Fragment {
            fragment_id: TEST_FRAGMENT_ID as _,
            fragment_type_mask: FragmentTypeMask::from(FragmentTypeFlag::Source as u32),
            distribution_type: PbFragmentDistributionType::Hash as _,
            actors: pb_actors.clone(),
            state_table_ids: vec![TEST_STATE_TABLE_ID as _],
            maybe_vnode_count: VnodeCount::for_test().to_protobuf(),
            nodes: stream_node.clone(),
        };

        // Every actor is reported as running on worker 0.
        let pb_actor_status = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id,
                    PbActorStatus {
                        location: PbActorLocation::from_worker(0),
                        state: PbActorState::Running as _,
                    },
                )
            })
            .collect();

        let pb_actor_splits = Default::default();

        let (fragment, actors) = CatalogController::extract_fragment_and_actors_for_new_job(
            TEST_JOB_ID,
            &pb_fragment,
            &pb_actor_status,
            &pb_actor_splits,
        )?;

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            Default::default(),
            &stream_node,
        );

        Ok(())
    }
2042
    /// Exercises `compose_fragment`: builds catalog models for 3 actors (with
    /// connector splits and vnode bitmaps) and checks the composed protobuf
    /// round-trip via `check_fragment` / `check_actors`.
    #[tokio::test]
    async fn test_compose_fragment() -> MetaResult<()> {
        let actor_count = 3u32;

        let upstream_actor_ids: FragmentActorUpstreams = (0..actor_count)
            .map(|actor_id| {
                (
                    actor_id as _,
                    generate_upstream_actor_ids_for_actor(actor_id),
                )
            })
            .collect();

        let mut actor_bitmaps = ActorMapping::new_uniform(
            (0..actor_count).map(|i| i as _),
            VirtualNode::COUNT_FOR_TEST,
        )
        .to_bitmaps();

        let actors = (0..actor_count)
            .map(|actor_id| {
                // Each actor carries one dummy connector split.
                let actor_splits = Some(ConnectorSplits::from(&PbConnectorSplits {
                    splits: vec![PbConnectorSplit {
                        split_type: "dummy".to_owned(),
                        ..Default::default()
                    }],
                }));

                #[expect(deprecated)]
                actor::Model {
                    actor_id: actor_id as ActorId,
                    fragment_id: TEST_FRAGMENT_ID,
                    status: ActorStatus::Running,
                    splits: actor_splits,
                    worker_id: 0,
                    upstream_actor_ids: Default::default(),
                    vnode_bitmap: actor_bitmaps
                        .remove(&actor_id)
                        .map(|bitmap| bitmap.to_protobuf())
                        .as_ref()
                        .map(VnodeBitmap::from),
                    expr_context: ExprContext::from(&PbExprContext {
                        time_zone: String::from("America/New_York"),
                        strict_mode: false,
                    }),
                }
            })
            .collect_vec();

        // All actors share the same merger layout; derive it from the first one.
        let stream_node = {
            let template_actor = actors.first().cloned().unwrap();

            let template_upstream_actor_ids = upstream_actor_ids
                .get(&(template_actor.actor_id as _))
                .unwrap();

            generate_merger_stream_node(template_upstream_actor_ids)
        };

        #[expect(deprecated)]
        let fragment = fragment::Model {
            fragment_id: TEST_FRAGMENT_ID,
            job_id: TEST_JOB_ID,
            fragment_type_mask: 0,
            distribution_type: DistributionType::Hash,
            stream_node: StreamNode::from(&stream_node),
            state_table_ids: I32Array(vec![TEST_STATE_TABLE_ID]),
            upstream_fragment_id: Default::default(),
            vnode_count: VirtualNode::COUNT_FOR_TEST as _,
        };

        let (pb_fragment, pb_actor_status, pb_actor_splits) =
            CatalogController::compose_fragment(fragment.clone(), actors.clone(), None).unwrap();

        // One status and one splits entry per actor.
        assert_eq!(pb_actor_status.len(), actor_count as usize);
        assert_eq!(pb_actor_splits.len(), actor_count as usize);

        let pb_actors = pb_fragment.actors.clone();

        check_fragment(fragment, pb_fragment);
        check_actors(
            actors,
            &upstream_actor_ids,
            pb_actors,
            pb_actor_splits,
            &stream_node,
        );

        Ok(())
    }
2133
2134 fn check_actors(
2135 actors: Vec<actor::Model>,
2136 actor_upstreams: &FragmentActorUpstreams,
2137 pb_actors: Vec<StreamActor>,
2138 pb_actor_splits: HashMap<u32, PbConnectorSplits>,
2139 stream_node: &PbStreamNode,
2140 ) {
2141 for (
2142 actor::Model {
2143 actor_id,
2144 fragment_id,
2145 status,
2146 splits,
2147 worker_id: _,
2148 vnode_bitmap,
2149 expr_context,
2150 ..
2151 },
2152 StreamActor {
2153 actor_id: pb_actor_id,
2154 fragment_id: pb_fragment_id,
2155 vnode_bitmap: pb_vnode_bitmap,
2156 mview_definition,
2157 expr_context: pb_expr_context,
2158 ..
2159 },
2160 ) in actors.into_iter().zip_eq_debug(pb_actors.into_iter())
2161 {
2162 assert_eq!(actor_id, pb_actor_id as ActorId);
2163 assert_eq!(fragment_id, pb_fragment_id as FragmentId);
2164
2165 assert_eq!(
2166 vnode_bitmap.map(|bitmap| bitmap.to_protobuf().into()),
2167 pb_vnode_bitmap,
2168 );
2169
2170 assert_eq!(mview_definition, "");
2171
2172 visit_stream_node_body(stream_node, |body| {
2173 if let PbNodeBody::Merge(m) = body {
2174 assert!(
2175 actor_upstreams
2176 .get(&(actor_id as _))
2177 .unwrap()
2178 .contains_key(&m.upstream_fragment_id)
2179 );
2180 }
2181 });
2182
2183 assert_eq!(status, ActorStatus::Running);
2184
2185 assert_eq!(
2186 splits,
2187 pb_actor_splits.get(&pb_actor_id).map(ConnectorSplits::from)
2188 );
2189
2190 assert_eq!(Some(expr_context.to_protobuf()), pb_expr_context);
2191 }
2192 }
2193
2194 fn check_fragment(fragment: fragment::Model, pb_fragment: Fragment) {
2195 let Fragment {
2196 fragment_id,
2197 fragment_type_mask,
2198 distribution_type: pb_distribution_type,
2199 actors: _,
2200 state_table_ids: pb_state_table_ids,
2201 maybe_vnode_count: _,
2202 nodes,
2203 } = pb_fragment;
2204
2205 assert_eq!(fragment_id, TEST_FRAGMENT_ID as u32);
2206 assert_eq!(fragment_type_mask, fragment.fragment_type_mask.into());
2207 assert_eq!(
2208 pb_distribution_type,
2209 PbFragmentDistributionType::from(fragment.distribution_type)
2210 );
2211
2212 assert_eq!(
2213 pb_state_table_ids,
2214 fragment.state_table_ids.into_u32_array()
2215 );
2216 assert_eq!(fragment.stream_node.to_protobuf(), nodes);
2217 }
2218}