risingwave_meta_model_migration/
m20250325_061743_exactly_once_iceberg_sink_metadata.rs

1use sea_orm_migration::prelude::*;
2
3use crate::utils::ColumnDefExt;
4
5#[derive(DeriveMigrationName)]
6pub struct Migration;
7#[async_trait::async_trait]
8impl MigrationTrait for Migration {
9    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
10        manager
11            .create_table(
12                Table::create()
13                    .table(ExactlyOnceIcebergSinkMetadata::Table)
14                    .if_not_exists()
15                    .col(
16                        ColumnDef::new(ExactlyOnceIcebergSinkMetadata::SinkId)
17                            .integer()
18                            .not_null(),
19                    )
20                    .col(
21                        ColumnDef::new(ExactlyOnceIcebergSinkMetadata::EndEpoch)
22                            .big_integer()
23                            .not_null(),
24                    )
25                    .col(
26                        ColumnDef::new(ExactlyOnceIcebergSinkMetadata::StartEpoch)
27                            .big_integer()
28                            .not_null(),
29                    )
30                    .col(
31                        ColumnDef::new(ExactlyOnceIcebergSinkMetadata::Metadata)
32                            .rw_binary(manager)
33                            .not_null(),
34                    )
35                    .col(
36                        ColumnDef::new(ExactlyOnceIcebergSinkMetadata::SnapshotId)
37                            .big_integer()
38                            .not_null(),
39                    )
40                    .col(
41                        ColumnDef::new(ExactlyOnceIcebergSinkMetadata::Committed)
42                            .boolean()
43                            .not_null(),
44                    )
45                    .primary_key(
46                        Index::create()
47                            .col(ExactlyOnceIcebergSinkMetadata::SinkId)
48                            .col(ExactlyOnceIcebergSinkMetadata::EndEpoch),
49                    )
50                    .to_owned(),
51            )
52            .await?;
53
54        Ok(())
55    }
56
57    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
58        crate::drop_tables!(manager, ExactlyOnceIcebergSinkMetadata);
59        Ok(())
60    }
61}
62
63#[derive(DeriveIden)]
64enum ExactlyOnceIcebergSinkMetadata {
65    Table,
66    SinkId,
67    EndEpoch,
68    StartEpoch,
69    Metadata,
70    SnapshotId,
71    Committed,
72}