risingwave_meta_model_migration/
m20250325_061743_exactly_once_iceberg_sink_metadata.rs1use sea_orm_migration::prelude::*;
2
3use crate::utils::ColumnDefExt;
4
5#[derive(DeriveMigrationName)]
6pub struct Migration;
7#[async_trait::async_trait]
8impl MigrationTrait for Migration {
9 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
10 manager
11 .create_table(
12 Table::create()
13 .table(ExactlyOnceIcebergSinkMetadata::Table)
14 .if_not_exists()
15 .col(
16 ColumnDef::new(ExactlyOnceIcebergSinkMetadata::SinkId)
17 .integer()
18 .not_null(),
19 )
20 .col(
21 ColumnDef::new(ExactlyOnceIcebergSinkMetadata::EndEpoch)
22 .big_integer()
23 .not_null(),
24 )
25 .col(
26 ColumnDef::new(ExactlyOnceIcebergSinkMetadata::StartEpoch)
27 .big_integer()
28 .not_null(),
29 )
30 .col(
31 ColumnDef::new(ExactlyOnceIcebergSinkMetadata::Metadata)
32 .rw_binary(manager)
33 .not_null(),
34 )
35 .col(
36 ColumnDef::new(ExactlyOnceIcebergSinkMetadata::SnapshotId)
37 .big_integer()
38 .not_null(),
39 )
40 .col(
41 ColumnDef::new(ExactlyOnceIcebergSinkMetadata::Committed)
42 .boolean()
43 .not_null(),
44 )
45 .primary_key(
46 Index::create()
47 .col(ExactlyOnceIcebergSinkMetadata::SinkId)
48 .col(ExactlyOnceIcebergSinkMetadata::EndEpoch),
49 )
50 .to_owned(),
51 )
52 .await?;
53
54 Ok(())
55 }
56
57 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
58 crate::drop_tables!(manager, ExactlyOnceIcebergSinkMetadata);
59 Ok(())
60 }
61}
62
63#[derive(DeriveIden)]
64enum ExactlyOnceIcebergSinkMetadata {
65 Table,
66 SinkId,
67 EndEpoch,
68 StartEpoch,
69 Metadata,
70 SnapshotId,
71 Committed,
72}