risingwave_meta_model_migration/
m20230908_072257_init.rs

1use sea_orm_migration::prelude::{Index as MigrationIndex, Table as MigrationTable, *};
2
3use crate::sea_orm::{DatabaseBackend, DbBackend, Statement};
4use crate::utils::ColumnDefExt;
5use crate::{assert_not_has_tables, drop_tables};
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[async_trait::async_trait]
11impl MigrationTrait for Migration {
12    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
13        // 1. check if the table exists.
14        assert_not_has_tables!(
15            manager,
16            Cluster,
17            Worker,
18            WorkerProperty,
19            User,
20            UserPrivilege,
21            Database,
22            Schema,
23            StreamingJob,
24            Fragment,
25            Actor,
26            ActorDispatcher,
27            Table,
28            Source,
29            Sink,
30            Connection,
31            View,
32            Index,
33            Function,
34            Object,
35            ObjectDependency,
36            SystemParameter,
37            CatalogVersion
38        );
39
40        // In Mysql, The CHAR, VARCHAR and TEXT types are encoded in utf8_general_ci by default, which is not case sensitive but
41        // required in risingwave. Here we need to change the database collate to utf8mb4_bin.
42        if manager.get_database_backend() == DbBackend::MySql {
43            manager
44                .get_connection()
45                .execute_unprepared("ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_bin")
46                .await
47                .expect("failed to set database collate");
48        }
49
50        // 2. create tables.
51        manager
52            .create_table(
53                MigrationTable::create()
54                    .table(Cluster::Table)
55                    .col(
56                        ColumnDef::new(Cluster::ClusterId)
57                            .uuid()
58                            .not_null()
59                            .primary_key(),
60                    )
61                    .col(
62                        ColumnDef::new(Cluster::CreatedAt)
63                            .date_time()
64                            .default(Expr::current_timestamp())
65                            .not_null(),
66                    )
67                    .to_owned(),
68            )
69            .await?;
70        manager
71            .create_table(
72                MigrationTable::create()
73                    .table(Worker::Table)
74                    .col(
75                        ColumnDef::new(Worker::WorkerId)
76                            .integer()
77                            .not_null()
78                            .auto_increment()
79                            .primary_key(),
80                    )
81                    .col(ColumnDef::new(Worker::WorkerType).string().not_null())
82                    .col(ColumnDef::new(Worker::Host).string().not_null())
83                    .col(ColumnDef::new(Worker::Port).integer().not_null())
84                    .col(ColumnDef::new(Worker::Status).string().not_null())
85                    .col(ColumnDef::new(Worker::TransactionId).integer())
86                    .to_owned(),
87            )
88            .await?;
89        manager
90            .create_table(
91                MigrationTable::create()
92                    .table(WorkerProperty::Table)
93                    .col(
94                        ColumnDef::new(WorkerProperty::WorkerId)
95                            .integer()
96                            .primary_key(),
97                    )
98                    .col(
99                        ColumnDef::new(WorkerProperty::ParallelUnitIds)
100                            .json_binary()
101                            .not_null(),
102                    )
103                    .col(
104                        ColumnDef::new(WorkerProperty::IsStreaming)
105                            .boolean()
106                            .not_null(),
107                    )
108                    .col(
109                        ColumnDef::new(WorkerProperty::IsServing)
110                            .boolean()
111                            .not_null(),
112                    )
113                    .col(
114                        ColumnDef::new(WorkerProperty::IsUnschedulable)
115                            .boolean()
116                            .not_null(),
117                    )
118                    .foreign_key(
119                        &mut ForeignKey::create()
120                            .name("FK_worker_property_worker_id")
121                            .from(WorkerProperty::Table, WorkerProperty::WorkerId)
122                            .to(Worker::Table, Worker::WorkerId)
123                            .on_delete(ForeignKeyAction::Cascade)
124                            .to_owned(),
125                    )
126                    .to_owned(),
127            )
128            .await?;
129        manager
130            .create_table(
131                MigrationTable::create()
132                    .table(User::Table)
133                    .col(
134                        ColumnDef::new(User::UserId)
135                            .integer()
136                            .primary_key()
137                            .auto_increment(),
138                    )
139                    .col(ColumnDef::new(User::Name).string().unique_key().not_null())
140                    .col(ColumnDef::new(User::IsSuper).boolean().not_null())
141                    .col(ColumnDef::new(User::CanCreateDb).boolean().not_null())
142                    .col(ColumnDef::new(User::CanCreateUser).boolean().not_null())
143                    .col(ColumnDef::new(User::CanLogin).boolean().not_null())
144                    .col(ColumnDef::new(User::AuthInfo).rw_binary(manager))
145                    .to_owned(),
146            )
147            .await?;
148        manager
149            .create_table(
150                MigrationTable::create()
151                    .table(Object::Table)
152                    .col(
153                        ColumnDef::new(Object::Oid)
154                            .integer()
155                            .auto_increment()
156                            .primary_key(),
157                    )
158                    .col(ColumnDef::new(Object::ObjType).string().not_null())
159                    .col(ColumnDef::new(Object::OwnerId).integer().not_null())
160                    .col(ColumnDef::new(Object::SchemaId).integer())
161                    .col(ColumnDef::new(Object::DatabaseId).integer())
162                    .col(
163                        ColumnDef::new(Object::InitializedAt)
164                            .date_time()
165                            .default(Expr::current_timestamp())
166                            .not_null(),
167                    )
168                    .col(
169                        ColumnDef::new(Object::CreatedAt)
170                            .date_time()
171                            .default(Expr::current_timestamp())
172                            .not_null(),
173                    )
174                    .col(ColumnDef::new(Object::InitializedAtClusterVersion).string())
175                    .col(ColumnDef::new(Object::CreatedAtClusterVersion).string())
176                    .foreign_key(
177                        &mut ForeignKey::create()
178                            .name("FK_object_owner_id")
179                            .from(Object::Table, Object::OwnerId)
180                            .to(User::Table, User::UserId)
181                            .on_delete(ForeignKeyAction::Cascade)
182                            .to_owned(),
183                    )
184                    .foreign_key(
185                        &mut ForeignKey::create()
186                            .name("FK_object_database_id")
187                            .from(Object::Table, Object::DatabaseId)
188                            .to(Object::Table, Object::Oid)
189                            .on_delete(ForeignKeyAction::Cascade)
190                            .to_owned(),
191                    )
192                    .foreign_key(
193                        &mut ForeignKey::create()
194                            .name("FK_object_schema_id")
195                            .from(Object::Table, Object::SchemaId)
196                            .to(Object::Table, Object::Oid)
197                            .on_delete(ForeignKeyAction::Cascade)
198                            .to_owned(),
199                    )
200                    .to_owned(),
201            )
202            .await?;
203        manager
204            .create_table(
205                MigrationTable::create()
206                    .table(UserPrivilege::Table)
207                    .col(
208                        ColumnDef::new(UserPrivilege::Id)
209                            .integer()
210                            .primary_key()
211                            .auto_increment(),
212                    )
213                    .col(ColumnDef::new(UserPrivilege::DependentId).integer())
214                    .col(ColumnDef::new(UserPrivilege::UserId).integer().not_null())
215                    .col(ColumnDef::new(UserPrivilege::Oid).integer().not_null())
216                    .col(
217                        ColumnDef::new(UserPrivilege::GrantedBy)
218                            .integer()
219                            .not_null(),
220                    )
221                    .col(ColumnDef::new(UserPrivilege::Action).string().not_null())
222                    .col(
223                        ColumnDef::new(UserPrivilege::WithGrantOption)
224                            .boolean()
225                            .not_null(),
226                    )
227                    .foreign_key(
228                        &mut ForeignKey::create()
229                            .name("FK_user_privilege_dependent_id")
230                            .from(UserPrivilege::Table, UserPrivilege::DependentId)
231                            .to(UserPrivilege::Table, UserPrivilege::Id)
232                            .on_delete(ForeignKeyAction::Cascade)
233                            .to_owned(),
234                    )
235                    .foreign_key(
236                        &mut ForeignKey::create()
237                            .name("FK_user_privilege_user_id")
238                            .from(UserPrivilege::Table, UserPrivilege::UserId)
239                            .to(User::Table, User::UserId)
240                            .on_delete(ForeignKeyAction::Cascade)
241                            .to_owned(),
242                    )
243                    .foreign_key(
244                        &mut ForeignKey::create()
245                            .name("FK_user_privilege_granted_by")
246                            .from(UserPrivilege::Table, UserPrivilege::GrantedBy)
247                            .to(User::Table, User::UserId)
248                            .to_owned(),
249                    )
250                    .foreign_key(
251                        &mut ForeignKey::create()
252                            .name("FK_user_privilege_oid")
253                            .from(UserPrivilege::Table, UserPrivilege::Oid)
254                            .to(Object::Table, Object::Oid)
255                            .on_delete(ForeignKeyAction::Cascade)
256                            .to_owned(),
257                    )
258                    .to_owned(),
259            )
260            .await?;
261        manager
262            .create_table(
263                MigrationTable::create()
264                    .table(ObjectDependency::Table)
265                    .col(
266                        ColumnDef::new(ObjectDependency::Id)
267                            .integer()
268                            .auto_increment()
269                            .primary_key(),
270                    )
271                    .col(ColumnDef::new(ObjectDependency::Oid).integer().not_null())
272                    .col(
273                        ColumnDef::new(ObjectDependency::UsedBy)
274                            .integer()
275                            .not_null(),
276                    )
277                    .foreign_key(
278                        &mut ForeignKey::create()
279                            .name("FK_object_dependency_oid")
280                            .from(ObjectDependency::Table, ObjectDependency::Oid)
281                            .to(Object::Table, Object::Oid)
282                            .on_delete(ForeignKeyAction::Cascade)
283                            .to_owned(),
284                    )
285                    .foreign_key(
286                        &mut ForeignKey::create()
287                            .name("FK_object_dependency_used_by")
288                            .from(ObjectDependency::Table, ObjectDependency::UsedBy)
289                            .to(Object::Table, Object::Oid)
290                            .on_delete(ForeignKeyAction::Cascade)
291                            .to_owned(),
292                    )
293                    .to_owned(),
294            )
295            .await?;
296        manager
297            .create_table(
298                MigrationTable::create()
299                    .table(Database::Table)
300                    .col(ColumnDef::new(Database::DatabaseId).integer().primary_key())
301                    .col(
302                        ColumnDef::new(Database::Name)
303                            .string()
304                            .unique_key()
305                            .not_null(),
306                    )
307                    .foreign_key(
308                        &mut ForeignKey::create()
309                            .name("FK_database_object_id")
310                            .from(Database::Table, Database::DatabaseId)
311                            .to(Object::Table, Object::Oid)
312                            .on_delete(ForeignKeyAction::Cascade)
313                            .to_owned(),
314                    )
315                    .to_owned(),
316            )
317            .await?;
318        manager
319            .create_table(
320                MigrationTable::create()
321                    .table(Schema::Table)
322                    .col(ColumnDef::new(Schema::SchemaId).integer().primary_key())
323                    .col(ColumnDef::new(Schema::Name).string().not_null())
324                    .foreign_key(
325                        &mut ForeignKey::create()
326                            .name("FK_schema_object_id")
327                            .from(Schema::Table, Schema::SchemaId)
328                            .to(Object::Table, Object::Oid)
329                            .on_delete(ForeignKeyAction::Cascade)
330                            .to_owned(),
331                    )
332                    .to_owned(),
333            )
334            .await?;
335        manager
336            .create_table(
337                MigrationTable::create()
338                    .table(StreamingJob::Table)
339                    .col(ColumnDef::new(StreamingJob::JobId).integer().primary_key())
340                    .col(ColumnDef::new(StreamingJob::JobStatus).string().not_null())
341                    .col(ColumnDef::new(StreamingJob::CreateType).string().not_null())
342                    .col(ColumnDef::new(StreamingJob::Timezone).string())
343                    .col(
344                        ColumnDef::new(StreamingJob::Parallelism)
345                            .json_binary()
346                            .not_null(),
347                    )
348                    .foreign_key(
349                        &mut ForeignKey::create()
350                            .name("FK_streaming_job_object_id")
351                            .from(StreamingJob::Table, StreamingJob::JobId)
352                            .to(Object::Table, Object::Oid)
353                            .on_delete(ForeignKeyAction::Cascade)
354                            .to_owned(),
355                    )
356                    .to_owned(),
357            )
358            .await?;
359        manager
360            .create_table(
361                MigrationTable::create()
362                    .table(Fragment::Table)
363                    .col(
364                        ColumnDef::new(Fragment::FragmentId)
365                            .integer()
366                            .primary_key()
367                            .auto_increment(),
368                    )
369                    .col(ColumnDef::new(Fragment::JobId).integer().not_null())
370                    .col(
371                        ColumnDef::new(Fragment::FragmentTypeMask)
372                            .integer()
373                            .not_null(),
374                    )
375                    .col(
376                        ColumnDef::new(Fragment::DistributionType)
377                            .string()
378                            .not_null(),
379                    )
380                    .col(
381                        ColumnDef::new(Fragment::StreamNode)
382                            .rw_binary(manager)
383                            .not_null(),
384                    )
385                    .col(
386                        ColumnDef::new(Fragment::VnodeMapping)
387                            .rw_binary(manager)
388                            .not_null(),
389                    )
390                    .col(ColumnDef::new(Fragment::StateTableIds).json_binary())
391                    .col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary())
392                    .foreign_key(
393                        &mut ForeignKey::create()
394                            .name("FK_fragment_table_id")
395                            .from(Fragment::Table, Fragment::JobId)
396                            .to(Object::Table, Object::Oid)
397                            .on_delete(ForeignKeyAction::Cascade)
398                            .to_owned(),
399                    )
400                    .to_owned(),
401            )
402            .await?;
403        manager
404            .create_table(
405                MigrationTable::create()
406                    .table(Actor::Table)
407                    .col(
408                        ColumnDef::new(Actor::ActorId)
409                            .integer()
410                            .primary_key()
411                            .auto_increment(),
412                    )
413                    .col(ColumnDef::new(Actor::FragmentId).integer().not_null())
414                    .col(ColumnDef::new(Actor::Status).string().not_null())
415                    .col(ColumnDef::new(Actor::Splits).rw_binary(manager))
416                    .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
417                    .col(ColumnDef::new(Actor::WorkerId).integer().not_null())
418                    .col(ColumnDef::new(Actor::UpstreamActorIds).json_binary())
419                    .col(ColumnDef::new(Actor::VnodeBitmap).rw_binary(manager))
420                    .col(
421                        ColumnDef::new(Actor::ExprContext)
422                            .rw_binary(manager)
423                            .not_null(),
424                    )
425                    .foreign_key(
426                        &mut ForeignKey::create()
427                            .name("FK_actor_fragment_id")
428                            .from(Actor::Table, Actor::FragmentId)
429                            .to(Fragment::Table, Fragment::FragmentId)
430                            .on_delete(ForeignKeyAction::Cascade)
431                            .to_owned(),
432                    )
433                    .to_owned(),
434            )
435            .await?;
436        manager
437            .create_table(
438                MigrationTable::create()
439                    .table(ActorDispatcher::Table)
440                    .col(
441                        ColumnDef::new(ActorDispatcher::Id)
442                            .integer()
443                            .primary_key()
444                            .auto_increment(),
445                    )
446                    .col(
447                        ColumnDef::new(ActorDispatcher::ActorId)
448                            .integer()
449                            .not_null(),
450                    )
451                    .col(
452                        ColumnDef::new(ActorDispatcher::DispatcherType)
453                            .string()
454                            .not_null(),
455                    )
456                    .col(
457                        ColumnDef::new(ActorDispatcher::DistKeyIndices)
458                            .json_binary()
459                            .not_null(),
460                    )
461                    .col(
462                        ColumnDef::new(ActorDispatcher::OutputIndices)
463                            .json_binary()
464                            .not_null(),
465                    )
466                    .col(ColumnDef::new(ActorDispatcher::HashMapping).rw_binary(manager))
467                    .col(
468                        ColumnDef::new(ActorDispatcher::DispatcherId)
469                            .integer()
470                            .not_null(),
471                    )
472                    .col(
473                        ColumnDef::new(ActorDispatcher::DownstreamActorIds)
474                            .json_binary()
475                            .not_null(),
476                    )
477                    .col(ColumnDef::new(ActorDispatcher::DownstreamTableName).string())
478                    .foreign_key(
479                        &mut ForeignKey::create()
480                            .name("FK_actor_dispatcher_actor_id")
481                            .from(ActorDispatcher::Table, ActorDispatcher::ActorId)
482                            .to(Actor::Table, Actor::ActorId)
483                            .on_delete(ForeignKeyAction::Cascade)
484                            .to_owned(),
485                    )
486                    .foreign_key(
487                        &mut ForeignKey::create()
488                            .name("FK_actor_dispatcher_dispatcher_id")
489                            .from(ActorDispatcher::Table, ActorDispatcher::DispatcherId)
490                            .to(Fragment::Table, Fragment::FragmentId)
491                            .on_delete(ForeignKeyAction::Cascade)
492                            .to_owned(),
493                    )
494                    .to_owned(),
495            )
496            .await?;
497        manager
498            .create_table(
499                MigrationTable::create()
500                    .table(Connection::Table)
501                    .col(
502                        ColumnDef::new(Connection::ConnectionId)
503                            .integer()
504                            .primary_key(),
505                    )
506                    .col(ColumnDef::new(Connection::Name).string().not_null())
507                    .col(
508                        ColumnDef::new(Connection::Info)
509                            .rw_binary(manager)
510                            .not_null(),
511                    )
512                    .foreign_key(
513                        &mut ForeignKey::create()
514                            .name("FK_connection_object_id")
515                            .from(Connection::Table, Connection::ConnectionId)
516                            .to(Object::Table, Object::Oid)
517                            .on_delete(ForeignKeyAction::Cascade)
518                            .to_owned(),
519                    )
520                    .to_owned(),
521            )
522            .await?;
523        manager
524            .create_table(
525                MigrationTable::create()
526                    .table(Source::Table)
527                    .col(ColumnDef::new(Source::SourceId).integer().primary_key())
528                    .col(ColumnDef::new(Source::Name).string().not_null())
529                    .col(ColumnDef::new(Source::RowIdIndex).integer())
530                    .col(
531                        ColumnDef::new(Source::Columns)
532                            .rw_binary(manager)
533                            .not_null(),
534                    )
535                    .col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null())
536                    .col(
537                        ColumnDef::new(Source::WithProperties)
538                            .json_binary()
539                            .not_null(),
540                    )
541                    .col(
542                        ColumnDef::new(Source::Definition)
543                            .rw_long_text(manager)
544                            .not_null(),
545                    )
546                    .col(ColumnDef::new(Source::SourceInfo).rw_binary(manager))
547                    .col(
548                        ColumnDef::new(Source::WatermarkDescs)
549                            .rw_binary(manager)
550                            .not_null(),
551                    )
552                    .col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
553                    .col(ColumnDef::new(Source::ConnectionId).integer())
554                    .col(ColumnDef::new(Source::Version).big_integer().not_null())
555                    .foreign_key(
556                        &mut ForeignKey::create()
557                            .name("FK_source_object_id")
558                            .from(Source::Table, Source::SourceId)
559                            .to(Object::Table, Object::Oid)
560                            .on_delete(ForeignKeyAction::Cascade)
561                            .to_owned(),
562                    )
563                    .foreign_key(
564                        &mut ForeignKey::create()
565                            .name("FK_source_connection_id")
566                            .from(Source::Table, Source::ConnectionId)
567                            .to(Connection::Table, Connection::ConnectionId)
568                            .to_owned(),
569                    )
570                    .foreign_key(
571                        &mut ForeignKey::create()
572                            .name("FK_source_optional_associated_table_id")
573                            .from(Source::Table, Source::OptionalAssociatedTableId)
574                            .to(Object::Table, Object::Oid)
575                            .on_delete(ForeignKeyAction::Cascade)
576                            .to_owned(),
577                    )
578                    .to_owned(),
579            )
580            .await?;
581        manager
582            .create_table(
583                MigrationTable::create()
584                    .table(Table::Table)
585                    .col(ColumnDef::new(Table::TableId).integer().primary_key())
586                    .col(ColumnDef::new(Table::Name).string().not_null())
587                    .col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer())
588                    .col(ColumnDef::new(Table::TableType).string().not_null())
589                    .col(ColumnDef::new(Table::BelongsToJobId).integer())
590                    .col(ColumnDef::new(Table::Columns).rw_binary(manager).not_null())
591                    .col(ColumnDef::new(Table::Pk).rw_binary(manager).not_null())
592                    .col(
593                        ColumnDef::new(Table::DistributionKey)
594                            .json_binary()
595                            .not_null(),
596                    )
597                    .col(ColumnDef::new(Table::StreamKey).json_binary().not_null())
598                    .col(ColumnDef::new(Table::AppendOnly).boolean().not_null())
599                    .col(ColumnDef::new(Table::FragmentId).integer())
600                    .col(ColumnDef::new(Table::VnodeColIndex).integer())
601                    .col(ColumnDef::new(Table::RowIdIndex).integer())
602                    .col(ColumnDef::new(Table::ValueIndices).json_binary().not_null())
603                    .col(
604                        ColumnDef::new(Table::Definition)
605                            .rw_long_text(manager)
606                            .not_null(),
607                    )
608                    .col(
609                        ColumnDef::new(Table::HandlePkConflictBehavior)
610                            .string()
611                            .not_null(),
612                    )
613                    .col(
614                        ColumnDef::new(Table::ReadPrefixLenHint)
615                            .integer()
616                            .not_null(),
617                    )
618                    .col(
619                        ColumnDef::new(Table::WatermarkIndices)
620                            .json_binary()
621                            .not_null(),
622                    )
623                    .col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null())
624                    .col(ColumnDef::new(Table::DmlFragmentId).integer())
625                    .col(ColumnDef::new(Table::Cardinality).rw_binary(manager))
626                    .col(
627                        ColumnDef::new(Table::CleanedByWatermark)
628                            .boolean()
629                            .not_null(),
630                    )
631                    .col(ColumnDef::new(Table::Description).string())
632                    .col(ColumnDef::new(Table::Version).rw_binary(manager))
633                    .col(ColumnDef::new(Table::RetentionSeconds).integer())
634                    .col(
635                        ColumnDef::new(Table::IncomingSinks)
636                            .json_binary()
637                            .not_null(),
638                    )
639                    .foreign_key(
640                        &mut ForeignKey::create()
641                            .name("FK_table_object_id")
642                            .from(Table::Table, Table::TableId)
643                            .to(Object::Table, Object::Oid)
644                            .on_delete(ForeignKeyAction::Cascade)
645                            .to_owned(),
646                    )
647                    .foreign_key(
648                        &mut ForeignKey::create()
649                            .name("FK_table_belongs_to_job_id")
650                            .from(Table::Table, Table::BelongsToJobId)
651                            .to(Object::Table, Object::Oid)
652                            .on_delete(ForeignKeyAction::Cascade)
653                            .to_owned(),
654                    )
655                    .foreign_key(
656                        &mut ForeignKey::create()
657                            .name("FK_table_fragment_id")
658                            .from(Table::Table, Table::FragmentId)
659                            .to(Fragment::Table, Fragment::FragmentId)
660                            .on_delete(ForeignKeyAction::Cascade)
661                            .to_owned(),
662                    )
663                    .foreign_key(
664                        &mut ForeignKey::create()
665                            .name("FK_table_dml_fragment_id")
666                            .from(Table::Table, Table::DmlFragmentId)
667                            .to(Fragment::Table, Fragment::FragmentId)
668                            .to_owned(),
669                    )
670                    .foreign_key(
671                        &mut ForeignKey::create()
672                            .name("FK_table_optional_associated_source_id")
673                            .from(Table::Table, Table::OptionalAssociatedSourceId)
674                            .to(Object::Table, Object::Oid)
675                            .on_delete(ForeignKeyAction::Cascade)
676                            .to_owned(),
677                    )
678                    .to_owned(),
679            )
680            .await?;
681        manager
682            .create_table(
683                MigrationTable::create()
684                    .table(Sink::Table)
685                    .col(ColumnDef::new(Sink::SinkId).integer().primary_key())
686                    .col(ColumnDef::new(Sink::Name).string().not_null())
687                    .col(ColumnDef::new(Sink::Columns).rw_binary(manager).not_null())
688                    .col(ColumnDef::new(Sink::PlanPk).rw_binary(manager).not_null())
689                    .col(
690                        ColumnDef::new(Sink::DistributionKey)
691                            .json_binary()
692                            .not_null(),
693                    )
694                    .col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null())
695                    .col(ColumnDef::new(Sink::SinkType).string().not_null())
696                    .col(ColumnDef::new(Sink::Properties).json_binary().not_null())
697                    .col(
698                        ColumnDef::new(Sink::Definition)
699                            .rw_long_text(manager)
700                            .not_null(),
701                    )
702                    .col(ColumnDef::new(Sink::ConnectionId).integer())
703                    .col(ColumnDef::new(Sink::DbName).string().not_null())
704                    .col(ColumnDef::new(Sink::SinkFromName).string().not_null())
705                    .col(ColumnDef::new(Sink::SinkFormatDesc).rw_binary(manager))
706                    .col(ColumnDef::new(Sink::TargetTable).integer())
707                    .foreign_key(
708                        &mut ForeignKey::create()
709                            .name("FK_sink_object_id")
710                            .from(Sink::Table, Sink::SinkId)
711                            .to(Object::Table, Object::Oid)
712                            .on_delete(ForeignKeyAction::Cascade)
713                            .to_owned(),
714                    )
715                    .foreign_key(
716                        &mut ForeignKey::create()
717                            .name("FK_sink_connection_id")
718                            .from(Sink::Table, Sink::ConnectionId)
719                            .to(Connection::Table, Connection::ConnectionId)
720                            .to_owned(),
721                    )
722                    .foreign_key(
723                        &mut ForeignKey::create()
724                            .name("FK_sink_target_table_id")
725                            .from(Sink::Table, Sink::TargetTable)
726                            .to(Table::Table, Table::TableId)
727                            .to_owned(),
728                    )
729                    .to_owned(),
730            )
731            .await?;
732        manager
733            .create_table(
734                MigrationTable::create()
735                    .table(View::Table)
736                    .col(ColumnDef::new(View::ViewId).integer().primary_key())
737                    .col(ColumnDef::new(View::Name).string().not_null())
738                    .col(ColumnDef::new(View::Properties).json_binary().not_null())
739                    .col(
740                        ColumnDef::new(View::Definition)
741                            .rw_long_text(manager)
742                            .not_null(),
743                    )
744                    .col(ColumnDef::new(View::Columns).rw_binary(manager).not_null())
745                    .foreign_key(
746                        &mut ForeignKey::create()
747                            .name("FK_view_object_id")
748                            .from(View::Table, View::ViewId)
749                            .to(Object::Table, Object::Oid)
750                            .on_delete(ForeignKeyAction::Cascade)
751                            .to_owned(),
752                    )
753                    .to_owned(),
754            )
755            .await?;
756        manager
757            .create_table(
758                MigrationTable::create()
759                    .table(Index::Table)
760                    .col(ColumnDef::new(Index::IndexId).integer().primary_key())
761                    .col(ColumnDef::new(Index::Name).string().not_null())
762                    .col(ColumnDef::new(Index::IndexTableId).integer().not_null())
763                    .col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
764                    .col(
765                        ColumnDef::new(Index::IndexItems)
766                            .rw_binary(manager)
767                            .not_null(),
768                    )
769                    .col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
770                    .foreign_key(
771                        &mut ForeignKey::create()
772                            .name("FK_index_object_id")
773                            .from(Index::Table, Index::IndexId)
774                            .to(Object::Table, Object::Oid)
775                            .on_delete(ForeignKeyAction::Cascade)
776                            .to_owned(),
777                    )
778                    .foreign_key(
779                        &mut ForeignKey::create()
780                            .name("FK_index_index_table_id")
781                            .from(Index::Table, Index::IndexTableId)
782                            .to(Table::Table, Table::TableId)
783                            .on_delete(ForeignKeyAction::Cascade)
784                            .to_owned(),
785                    )
786                    .foreign_key(
787                        &mut ForeignKey::create()
788                            .name("FK_index_primary_table_id")
789                            .from(Index::Table, Index::PrimaryTableId)
790                            .to(Table::Table, Table::TableId)
791                            .on_delete(ForeignKeyAction::Cascade)
792                            .to_owned(),
793                    )
794                    .to_owned(),
795            )
796            .await?;
797        manager
798            .create_table(
799                MigrationTable::create()
800                    .table(Function::Table)
801                    .col(ColumnDef::new(Function::FunctionId).integer().primary_key())
802                    .col(ColumnDef::new(Function::Name).string().not_null())
803                    .col(ColumnDef::new(Function::ArgNames).string().not_null())
804                    .col(
805                        ColumnDef::new(Function::ArgTypes)
806                            .rw_binary(manager)
807                            .not_null(),
808                    )
809                    .col(
810                        ColumnDef::new(Function::ReturnType)
811                            .rw_binary(manager)
812                            .not_null(),
813                    )
814                    .col(ColumnDef::new(Function::Language).string().not_null())
815                    .col(ColumnDef::new(Function::Link).string())
816                    .col(ColumnDef::new(Function::Identifier).string())
817                    .col(ColumnDef::new(Function::Body).rw_long_text(manager))
818                    // XXX: should this be binary type instead?
819                    .col(ColumnDef::new(Function::CompressedBinary).rw_long_text(manager))
820                    .col(ColumnDef::new(Function::Kind).string().not_null())
821                    .col(
822                        ColumnDef::new(Function::AlwaysRetryOnNetworkError)
823                            .boolean()
824                            .not_null(),
825                    )
826                    .foreign_key(
827                        &mut ForeignKey::create()
828                            .name("FK_function_object_id")
829                            .from(Function::Table, Function::FunctionId)
830                            .to(Object::Table, Object::Oid)
831                            .on_delete(ForeignKeyAction::Cascade)
832                            .to_owned(),
833                    )
834                    .to_owned(),
835            )
836            .await?;
837        manager
838            .create_table(
839                MigrationTable::create()
840                    .table(SystemParameter::Table)
841                    .col(
842                        ColumnDef::new(SystemParameter::Name)
843                            .string()
844                            .primary_key()
845                            .not_null(),
846                    )
847                    .col(ColumnDef::new(SystemParameter::Value).string().not_null())
848                    .col(
849                        ColumnDef::new(SystemParameter::IsMutable)
850                            .boolean()
851                            .not_null(),
852                    )
853                    .col(ColumnDef::new(SystemParameter::Description).string())
854                    .to_owned(),
855            )
856            .await?;
857        manager
858            .create_table(
859                crate::Table::create()
860                    .table(CatalogVersion::Table)
861                    .col(
862                        ColumnDef::new(CatalogVersion::Name)
863                            .string()
864                            .not_null()
865                            .primary_key(),
866                    )
867                    .col(
868                        ColumnDef::new(CatalogVersion::Version)
869                            .big_integer()
870                            .not_null(),
871                    )
872                    .to_owned(),
873            )
874            .await?;
875
876        // 3. create indexes.
877        manager
878            .create_index(
879                MigrationIndex::create()
880                    .table(Worker::Table)
881                    .name("idx_worker_host_port")
882                    .unique()
883                    .col(Worker::Host)
884                    .col(Worker::Port)
885                    .to_owned(),
886            )
887            .await?;
888        manager
889            .create_index(
890                MigrationIndex::create()
891                    .table(UserPrivilege::Table)
892                    .name("idx_user_privilege_item")
893                    .unique()
894                    .col(UserPrivilege::UserId)
895                    .col(UserPrivilege::Oid)
896                    .col(UserPrivilege::Action)
897                    .col(UserPrivilege::GrantedBy)
898                    .to_owned(),
899            )
900            .await?;
901
902        // 4. initialize data.
903        let insert_cluster_id = Query::insert()
904            .into_table(Cluster::Table)
905            .columns([Cluster::ClusterId])
906            .values_panic([uuid::Uuid::new_v4().into()])
907            .to_owned();
908        let insert_sys_users = Query::insert()
909            .into_table(User::Table)
910            .columns([
911                User::UserId,
912                User::Name,
913                User::IsSuper,
914                User::CanCreateUser,
915                User::CanCreateDb,
916                User::CanLogin,
917            ])
918            .values_panic([
919                1.into(),
920                "root".into(),
921                true.into(),
922                true.into(),
923                true.into(),
924                true.into(),
925            ])
926            .values_panic([
927                2.into(),
928                "postgres".into(),
929                true.into(),
930                true.into(),
931                true.into(),
932                true.into(),
933            ])
934            .to_owned();
935
936        // Since User table is newly created, we assume that the initial user id of `root` is 1 and `postgres` is 2.
937        let insert_objects = Query::insert()
938            .into_table(Object::Table)
939            .columns([
940                Object::Oid,
941                Object::ObjType,
942                Object::OwnerId,
943                Object::DatabaseId,
944            ])
945            .values_panic([1.into(), "DATABASE".into(), 1.into(), None::<i32>.into()])
946            .values_panic([2.into(), "SCHEMA".into(), 1.into(), 1.into()]) // public
947            .values_panic([3.into(), "SCHEMA".into(), 1.into(), 1.into()]) // pg_catalog
948            .values_panic([4.into(), "SCHEMA".into(), 1.into(), 1.into()]) // information_schema
949            .values_panic([5.into(), "SCHEMA".into(), 1.into(), 1.into()]) // rw_catalog
950            .to_owned();
951
952        let insert_sys_database = Query::insert()
953            .into_table(Database::Table)
954            .columns([Database::DatabaseId, Database::Name])
955            .values_panic([1.into(), "dev".into()])
956            .to_owned();
957        let insert_sys_schemas = Query::insert()
958            .into_table(Schema::Table)
959            .columns([Schema::SchemaId, Schema::Name])
960            .values_panic([2.into(), "public".into()])
961            .values_panic([3.into(), "pg_catalog".into()])
962            .values_panic([4.into(), "information_schema".into()])
963            .values_panic([5.into(), "rw_catalog".into()])
964            .to_owned();
965
966        manager.exec_stmt(insert_cluster_id).await?;
967        manager.exec_stmt(insert_sys_users).await?;
968        manager.exec_stmt(insert_objects).await?;
969        manager.exec_stmt(insert_sys_database).await?;
970        manager.exec_stmt(insert_sys_schemas).await?;
971
972        // Rest auto increment offset
973        match manager.get_database_backend() {
974            DbBackend::MySql => {
975                manager
976                    .get_connection()
977                    .execute(Statement::from_string(
978                        DatabaseBackend::MySql,
979                        "ALTER TABLE object AUTO_INCREMENT = 6",
980                    ))
981                    .await?;
982                manager
983                    .get_connection()
984                    .execute(Statement::from_string(
985                        DatabaseBackend::MySql,
986                        "ALTER TABLE user AUTO_INCREMENT = 3",
987                    ))
988                    .await?;
989            }
990            DbBackend::Postgres => {
991                manager
992                    .get_connection()
993                    .execute(Statement::from_string(
994                        DatabaseBackend::Postgres,
995                        "SELECT setval('object_oid_seq', 5)",
996                    ))
997                    .await?;
998                manager
999                    .get_connection()
1000                    .execute(Statement::from_string(
1001                        DatabaseBackend::Postgres,
1002                        "SELECT setval('user_user_id_seq', 2)",
1003                    ))
1004                    .await?;
1005            }
1006            DbBackend::Sqlite => {}
1007        }
1008
1009        Ok(())
1010    }
1011
1012    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
1013        // drop tables cascade.
1014        drop_tables!(
1015            manager,
1016            Cluster,
1017            WorkerProperty,
1018            Worker,
1019            UserPrivilege,
1020            Database,
1021            Schema,
1022            StreamingJob,
1023            ActorDispatcher,
1024            Actor,
1025            Sink,
1026            Index,
1027            Table,
1028            Fragment,
1029            Source,
1030            Connection,
1031            View,
1032            Function,
1033            ObjectDependency,
1034            Object,
1035            User,
1036            SystemParameter,
1037            CatalogVersion
1038        );
1039        Ok(())
1040    }
1041}
1042
1043#[derive(DeriveIden)]
1044enum Cluster {
1045    Table,
1046    ClusterId,
1047    CreatedAt,
1048}
1049
1050#[derive(DeriveIden)]
1051enum Worker {
1052    Table,
1053    WorkerId,
1054    WorkerType,
1055    Host,
1056    Port,
1057    TransactionId,
1058    Status,
1059}
1060
1061#[derive(DeriveIden)]
1062enum WorkerProperty {
1063    Table,
1064    WorkerId,
1065    ParallelUnitIds,
1066    IsStreaming,
1067    IsServing,
1068    IsUnschedulable,
1069}
1070
1071#[derive(DeriveIden)]
1072enum User {
1073    Table,
1074    UserId,
1075    Name,
1076    IsSuper,
1077    CanCreateDb,
1078    CanCreateUser,
1079    CanLogin,
1080    AuthInfo,
1081}
1082
1083#[derive(DeriveIden)]
1084enum UserPrivilege {
1085    Table,
1086    Id,
1087    DependentId,
1088    UserId,
1089    Oid,
1090    GrantedBy,
1091    Action,
1092    WithGrantOption,
1093}
1094
1095#[derive(DeriveIden)]
1096enum Database {
1097    Table,
1098    DatabaseId,
1099    Name,
1100}
1101
1102#[derive(DeriveIden)]
1103enum Schema {
1104    Table,
1105    SchemaId,
1106    Name,
1107}
1108
1109#[derive(DeriveIden)]
1110enum Fragment {
1111    Table,
1112    FragmentId,
1113    JobId,
1114    FragmentTypeMask,
1115    DistributionType,
1116    StreamNode,
1117    VnodeMapping,
1118    StateTableIds,
1119    UpstreamFragmentId,
1120}
1121
1122#[derive(DeriveIden)]
1123enum Actor {
1124    Table,
1125    ActorId,
1126    FragmentId,
1127    Status,
1128    Splits,
1129    ParallelUnitId,
1130    WorkerId,
1131    UpstreamActorIds,
1132    VnodeBitmap,
1133    ExprContext,
1134}
1135
1136#[derive(DeriveIden)]
1137enum ActorDispatcher {
1138    Table,
1139    Id,
1140    ActorId,
1141    DispatcherType,
1142    DistKeyIndices,
1143    OutputIndices,
1144    HashMapping,
1145    DispatcherId,
1146    DownstreamActorIds,
1147    DownstreamTableName,
1148}
1149
1150#[derive(DeriveIden)]
1151enum StreamingJob {
1152    Table,
1153    JobId,
1154    JobStatus,
1155    Timezone,
1156    CreateType,
1157    Parallelism,
1158}
1159
1160#[derive(DeriveIden)]
1161#[allow(clippy::enum_variant_names)]
1162enum Table {
1163    Table,
1164    TableId,
1165    Name,
1166    OptionalAssociatedSourceId,
1167    TableType,
1168    BelongsToJobId,
1169    Columns,
1170    Pk,
1171    DistributionKey,
1172    StreamKey,
1173    AppendOnly,
1174    FragmentId,
1175    VnodeColIndex,
1176    RowIdIndex,
1177    ValueIndices,
1178    Definition,
1179    HandlePkConflictBehavior,
1180    ReadPrefixLenHint,
1181    WatermarkIndices,
1182    DistKeyInPk,
1183    DmlFragmentId,
1184    Cardinality,
1185    CleanedByWatermark,
1186    Description,
1187    Version,
1188    RetentionSeconds,
1189    IncomingSinks,
1190}
1191
1192#[derive(DeriveIden)]
1193enum Source {
1194    Table,
1195    SourceId,
1196    Name,
1197    RowIdIndex,
1198    Columns,
1199    PkColumnIds,
1200    WithProperties,
1201    Definition,
1202    SourceInfo,
1203    WatermarkDescs,
1204    OptionalAssociatedTableId,
1205    ConnectionId,
1206    Version,
1207}
1208
1209#[derive(DeriveIden)]
1210enum Sink {
1211    Table,
1212    SinkId,
1213    Name,
1214    Columns,
1215    PlanPk,
1216    DistributionKey,
1217    DownstreamPk,
1218    SinkType,
1219    Properties,
1220    Definition,
1221    ConnectionId,
1222    DbName,
1223    SinkFromName,
1224    SinkFormatDesc,
1225    TargetTable,
1226}
1227
1228#[derive(DeriveIden)]
1229enum Connection {
1230    Table,
1231    ConnectionId,
1232    Name,
1233    Info,
1234}
1235
1236#[derive(DeriveIden)]
1237enum View {
1238    Table,
1239    ViewId,
1240    Name,
1241    Properties,
1242    Definition,
1243    Columns,
1244}
1245
1246#[derive(DeriveIden)]
1247enum Index {
1248    Table,
1249    IndexId,
1250    Name,
1251    IndexTableId,
1252    PrimaryTableId,
1253    IndexItems,
1254    IndexColumnsLen,
1255}
1256
1257#[derive(DeriveIden)]
1258enum Function {
1259    Table,
1260    FunctionId,
1261    Name,
1262    ArgNames,
1263    ArgTypes,
1264    ReturnType,
1265    Language,
1266    Link,
1267    Identifier,
1268    Body,
1269    CompressedBinary,
1270    Kind,
1271    AlwaysRetryOnNetworkError,
1272}
1273
1274#[derive(DeriveIden)]
1275pub(crate) enum Object {
1276    Table,
1277    Oid,
1278    ObjType,
1279    OwnerId,
1280    SchemaId,
1281    DatabaseId,
1282    InitializedAt,
1283    CreatedAt,
1284    InitializedAtClusterVersion,
1285    CreatedAtClusterVersion,
1286}
1287
1288#[derive(DeriveIden)]
1289enum ObjectDependency {
1290    Table,
1291    Id,
1292    Oid,
1293    UsedBy,
1294}
1295
1296#[derive(DeriveIden)]
1297enum SystemParameter {
1298    Table,
1299    Name,
1300    Value,
1301    IsMutable,
1302    Description,
1303}
1304
1305#[derive(DeriveIden)]
1306enum CatalogVersion {
1307    Table,
1308    Name,
1309    Version,
1310}