risingwave_meta_model_migration/
m20250106_072104_fragment_relation.rs

1use sea_orm::{FromJsonQueryResult, FromQueryResult, Statement};
2use sea_orm_migration::prelude::*;
3use serde::{Deserialize, Serialize};
4
5use crate::drop_tables;
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[async_trait::async_trait]
11impl MigrationTrait for Migration {
12    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
13        manager
14            .create_table(
15                Table::create()
16                    .table(FragmentRelation::Table)
17                    .col(
18                        ColumnDef::new(FragmentRelation::SourceFragmentId)
19                            .integer()
20                            .not_null(),
21                    )
22                    .col(
23                        ColumnDef::new(FragmentRelation::TargetFragmentId)
24                            .integer()
25                            .not_null(),
26                    )
27                    .col(
28                        ColumnDef::new(FragmentRelation::DispatcherType)
29                            .string()
30                            .not_null(),
31                    )
32                    .col(
33                        ColumnDef::new(FragmentRelation::DistKeyIndices)
34                            .json_binary()
35                            .not_null(),
36                    )
37                    .col(
38                        ColumnDef::new(FragmentRelation::OutputIndices)
39                            .json_binary()
40                            .not_null(),
41                    )
42                    .primary_key(
43                        Index::create()
44                            .col(FragmentRelation::SourceFragmentId)
45                            .col(FragmentRelation::TargetFragmentId),
46                    )
47                    .foreign_key(
48                        &mut ForeignKey::create()
49                            .name("FK_fragment_relation_source_oid")
50                            .from(FragmentRelation::Table, FragmentRelation::SourceFragmentId)
51                            .to(Fragment::Table, Fragment::FragmentId)
52                            .on_delete(ForeignKeyAction::Cascade)
53                            .to_owned(),
54                    )
55                    .foreign_key(
56                        &mut ForeignKey::create()
57                            .name("FK_fragment_relation_target_oid")
58                            .from(FragmentRelation::Table, FragmentRelation::TargetFragmentId)
59                            .to(Fragment::Table, Fragment::FragmentId)
60                            .on_delete(ForeignKeyAction::Cascade)
61                            .to_owned(),
62                    )
63                    .to_owned(),
64            )
65            .await?;
66
67        fulfill_fragment_relation(manager).await?;
68
69        Ok(())
70    }
71
72    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
73        drop_tables!(manager, FragmentRelation);
74        Ok(())
75    }
76}
77
78// Fulfill the FragmentRelation table with data from the Actor and ActorDispatcher tables
79async fn fulfill_fragment_relation(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
80    let connection = manager.get_connection();
81
82    let database_backend = connection.get_database_backend();
83
84    let (sql, values) = Query::select()
85        // as sqlite does not support `distinct on` yet, we have to use `distinct` here
86        .distinct()
87        .expr_as(
88            Expr::col((Actor::Table, Actor::FragmentId)),
89            FragmentRelation::SourceFragmentId,
90        )
91        .expr_as(
92            Expr::col((ActorDispatcher::Table, ActorDispatcher::DispatcherId)),
93            FragmentRelation::TargetFragmentId,
94        )
95        .columns([
96            (ActorDispatcher::Table, ActorDispatcher::DispatcherType),
97            (ActorDispatcher::Table, ActorDispatcher::DistKeyIndices),
98            (ActorDispatcher::Table, ActorDispatcher::OutputIndices),
99        ])
100        .from(Actor::Table)
101        .join(
102            JoinType::InnerJoin,
103            ActorDispatcher::Table,
104            Expr::col((Actor::Table, Actor::ActorId))
105                .equals((ActorDispatcher::Table, ActorDispatcher::ActorId)),
106        )
107        .to_owned()
108        .build_any(&*database_backend.get_query_builder());
109
110    let rows = connection
111        .query_all(Statement::from_sql_and_values(
112            database_backend,
113            sql,
114            values,
115        ))
116        .await?;
117
118    for row in rows {
119        let FragmentRelationEntity {
120            source_fragment_id,
121            target_fragment_id,
122            dispatcher_type,
123            dist_key_indices,
124            output_indices,
125        } = FragmentRelationEntity::from_query_result(&row, "")?;
126
127        manager
128            .exec_stmt(
129                Query::insert()
130                    .into_table(FragmentRelation::Table)
131                    .columns([
132                        FragmentRelation::SourceFragmentId,
133                        FragmentRelation::TargetFragmentId,
134                        FragmentRelation::DispatcherType,
135                        FragmentRelation::DistKeyIndices,
136                        FragmentRelation::OutputIndices,
137                    ])
138                    .values_panic([
139                        source_fragment_id.into(),
140                        target_fragment_id.into(),
141                        dispatcher_type.into(),
142                        dist_key_indices.into(),
143                        output_indices.into(),
144                    ])
145                    .to_owned(),
146            )
147            .await?;
148    }
149
150    Ok(())
151}
152
153#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
154pub struct I32Array(pub Vec<i32>);
155
156#[derive(Debug, FromQueryResult)]
157#[sea_orm(entity = "FragmentRelation")]
158pub struct FragmentRelationEntity {
159    source_fragment_id: i32,
160    target_fragment_id: i32,
161    dispatcher_type: String,
162    dist_key_indices: I32Array,
163    output_indices: I32Array,
164}
165
166#[derive(DeriveIden)]
167enum FragmentRelation {
168    Table,
169    SourceFragmentId,
170    TargetFragmentId,
171    DispatcherType,
172    DistKeyIndices,
173    OutputIndices,
174}
175
176#[derive(DeriveIden)]
177enum Fragment {
178    Table,
179    FragmentId,
180}
181
182#[derive(DeriveIden)]
183enum Actor {
184    Table,
185    ActorId,
186    FragmentId,
187}
188
189#[derive(DeriveIden)]
190enum ActorDispatcher {
191    Table,
192    ActorId,
193    DispatcherType,
194    DistKeyIndices,
195    OutputIndices,
196    DispatcherId,
197}