risingwave_meta_model_migration/
m20240617_071625_sink_into_table_column.rs

1use sea_orm_migration::prelude::{Table as MigrationTable, *};
2
3use crate::SubQueryStatement::SelectStatement;
4use crate::utils::ColumnDefExt;
5
6#[derive(DeriveMigrationName)]
7pub struct Migration;
8
9#[async_trait::async_trait]
10impl MigrationTrait for Migration {
11    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
12        manager
13            .alter_table(
14                MigrationTable::alter()
15                    .table(Sink::Table)
16                    .add_column(ColumnDef::new(Sink::OriginalTargetColumns).rw_binary(manager))
17                    .to_owned(),
18            )
19            .await?;
20
21        let stmt = Query::update()
22            .table(Sink::Table)
23            .value(
24                Sink::OriginalTargetColumns,
25                SimpleExpr::SubQuery(
26                    None,
27                    Box::new(SelectStatement(
28                        Query::select()
29                            .column((Table::Table, Table::Columns))
30                            .from(Table::Table)
31                            .and_where(
32                                Expr::col((Table::Table, Table::TableId))
33                                    .equals((Sink::Table, Sink::TargetTable)),
34                            )
35                            .to_owned(),
36                    )),
37                ),
38            )
39            .and_where(Expr::col((Sink::Table, Sink::TargetTable)).is_not_null())
40            .to_owned();
41
42        manager.exec_stmt(stmt).await?;
43
44        Ok(())
45    }
46
47    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
48        manager
49            .alter_table(
50                MigrationTable::alter()
51                    .table(Sink::Table)
52                    .drop_column(Sink::OriginalTargetColumns)
53                    .to_owned(),
54            )
55            .await
56    }
57}
58
59#[derive(DeriveIden)]
60enum Sink {
61    Table,
62    TargetTable,
63    OriginalTargetColumns,
64}
65
66#[derive(DeriveIden)]
67enum Table {
68    Table,
69    TableId,
70    Columns,
71}