risingwave_meta_model_migration/
m20240630_131430_remove_parallel_unit.rs

1use sea_orm_migration::prelude::*;
2use serde::{Deserialize, Serialize};
3
4use crate::sea_orm::{FromJsonQueryResult, FromQueryResult, Statement};
5use crate::utils::ColumnDefExt;
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[async_trait::async_trait]
11impl MigrationTrait for Migration {
12    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
13        manager
14            .alter_table(
15                Table::alter()
16                    .table(WorkerProperty::Table)
17                    .add_column(ColumnDef::new(WorkerProperty::Parallelism).integer())
18                    .to_owned(),
19            )
20            .await?;
21
22        set_worker_parallelism(manager).await?;
23
24        manager
25            .alter_table(
26                Table::alter()
27                    .table(WorkerProperty::Table)
28                    .drop_column(WorkerProperty::ParallelUnitIds)
29                    .to_owned(),
30            )
31            .await?;
32
33        manager
34            .alter_table(
35                Table::alter()
36                    .table(Fragment::Table)
37                    .drop_column(Fragment::VnodeMapping)
38                    .to_owned(),
39            )
40            .await?;
41
42        manager
43            .alter_table(
44                Table::alter()
45                    .table(Actor::Table)
46                    .drop_column(Actor::ParallelUnitId)
47                    .to_owned(),
48            )
49            .await
50    }
51
52    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
53        manager
54            .alter_table(
55                Table::alter()
56                    .table(WorkerProperty::Table)
57                    .drop_column(WorkerProperty::Parallelism)
58                    .to_owned(),
59            )
60            .await?;
61
62        manager
63            .alter_table(
64                Table::alter()
65                    .table(WorkerProperty::Table)
66                    .add_column(
67                        ColumnDef::new(WorkerProperty::ParallelUnitIds)
68                            .json_binary()
69                            .not_null(),
70                    )
71                    .to_owned(),
72            )
73            .await?;
74
75        manager
76            .alter_table(
77                Table::alter()
78                    .table(Fragment::Table)
79                    .add_column(
80                        ColumnDef::new(Fragment::VnodeMapping)
81                            .rw_binary(manager)
82                            .not_null(),
83                    )
84                    .to_owned(),
85            )
86            .await?;
87
88        manager
89            .alter_table(
90                Table::alter()
91                    .table(Actor::Table)
92                    .add_column(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
93                    .to_owned(),
94            )
95            .await
96    }
97}
98
99// Set worker parallelism based on the number of parallel unit ids
100async fn set_worker_parallelism(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
101    let connection = manager.get_connection();
102
103    let database_backend = connection.get_database_backend();
104
105    let (sql, values) = Query::select()
106        .columns([
107            (WorkerProperty::Table, WorkerProperty::WorkerId),
108            (WorkerProperty::Table, WorkerProperty::ParallelUnitIds),
109        ])
110        .from(WorkerProperty::Table)
111        .to_owned()
112        .build_any(&*database_backend.get_query_builder());
113
114    let stmt = Statement::from_sql_and_values(database_backend, sql, values);
115
116    for WorkerPropertyParallelUnitIds {
117        worker_id,
118        parallel_unit_ids,
119    } in WorkerPropertyParallelUnitIds::find_by_statement(stmt)
120        .all(connection)
121        .await?
122    {
123        manager
124            .exec_stmt(
125                Query::update()
126                    .table(WorkerProperty::Table)
127                    .value(
128                        WorkerProperty::Parallelism,
129                        Expr::value(parallel_unit_ids.0.len() as i32),
130                    )
131                    .and_where(Expr::col(WorkerProperty::WorkerId).eq(worker_id))
132                    .to_owned(),
133            )
134            .await?;
135    }
136    Ok(())
137}
138#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
139pub struct I32Array(pub Vec<i32>);
140
141#[derive(Debug, FromQueryResult)]
142pub struct WorkerPropertyParallelUnitIds {
143    worker_id: i32,
144    parallel_unit_ids: I32Array,
145}
146
147#[derive(DeriveIden)]
148enum WorkerProperty {
149    Table,
150    WorkerId,
151    Parallelism,
152    ParallelUnitIds,
153}
154
155#[derive(DeriveIden)]
156enum Fragment {
157    Table,
158    VnodeMapping,
159}
160
161#[derive(DeriveIden)]
162enum Actor {
163    Table,
164    ParallelUnitId,
165}