risingwave_meta_model_migration/
m20240618_072634_function_compressed_binary.rs

1use sea_orm_migration::prelude::*;
2
3use crate::sea_orm::{DatabaseBackend, DbBackend, Statement};
4use crate::utils::ColumnDefExt;
5
6#[derive(DeriveMigrationName)]
7pub struct Migration;
8
9#[async_trait::async_trait]
10impl MigrationTrait for Migration {
11    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
12        // Fix mismatch column `compressed_binary` type and do data migration
13        match manager.get_database_backend() {
14            DbBackend::MySql => {
15                // Creating function with compressed binary will fail in previous version, so we can
16                // safely assume that the column is always empty and we can just modify the column type
17                // without any data migration.
18                manager
19                    .alter_table(
20                        Table::alter()
21                            .table(Function::Table)
22                            .modify_column(
23                                ColumnDef::new(Function::CompressedBinary).rw_binary(manager),
24                            )
25                            .to_owned(),
26                    )
27                    .await?;
28            }
29            DbBackend::Postgres => {
30                manager.get_connection().execute(Statement::from_string(
31                    DatabaseBackend::Postgres,
32                    "ALTER TABLE function ALTER COLUMN compressed_binary TYPE bytea USING compressed_binary::bytea",
33                )).await?;
34            }
35            DbBackend::Sqlite => {
36                // Sqlite does not support modifying column type, so we need to do data migration and column renaming.
37                // Note that: all these DDLs are not transactional, so if some of them fail, we need to manually run it again.
38                let conn = manager.get_connection();
39                conn.execute(Statement::from_string(
40                    DatabaseBackend::Sqlite,
41                    "ALTER TABLE function ADD COLUMN compressed_binary_new BLOB",
42                ))
43                .await?;
44                conn.execute(Statement::from_string(
45                    DatabaseBackend::Sqlite,
46                    "UPDATE function SET compressed_binary_new = compressed_binary",
47                ))
48                .await?;
49                conn.execute(Statement::from_string(
50                    DatabaseBackend::Sqlite,
51                    "ALTER TABLE function DROP COLUMN compressed_binary",
52                ))
53                .await?;
54                conn.execute(Statement::from_string(
55                    DatabaseBackend::Sqlite,
56                    "ALTER TABLE function RENAME COLUMN compressed_binary_new TO compressed_binary",
57                ))
58                .await?;
59            }
60        }
61
62        Ok(())
63    }
64
65    async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> {
66        // DO nothing, the operations in `up` are idempotent and required to fix the column type mismatch.
67        Ok(())
68    }
69}
70
71#[derive(DeriveIden)]
72enum Function {
73    Table,
74    CompressedBinary,
75}