risingwave_meta_model_migration/
m20230908_072257_init.rs

1use sea_orm_migration::prelude::{Index as MigrationIndex, Table as MigrationTable, *};
2
3use crate::sea_orm::{DatabaseBackend, DbBackend, Statement};
4use crate::utils::ColumnDefExt;
5use crate::{assert_not_has_tables, drop_tables};
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[async_trait::async_trait]
11impl MigrationTrait for Migration {
12    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
13        // 1. check if the table exists.
14        assert_not_has_tables!(
15            manager,
16            Cluster,
17            Worker,
18            WorkerProperty,
19            User,
20            UserPrivilege,
21            Database,
22            Schema,
23            StreamingJob,
24            Fragment,
25            Actor,
26            ActorDispatcher,
27            Table,
28            Source,
29            Sink,
30            Connection,
31            View,
32            Index,
33            Function,
34            Object,
35            ObjectDependency,
36            SystemParameter,
37            CatalogVersion
38        );
39
40        // In Mysql, The CHAR, VARCHAR and TEXT types are encoded in utf8_general_ci by default, which is not case sensitive but
41        // required in risingwave. Here we need to change the database collate to utf8mb4_bin.
42        if manager.get_database_backend() == DbBackend::MySql {
43            manager
44                .get_connection()
45                .execute_unprepared("ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_bin")
46                .await
47                .expect("failed to set database collate");
48        }
49
50        // 2. create tables.
51        manager
52            .create_table(
53                MigrationTable::create()
54                    .table(Cluster::Table)
55                    .col(
56                        ColumnDef::new(Cluster::ClusterId)
57                            .uuid()
58                            .not_null()
59                            .primary_key(),
60                    )
61                    .col(
62                        ColumnDef::new(Cluster::CreatedAt)
63                            .date_time()
64                            .default(Expr::current_timestamp())
65                            .not_null(),
66                    )
67                    .to_owned(),
68            )
69            .await?;
70        manager
71            .create_table(
72                MigrationTable::create()
73                    .table(Worker::Table)
74                    .col(
75                        ColumnDef::new(Worker::WorkerId)
76                            .integer()
77                            .not_null()
78                            .auto_increment()
79                            .primary_key(),
80                    )
81                    .col(ColumnDef::new(Worker::WorkerType).string().not_null())
82                    .col(ColumnDef::new(Worker::Host).string().not_null())
83                    .col(ColumnDef::new(Worker::Port).integer().not_null())
84                    .col(ColumnDef::new(Worker::Status).string().not_null())
85                    .col(ColumnDef::new(Worker::TransactionId).integer())
86                    .to_owned(),
87            )
88            .await?;
89        manager
90            .create_table(
91                MigrationTable::create()
92                    .table(WorkerProperty::Table)
93                    .col(
94                        ColumnDef::new(WorkerProperty::WorkerId)
95                            .integer()
96                            .primary_key(),
97                    )
98                    .col(
99                        ColumnDef::new(WorkerProperty::ParallelUnitIds)
100                            .json_binary()
101                            .not_null(),
102                    )
103                    .col(
104                        ColumnDef::new(WorkerProperty::IsStreaming)
105                            .boolean()
106                            .not_null(),
107                    )
108                    .col(
109                        ColumnDef::new(WorkerProperty::IsServing)
110                            .boolean()
111                            .not_null(),
112                    )
113                    .col(
114                        ColumnDef::new(WorkerProperty::IsUnschedulable)
115                            .boolean()
116                            .not_null(),
117                    )
118                    .foreign_key(
119                        &mut ForeignKey::create()
120                            .name("FK_worker_property_worker_id")
121                            .from(WorkerProperty::Table, WorkerProperty::WorkerId)
122                            .to(Worker::Table, Worker::WorkerId)
123                            .on_delete(ForeignKeyAction::Cascade)
124                            .to_owned(),
125                    )
126                    .to_owned(),
127            )
128            .await?;
129        manager
130            .create_table(
131                MigrationTable::create()
132                    .table(User::Table)
133                    .col(
134                        ColumnDef::new(User::UserId)
135                            .integer()
136                            .primary_key()
137                            .auto_increment(),
138                    )
139                    .col(ColumnDef::new(User::Name).string().unique_key().not_null())
140                    .col(ColumnDef::new(User::IsSuper).boolean().not_null())
141                    .col(ColumnDef::new(User::CanCreateDb).boolean().not_null())
142                    .col(ColumnDef::new(User::CanCreateUser).boolean().not_null())
143                    .col(ColumnDef::new(User::CanLogin).boolean().not_null())
144                    .col(ColumnDef::new(User::AuthInfo).rw_binary(manager))
145                    .to_owned(),
146            )
147            .await?;
148        manager
149            .create_table(
150                MigrationTable::create()
151                    .table(Object::Table)
152                    .col(
153                        ColumnDef::new(Object::Oid)
154                            .integer()
155                            .auto_increment()
156                            .primary_key(),
157                    )
158                    .col(ColumnDef::new(Object::ObjType).string().not_null())
159                    .col(ColumnDef::new(Object::OwnerId).integer().not_null())
160                    .col(ColumnDef::new(Object::SchemaId).integer())
161                    .col(ColumnDef::new(Object::DatabaseId).integer())
162                    .col(
163                        ColumnDef::new(Object::InitializedAt)
164                            .date_time()
165                            .default(Expr::current_timestamp())
166                            .not_null(),
167                    )
168                    .col(
169                        ColumnDef::new(Object::CreatedAt)
170                            .date_time()
171                            .default(Expr::current_timestamp())
172                            .not_null(),
173                    )
174                    .col(ColumnDef::new(Object::InitializedAtClusterVersion).string())
175                    .col(ColumnDef::new(Object::CreatedAtClusterVersion).string())
176                    .foreign_key(
177                        &mut ForeignKey::create()
178                            .name("FK_object_owner_id")
179                            .from(Object::Table, Object::OwnerId)
180                            .to(User::Table, User::UserId)
181                            .on_delete(ForeignKeyAction::Cascade)
182                            .to_owned(),
183                    )
184                    .foreign_key(
185                        &mut ForeignKey::create()
186                            .name("FK_object_database_id")
187                            .from(Object::Table, Object::DatabaseId)
188                            .to(Object::Table, Object::Oid)
189                            .on_delete(ForeignKeyAction::Cascade)
190                            .to_owned(),
191                    )
192                    .foreign_key(
193                        &mut ForeignKey::create()
194                            .name("FK_object_schema_id")
195                            .from(Object::Table, Object::SchemaId)
196                            .to(Object::Table, Object::Oid)
197                            .on_delete(ForeignKeyAction::Cascade)
198                            .to_owned(),
199                    )
200                    .to_owned(),
201            )
202            .await?;
203        manager
204            .create_table(
205                MigrationTable::create()
206                    .table(UserPrivilege::Table)
207                    .col(
208                        ColumnDef::new(UserPrivilege::Id)
209                            .integer()
210                            .primary_key()
211                            .auto_increment(),
212                    )
213                    .col(ColumnDef::new(UserPrivilege::DependentId).integer())
214                    .col(ColumnDef::new(UserPrivilege::UserId).integer().not_null())
215                    .col(ColumnDef::new(UserPrivilege::Oid).integer().not_null())
216                    .col(
217                        ColumnDef::new(UserPrivilege::GrantedBy)
218                            .integer()
219                            .not_null(),
220                    )
221                    .col(ColumnDef::new(UserPrivilege::Action).string().not_null())
222                    .col(
223                        ColumnDef::new(UserPrivilege::WithGrantOption)
224                            .boolean()
225                            .not_null(),
226                    )
227                    .foreign_key(
228                        &mut ForeignKey::create()
229                            .name("FK_user_privilege_dependent_id")
230                            .from(UserPrivilege::Table, UserPrivilege::DependentId)
231                            .to(UserPrivilege::Table, UserPrivilege::Id)
232                            .on_delete(ForeignKeyAction::Cascade)
233                            .to_owned(),
234                    )
235                    .foreign_key(
236                        &mut ForeignKey::create()
237                            .name("FK_user_privilege_user_id")
238                            .from(UserPrivilege::Table, UserPrivilege::UserId)
239                            .to(User::Table, User::UserId)
240                            .on_delete(ForeignKeyAction::Cascade)
241                            .to_owned(),
242                    )
243                    .foreign_key(
244                        &mut ForeignKey::create()
245                            .name("FK_user_privilege_granted_by")
246                            .from(UserPrivilege::Table, UserPrivilege::GrantedBy)
247                            .to(User::Table, User::UserId)
248                            .to_owned(),
249                    )
250                    .foreign_key(
251                        &mut ForeignKey::create()
252                            .name("FK_user_privilege_oid")
253                            .from(UserPrivilege::Table, UserPrivilege::Oid)
254                            .to(Object::Table, Object::Oid)
255                            .on_delete(ForeignKeyAction::Cascade)
256                            .to_owned(),
257                    )
258                    .to_owned(),
259            )
260            .await?;
261        manager
262            .create_table(
263                MigrationTable::create()
264                    .table(ObjectDependency::Table)
265                    .col(
266                        ColumnDef::new(ObjectDependency::Id)
267                            .integer()
268                            .auto_increment()
269                            .primary_key(),
270                    )
271                    .col(ColumnDef::new(ObjectDependency::Oid).integer().not_null())
272                    .col(
273                        ColumnDef::new(ObjectDependency::UsedBy)
274                            .integer()
275                            .not_null(),
276                    )
277                    .foreign_key(
278                        &mut ForeignKey::create()
279                            .name("FK_object_dependency_oid")
280                            .from(ObjectDependency::Table, ObjectDependency::Oid)
281                            .to(Object::Table, Object::Oid)
282                            .on_delete(ForeignKeyAction::Cascade)
283                            .to_owned(),
284                    )
285                    .foreign_key(
286                        &mut ForeignKey::create()
287                            .name("FK_object_dependency_used_by")
288                            .from(ObjectDependency::Table, ObjectDependency::UsedBy)
289                            .to(Object::Table, Object::Oid)
290                            .on_delete(ForeignKeyAction::Cascade)
291                            .to_owned(),
292                    )
293                    .to_owned(),
294            )
295            .await?;
296        manager
297            .create_table(
298                MigrationTable::create()
299                    .table(Database::Table)
300                    .col(ColumnDef::new(Database::DatabaseId).integer().primary_key())
301                    .col(
302                        ColumnDef::new(Database::Name)
303                            .string()
304                            .unique_key()
305                            .not_null(),
306                    )
307                    .foreign_key(
308                        &mut ForeignKey::create()
309                            .name("FK_database_object_id")
310                            .from(Database::Table, Database::DatabaseId)
311                            .to(Object::Table, Object::Oid)
312                            .on_delete(ForeignKeyAction::Cascade)
313                            .to_owned(),
314                    )
315                    .to_owned(),
316            )
317            .await?;
318        manager
319            .create_table(
320                MigrationTable::create()
321                    .table(Schema::Table)
322                    .col(ColumnDef::new(Schema::SchemaId).integer().primary_key())
323                    .col(ColumnDef::new(Schema::Name).string().not_null())
324                    .foreign_key(
325                        &mut ForeignKey::create()
326                            .name("FK_schema_object_id")
327                            .from(Schema::Table, Schema::SchemaId)
328                            .to(Object::Table, Object::Oid)
329                            .on_delete(ForeignKeyAction::Cascade)
330                            .to_owned(),
331                    )
332                    .to_owned(),
333            )
334            .await?;
335        manager
336            .create_table(
337                MigrationTable::create()
338                    .table(StreamingJob::Table)
339                    .col(ColumnDef::new(StreamingJob::JobId).integer().primary_key())
340                    .col(ColumnDef::new(StreamingJob::JobStatus).string().not_null())
341                    .col(ColumnDef::new(StreamingJob::CreateType).string().not_null())
342                    .col(ColumnDef::new(StreamingJob::Timezone).string())
343                    .col(
344                        ColumnDef::new(StreamingJob::Parallelism)
345                            .json_binary()
346                            .not_null(),
347                    )
348                    .foreign_key(
349                        &mut ForeignKey::create()
350                            .name("FK_streaming_job_object_id")
351                            .from(StreamingJob::Table, StreamingJob::JobId)
352                            .to(Object::Table, Object::Oid)
353                            .on_delete(ForeignKeyAction::Cascade)
354                            .to_owned(),
355                    )
356                    .to_owned(),
357            )
358            .await?;
359        manager
360            .create_table(
361                MigrationTable::create()
362                    .table(Fragment::Table)
363                    .col(
364                        ColumnDef::new(Fragment::FragmentId)
365                            .integer()
366                            .primary_key()
367                            .auto_increment(),
368                    )
369                    .col(ColumnDef::new(Fragment::JobId).integer().not_null())
370                    .col(
371                        ColumnDef::new(Fragment::FragmentTypeMask)
372                            .integer()
373                            .not_null(),
374                    )
375                    .col(
376                        ColumnDef::new(Fragment::DistributionType)
377                            .string()
378                            .not_null(),
379                    )
380                    .col(
381                        ColumnDef::new(Fragment::StreamNode)
382                            .rw_binary(manager)
383                            .not_null(),
384                    )
385                    .col(
386                        ColumnDef::new(Fragment::VnodeMapping)
387                            .rw_binary(manager)
388                            .not_null(),
389                    )
390                    .col(ColumnDef::new(Fragment::StateTableIds).json_binary())
391                    .col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary())
392                    .foreign_key(
393                        &mut ForeignKey::create()
394                            .name("FK_fragment_table_id")
395                            .from(Fragment::Table, Fragment::JobId)
396                            .to(Object::Table, Object::Oid)
397                            .on_delete(ForeignKeyAction::Cascade)
398                            .to_owned(),
399                    )
400                    .to_owned(),
401            )
402            .await?;
403        manager
404            .create_table(
405                MigrationTable::create()
406                    .table(Actor::Table)
407                    .col(
408                        ColumnDef::new(Actor::ActorId)
409                            .integer()
410                            .primary_key()
411                            .auto_increment(),
412                    )
413                    .col(ColumnDef::new(Actor::FragmentId).integer().not_null())
414                    .col(ColumnDef::new(Actor::Status).string().not_null())
415                    .col(ColumnDef::new(Actor::Splits).rw_binary(manager))
416                    .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
417                    .col(ColumnDef::new(Actor::WorkerId).integer().not_null())
418                    .col(ColumnDef::new(Actor::UpstreamActorIds).json_binary())
419                    .col(ColumnDef::new(Actor::VnodeBitmap).rw_binary(manager))
420                    .col(
421                        ColumnDef::new(Actor::ExprContext)
422                            .rw_binary(manager)
423                            .not_null(),
424                    )
425                    .foreign_key(
426                        &mut ForeignKey::create()
427                            .name("FK_actor_fragment_id")
428                            .from(Actor::Table, Actor::FragmentId)
429                            .to(Fragment::Table, Fragment::FragmentId)
430                            .on_delete(ForeignKeyAction::Cascade)
431                            .to_owned(),
432                    )
433                    .to_owned(),
434            )
435            .await?;
436        manager
437            .create_table(
438                MigrationTable::create()
439                    .table(ActorDispatcher::Table)
440                    .col(
441                        ColumnDef::new(ActorDispatcher::Id)
442                            .integer()
443                            .primary_key()
444                            .auto_increment(),
445                    )
446                    .col(
447                        ColumnDef::new(ActorDispatcher::ActorId)
448                            .integer()
449                            .not_null(),
450                    )
451                    .col(
452                        ColumnDef::new(ActorDispatcher::DispatcherType)
453                            .string()
454                            .not_null(),
455                    )
456                    .col(
457                        ColumnDef::new(ActorDispatcher::DistKeyIndices)
458                            .json_binary()
459                            .not_null(),
460                    )
461                    .col(
462                        ColumnDef::new(ActorDispatcher::OutputIndices)
463                            .json_binary()
464                            .not_null(),
465                    )
466                    .col(ColumnDef::new(ActorDispatcher::HashMapping).rw_binary(manager))
467                    .col(
468                        ColumnDef::new(ActorDispatcher::DispatcherId)
469                            .integer()
470                            .not_null(),
471                    )
472                    .col(
473                        ColumnDef::new(ActorDispatcher::DownstreamActorIds)
474                            .json_binary()
475                            .not_null(),
476                    )
477                    .col(ColumnDef::new(ActorDispatcher::DownstreamTableName).string())
478                    .foreign_key(
479                        &mut ForeignKey::create()
480                            .name("FK_actor_dispatcher_actor_id")
481                            .from(ActorDispatcher::Table, ActorDispatcher::ActorId)
482                            .to(Actor::Table, Actor::ActorId)
483                            .on_delete(ForeignKeyAction::Cascade)
484                            .to_owned(),
485                    )
486                    .foreign_key(
487                        &mut ForeignKey::create()
488                            .name("FK_actor_dispatcher_dispatcher_id")
489                            .from(ActorDispatcher::Table, ActorDispatcher::DispatcherId)
490                            .to(Fragment::Table, Fragment::FragmentId)
491                            .on_delete(ForeignKeyAction::Cascade)
492                            .to_owned(),
493                    )
494                    .to_owned(),
495            )
496            .await?;
497        manager
498            .create_table(
499                MigrationTable::create()
500                    .table(Connection::Table)
501                    .col(
502                        ColumnDef::new(Connection::ConnectionId)
503                            .integer()
504                            .primary_key(),
505                    )
506                    .col(ColumnDef::new(Connection::Name).string().not_null())
507                    .col(
508                        ColumnDef::new(Connection::Info)
509                            .rw_binary(manager)
510                            .not_null(),
511                    )
512                    .foreign_key(
513                        &mut ForeignKey::create()
514                            .name("FK_connection_object_id")
515                            .from(Connection::Table, Connection::ConnectionId)
516                            .to(Object::Table, Object::Oid)
517                            .on_delete(ForeignKeyAction::Cascade)
518                            .to_owned(),
519                    )
520                    .to_owned(),
521            )
522            .await?;
523        manager
524            .create_table(
525                MigrationTable::create()
526                    .table(Source::Table)
527                    .col(ColumnDef::new(Source::SourceId).integer().primary_key())
528                    .col(ColumnDef::new(Source::Name).string().not_null())
529                    .col(ColumnDef::new(Source::RowIdIndex).integer())
530                    .col(
531                        ColumnDef::new(Source::Columns)
532                            .rw_binary(manager)
533                            .not_null(),
534                    )
535                    .col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null())
536                    .col(
537                        ColumnDef::new(Source::WithProperties)
538                            .json_binary()
539                            .not_null(),
540                    )
541                    .col(
542                        ColumnDef::new(Source::Definition)
543                            .rw_long_text(manager)
544                            .not_null(),
545                    )
546                    .col(ColumnDef::new(Source::SourceInfo).rw_binary(manager))
547                    .col(
548                        ColumnDef::new(Source::WatermarkDescs)
549                            .rw_binary(manager)
550                            .not_null(),
551                    )
552                    .col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
553                    .col(ColumnDef::new(Source::ConnectionId).integer())
554                    .col(ColumnDef::new(Source::Version).big_integer().not_null())
555                    .foreign_key(
556                        &mut ForeignKey::create()
557                            .name("FK_source_object_id")
558                            .from(Source::Table, Source::SourceId)
559                            .to(Object::Table, Object::Oid)
560                            .on_delete(ForeignKeyAction::Cascade)
561                            .to_owned(),
562                    )
563                    .foreign_key(
564                        &mut ForeignKey::create()
565                            .name("FK_source_connection_id")
566                            .from(Source::Table, Source::ConnectionId)
567                            .to(Connection::Table, Connection::ConnectionId)
568                            .to_owned(),
569                    )
570                    .foreign_key(
571                        &mut ForeignKey::create()
572                            .name("FK_source_optional_associated_table_id")
573                            .from(Source::Table, Source::OptionalAssociatedTableId)
574                            .to(Object::Table, Object::Oid)
575                            .on_delete(ForeignKeyAction::Cascade)
576                            .to_owned(),
577                    )
578                    .to_owned(),
579            )
580            .await?;
581        manager
582            .create_table(
583                MigrationTable::create()
584                    .table(Table::Table)
585                    .col(ColumnDef::new(Table::TableId).integer().primary_key())
586                    .col(ColumnDef::new(Table::Name).string().not_null())
587                    .col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer())
588                    .col(ColumnDef::new(Table::TableType).string().not_null())
589                    .col(ColumnDef::new(Table::BelongsToJobId).integer())
590                    .col(ColumnDef::new(Table::Columns).rw_binary(manager).not_null())
591                    .col(ColumnDef::new(Table::Pk).rw_binary(manager).not_null())
592                    .col(
593                        ColumnDef::new(Table::DistributionKey)
594                            .json_binary()
595                            .not_null(),
596                    )
597                    .col(ColumnDef::new(Table::StreamKey).json_binary().not_null())
598                    .col(ColumnDef::new(Table::AppendOnly).boolean().not_null())
599                    .col(ColumnDef::new(Table::FragmentId).integer())
600                    .col(ColumnDef::new(Table::VnodeColIndex).integer())
601                    .col(ColumnDef::new(Table::RowIdIndex).integer())
602                    .col(ColumnDef::new(Table::ValueIndices).json_binary().not_null())
603                    .col(
604                        ColumnDef::new(Table::Definition)
605                            .rw_long_text(manager)
606                            .not_null(),
607                    )
608                    .col(
609                        ColumnDef::new(Table::HandlePkConflictBehavior)
610                            .string()
611                            .not_null(),
612                    )
613                    .col(
614                        ColumnDef::new(Table::ReadPrefixLenHint)
615                            .integer()
616                            .not_null(),
617                    )
618                    .col(
619                        ColumnDef::new(Table::WatermarkIndices)
620                            .json_binary()
621                            .not_null(),
622                    )
623                    .col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null())
624                    .col(ColumnDef::new(Table::DmlFragmentId).integer())
625                    .col(ColumnDef::new(Table::Cardinality).rw_binary(manager))
626                    .col(
627                        ColumnDef::new(Table::CleanedByWatermark)
628                            .boolean()
629                            .not_null(),
630                    )
631                    .col(ColumnDef::new(Table::Description).string())
632                    .col(ColumnDef::new(Table::Version).rw_binary(manager))
633                    .col(ColumnDef::new(Table::RetentionSeconds).integer())
634                    .col(
635                        ColumnDef::new(Table::IncomingSinks)
636                            .json_binary()
637                            .not_null(),
638                    )
639                    .foreign_key(
640                        &mut ForeignKey::create()
641                            .name("FK_table_object_id")
642                            .from(Table::Table, Table::TableId)
643                            .to(Object::Table, Object::Oid)
644                            .on_delete(ForeignKeyAction::Cascade)
645                            .to_owned(),
646                    )
647                    .foreign_key(
648                        &mut ForeignKey::create()
649                            .name("FK_table_belongs_to_job_id")
650                            .from(Table::Table, Table::BelongsToJobId)
651                            .to(Object::Table, Object::Oid)
652                            .on_delete(ForeignKeyAction::Cascade)
653                            .to_owned(),
654                    )
655                    .foreign_key(
656                        &mut ForeignKey::create()
657                            .name("FK_table_fragment_id")
658                            .from(Table::Table, Table::FragmentId)
659                            .to(Fragment::Table, Fragment::FragmentId)
660                            .on_delete(ForeignKeyAction::Cascade)
661                            .to_owned(),
662                    )
663                    .foreign_key(
664                        &mut ForeignKey::create()
665                            .name("FK_table_dml_fragment_id")
666                            .from(Table::Table, Table::DmlFragmentId)
667                            .to(Fragment::Table, Fragment::FragmentId)
668                            .to_owned(),
669                    )
670                    .foreign_key(
671                        &mut ForeignKey::create()
672                            .name("FK_table_optional_associated_source_id")
673                            .from(Table::Table, Table::OptionalAssociatedSourceId)
674                            .to(Object::Table, Object::Oid)
675                            .on_delete(ForeignKeyAction::Cascade)
676                            .to_owned(),
677                    )
678                    .to_owned(),
679            )
680            .await?;
681        manager
682            .create_table(
683                MigrationTable::create()
684                    .table(Sink::Table)
685                    .col(ColumnDef::new(Sink::SinkId).integer().primary_key())
686                    .col(ColumnDef::new(Sink::Name).string().not_null())
687                    .col(ColumnDef::new(Sink::Columns).rw_binary(manager).not_null())
688                    .col(ColumnDef::new(Sink::PlanPk).rw_binary(manager).not_null())
689                    .col(
690                        ColumnDef::new(Sink::DistributionKey)
691                            .json_binary()
692                            .not_null(),
693                    )
694                    .col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null())
695                    .col(ColumnDef::new(Sink::SinkType).string().not_null())
696                    .col(ColumnDef::new(Sink::Properties).json_binary().not_null())
697                    .col(
698                        ColumnDef::new(Sink::Definition)
699                            .rw_long_text(manager)
700                            .not_null(),
701                    )
702                    .col(ColumnDef::new(Sink::ConnectionId).integer())
703                    .col(ColumnDef::new(Sink::DbName).string().not_null())
704                    .col(ColumnDef::new(Sink::SinkFromName).string().not_null())
705                    .col(ColumnDef::new(Sink::SinkFormatDesc).rw_binary(manager))
706                    .col(ColumnDef::new(Sink::TargetTable).integer())
707                    .foreign_key(
708                        &mut ForeignKey::create()
709                            .name("FK_sink_object_id")
710                            .from(Sink::Table, Sink::SinkId)
711                            .to(Object::Table, Object::Oid)
712                            .on_delete(ForeignKeyAction::Cascade)
713                            .to_owned(),
714                    )
715                    .foreign_key(
716                        &mut ForeignKey::create()
717                            .name("FK_sink_connection_id")
718                            .from(Sink::Table, Sink::ConnectionId)
719                            .to(Connection::Table, Connection::ConnectionId)
720                            .to_owned(),
721                    )
722                    .foreign_key(
723                        &mut ForeignKey::create()
724                            .name("FK_sink_target_table_id")
725                            .from(Sink::Table, Sink::TargetTable)
726                            .to(Table::Table, Table::TableId)
727                            .to_owned(),
728                    )
729                    .to_owned(),
730            )
731            .await?;
732        manager
733            .create_table(
734                MigrationTable::create()
735                    .table(View::Table)
736                    .col(ColumnDef::new(View::ViewId).integer().primary_key())
737                    .col(ColumnDef::new(View::Name).string().not_null())
738                    .col(ColumnDef::new(View::Properties).json_binary().not_null())
739                    .col(
740                        ColumnDef::new(View::Definition)
741                            .rw_long_text(manager)
742                            .not_null(),
743                    )
744                    .col(ColumnDef::new(View::Columns).rw_binary(manager).not_null())
745                    .foreign_key(
746                        &mut ForeignKey::create()
747                            .name("FK_view_object_id")
748                            .from(View::Table, View::ViewId)
749                            .to(Object::Table, Object::Oid)
750                            .on_delete(ForeignKeyAction::Cascade)
751                            .to_owned(),
752                    )
753                    .to_owned(),
754            )
755            .await?;
756        manager
757            .create_table(
758                MigrationTable::create()
759                    .table(Index::Table)
760                    .col(ColumnDef::new(Index::IndexId).integer().primary_key())
761                    .col(ColumnDef::new(Index::Name).string().not_null())
762                    .col(ColumnDef::new(Index::IndexTableId).integer().not_null())
763                    .col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
764                    .col(
765                        ColumnDef::new(Index::IndexItems)
766                            .rw_binary(manager)
767                            .not_null(),
768                    )
769                    .col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
770                    .foreign_key(
771                        &mut ForeignKey::create()
772                            .name("FK_index_object_id")
773                            .from(Index::Table, Index::IndexId)
774                            .to(Object::Table, Object::Oid)
775                            .on_delete(ForeignKeyAction::Cascade)
776                            .to_owned(),
777                    )
778                    .foreign_key(
779                        &mut ForeignKey::create()
780                            .name("FK_index_index_table_id")
781                            .from(Index::Table, Index::IndexTableId)
782                            .to(Table::Table, Table::TableId)
783                            .on_delete(ForeignKeyAction::Cascade)
784                            .to_owned(),
785                    )
786                    .foreign_key(
787                        &mut ForeignKey::create()
788                            .name("FK_index_primary_table_id")
789                            .from(Index::Table, Index::PrimaryTableId)
790                            .to(Table::Table, Table::TableId)
791                            .on_delete(ForeignKeyAction::Cascade)
792                            .to_owned(),
793                    )
794                    .to_owned(),
795            )
796            .await?;
797        manager
798            .create_table(
799                MigrationTable::create()
800                    .table(Function::Table)
801                    .col(ColumnDef::new(Function::FunctionId).integer().primary_key())
802                    .col(ColumnDef::new(Function::Name).string().not_null())
803                    .col(ColumnDef::new(Function::ArgNames).string().not_null())
804                    .col(
805                        ColumnDef::new(Function::ArgTypes)
806                            .rw_binary(manager)
807                            .not_null(),
808                    )
809                    .col(
810                        ColumnDef::new(Function::ReturnType)
811                            .rw_binary(manager)
812                            .not_null(),
813                    )
814                    .col(ColumnDef::new(Function::Language).string().not_null())
815                    .col(ColumnDef::new(Function::Link).string())
816                    .col(ColumnDef::new(Function::Identifier).string())
817                    .col(ColumnDef::new(Function::Body).rw_long_text(manager))
818                    // XXX: should this be binary type instead?
819                    .col(ColumnDef::new(Function::CompressedBinary).rw_long_text(manager))
820                    .col(ColumnDef::new(Function::Kind).string().not_null())
821                    .col(
822                        ColumnDef::new(Function::AlwaysRetryOnNetworkError)
823                            .boolean()
824                            .not_null(),
825                    )
826                    .foreign_key(
827                        &mut ForeignKey::create()
828                            .name("FK_function_object_id")
829                            .from(Function::Table, Function::FunctionId)
830                            .to(Object::Table, Object::Oid)
831                            .on_delete(ForeignKeyAction::Cascade)
832                            .to_owned(),
833                    )
834                    .to_owned(),
835            )
836            .await?;
837        manager
838            .create_table(
839                MigrationTable::create()
840                    .table(SystemParameter::Table)
841                    .col(
842                        ColumnDef::new(SystemParameter::Name)
843                            .string()
844                            .primary_key()
845                            .not_null(),
846                    )
847                    .col(ColumnDef::new(SystemParameter::Value).string().not_null())
848                    .col(
849                        ColumnDef::new(SystemParameter::IsMutable)
850                            .boolean()
851                            .not_null(),
852                    )
853                    .col(ColumnDef::new(SystemParameter::Description).string())
854                    .to_owned(),
855            )
856            .await?;
857        manager
858            .create_table(
859                crate::Table::create()
860                    .table(CatalogVersion::Table)
861                    .col(
862                        ColumnDef::new(CatalogVersion::Name)
863                            .string()
864                            .not_null()
865                            .primary_key(),
866                    )
867                    .col(
868                        ColumnDef::new(CatalogVersion::Version)
869                            .big_integer()
870                            .not_null(),
871                    )
872                    .to_owned(),
873            )
874            .await?;
875
876        // 3. create indexes.
877        manager
878            .create_index(
879                MigrationIndex::create()
880                    .table(Worker::Table)
881                    .name("idx_worker_host_port")
882                    .unique()
883                    .col(Worker::Host)
884                    .col(Worker::Port)
885                    .to_owned(),
886            )
887            .await?;
888        manager
889            .create_index(
890                MigrationIndex::create()
891                    .table(UserPrivilege::Table)
892                    .name("idx_user_privilege_item")
893                    .unique()
894                    .col(UserPrivilege::UserId)
895                    .col(UserPrivilege::Oid)
896                    .col(UserPrivilege::Action)
897                    .col(UserPrivilege::GrantedBy)
898                    .to_owned(),
899            )
900            .await?;
901
902        // 4. initialize data.
903        let insert_cluster_id = Query::insert()
904            .into_table(Cluster::Table)
905            .columns([Cluster::ClusterId])
906            .values_panic([uuid::Uuid::new_v4().into()])
907            .to_owned();
908        let insert_sys_users = Query::insert()
909            .into_table(User::Table)
910            .columns([
911                User::UserId,
912                User::Name,
913                User::IsSuper,
914                User::CanCreateUser,
915                User::CanCreateDb,
916                User::CanLogin,
917            ])
918            .values_panic([
919                1.into(),
920                "root".into(),
921                true.into(),
922                true.into(),
923                true.into(),
924                true.into(),
925            ])
926            .values_panic([
927                2.into(),
928                "postgres".into(),
929                true.into(),
930                true.into(),
931                true.into(),
932                true.into(),
933            ])
934            .values_panic([
935                3.into(),
936                "rwadmin".into(),
937                true.into(),
938                true.into(),
939                true.into(),
940                true.into(),
941            ])
942            .to_owned();
943
944        // Since the User table is newly created, we assume that the initial
945        // user id of `root` is 1, `postgres` is 2 and `rwadmin` is 3.
946        let insert_objects = Query::insert()
947            .into_table(Object::Table)
948            .columns([
949                Object::Oid,
950                Object::ObjType,
951                Object::OwnerId,
952                Object::DatabaseId,
953            ])
954            .values_panic([1.into(), "DATABASE".into(), 1.into(), None::<i32>.into()])
955            .values_panic([2.into(), "SCHEMA".into(), 1.into(), 1.into()]) // public
956            .values_panic([3.into(), "SCHEMA".into(), 1.into(), 1.into()]) // pg_catalog
957            .values_panic([4.into(), "SCHEMA".into(), 1.into(), 1.into()]) // information_schema
958            .values_panic([5.into(), "SCHEMA".into(), 1.into(), 1.into()]) // rw_catalog
959            .to_owned();
960
961        let insert_sys_database = Query::insert()
962            .into_table(Database::Table)
963            .columns([Database::DatabaseId, Database::Name])
964            .values_panic([1.into(), "dev".into()])
965            .to_owned();
966        let insert_sys_schemas = Query::insert()
967            .into_table(Schema::Table)
968            .columns([Schema::SchemaId, Schema::Name])
969            .values_panic([2.into(), "public".into()])
970            .values_panic([3.into(), "pg_catalog".into()])
971            .values_panic([4.into(), "information_schema".into()])
972            .values_panic([5.into(), "rw_catalog".into()])
973            .to_owned();
974
975        manager.exec_stmt(insert_cluster_id).await?;
976        manager.exec_stmt(insert_sys_users).await?;
977        manager.exec_stmt(insert_objects).await?;
978        manager.exec_stmt(insert_sys_database).await?;
979        manager.exec_stmt(insert_sys_schemas).await?;
980
981        // Rest auto increment offset
982        match manager.get_database_backend() {
983            DbBackend::MySql => {
984                manager
985                    .get_connection()
986                    .execute(Statement::from_string(
987                        DatabaseBackend::MySql,
988                        "ALTER TABLE object AUTO_INCREMENT = 6",
989                    ))
990                    .await?;
991                manager
992                    .get_connection()
993                    .execute(Statement::from_string(
994                        DatabaseBackend::MySql,
995                        "ALTER TABLE user AUTO_INCREMENT = 3",
996                    ))
997                    .await?;
998            }
999            DbBackend::Postgres => {
1000                manager
1001                    .get_connection()
1002                    .execute(Statement::from_string(
1003                        DatabaseBackend::Postgres,
1004                        "SELECT setval('object_oid_seq', 5)",
1005                    ))
1006                    .await?;
1007                manager
1008                    .get_connection()
1009                    .execute(Statement::from_string(
1010                        DatabaseBackend::Postgres,
1011                        "SELECT setval('user_user_id_seq', 3)",
1012                    ))
1013                    .await?;
1014            }
1015            DbBackend::Sqlite => {}
1016        }
1017
1018        Ok(())
1019    }
1020
1021    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
1022        // drop tables cascade.
1023        drop_tables!(
1024            manager,
1025            Cluster,
1026            WorkerProperty,
1027            Worker,
1028            UserPrivilege,
1029            Database,
1030            Schema,
1031            StreamingJob,
1032            ActorDispatcher,
1033            Actor,
1034            Sink,
1035            Index,
1036            Table,
1037            Fragment,
1038            Source,
1039            Connection,
1040            View,
1041            Function,
1042            ObjectDependency,
1043            Object,
1044            User,
1045            SystemParameter,
1046            CatalogVersion
1047        );
1048        Ok(())
1049    }
1050}
1051
1052#[derive(DeriveIden)]
1053enum Cluster {
1054    Table,
1055    ClusterId,
1056    CreatedAt,
1057}
1058
1059#[derive(DeriveIden)]
1060enum Worker {
1061    Table,
1062    WorkerId,
1063    WorkerType,
1064    Host,
1065    Port,
1066    TransactionId,
1067    Status,
1068}
1069
1070#[derive(DeriveIden)]
1071enum WorkerProperty {
1072    Table,
1073    WorkerId,
1074    ParallelUnitIds,
1075    IsStreaming,
1076    IsServing,
1077    IsUnschedulable,
1078}
1079
1080#[derive(DeriveIden)]
1081enum User {
1082    Table,
1083    UserId,
1084    Name,
1085    IsSuper,
1086    CanCreateDb,
1087    CanCreateUser,
1088    CanLogin,
1089    AuthInfo,
1090}
1091
1092#[derive(DeriveIden)]
1093enum UserPrivilege {
1094    Table,
1095    Id,
1096    DependentId,
1097    UserId,
1098    Oid,
1099    GrantedBy,
1100    Action,
1101    WithGrantOption,
1102}
1103
1104#[derive(DeriveIden)]
1105enum Database {
1106    Table,
1107    DatabaseId,
1108    Name,
1109}
1110
1111#[derive(DeriveIden)]
1112enum Schema {
1113    Table,
1114    SchemaId,
1115    Name,
1116}
1117
1118#[derive(DeriveIden)]
1119enum Fragment {
1120    Table,
1121    FragmentId,
1122    JobId,
1123    FragmentTypeMask,
1124    DistributionType,
1125    StreamNode,
1126    VnodeMapping,
1127    StateTableIds,
1128    UpstreamFragmentId,
1129}
1130
1131#[derive(DeriveIden)]
1132enum Actor {
1133    Table,
1134    ActorId,
1135    FragmentId,
1136    Status,
1137    Splits,
1138    ParallelUnitId,
1139    WorkerId,
1140    UpstreamActorIds,
1141    VnodeBitmap,
1142    ExprContext,
1143}
1144
1145#[derive(DeriveIden)]
1146enum ActorDispatcher {
1147    Table,
1148    Id,
1149    ActorId,
1150    DispatcherType,
1151    DistKeyIndices,
1152    OutputIndices,
1153    HashMapping,
1154    DispatcherId,
1155    DownstreamActorIds,
1156    DownstreamTableName,
1157}
1158
1159#[derive(DeriveIden)]
1160enum StreamingJob {
1161    Table,
1162    JobId,
1163    JobStatus,
1164    Timezone,
1165    CreateType,
1166    Parallelism,
1167}
1168
1169#[derive(DeriveIden)]
1170#[allow(clippy::enum_variant_names)]
1171enum Table {
1172    Table,
1173    TableId,
1174    Name,
1175    OptionalAssociatedSourceId,
1176    TableType,
1177    BelongsToJobId,
1178    Columns,
1179    Pk,
1180    DistributionKey,
1181    StreamKey,
1182    AppendOnly,
1183    FragmentId,
1184    VnodeColIndex,
1185    RowIdIndex,
1186    ValueIndices,
1187    Definition,
1188    HandlePkConflictBehavior,
1189    ReadPrefixLenHint,
1190    WatermarkIndices,
1191    DistKeyInPk,
1192    DmlFragmentId,
1193    Cardinality,
1194    CleanedByWatermark,
1195    Description,
1196    Version,
1197    RetentionSeconds,
1198    IncomingSinks,
1199}
1200
1201#[derive(DeriveIden)]
1202enum Source {
1203    Table,
1204    SourceId,
1205    Name,
1206    RowIdIndex,
1207    Columns,
1208    PkColumnIds,
1209    WithProperties,
1210    Definition,
1211    SourceInfo,
1212    WatermarkDescs,
1213    OptionalAssociatedTableId,
1214    ConnectionId,
1215    Version,
1216}
1217
1218#[derive(DeriveIden)]
1219enum Sink {
1220    Table,
1221    SinkId,
1222    Name,
1223    Columns,
1224    PlanPk,
1225    DistributionKey,
1226    DownstreamPk,
1227    SinkType,
1228    Properties,
1229    Definition,
1230    ConnectionId,
1231    DbName,
1232    SinkFromName,
1233    SinkFormatDesc,
1234    TargetTable,
1235}
1236
1237#[derive(DeriveIden)]
1238enum Connection {
1239    Table,
1240    ConnectionId,
1241    Name,
1242    Info,
1243}
1244
1245#[derive(DeriveIden)]
1246enum View {
1247    Table,
1248    ViewId,
1249    Name,
1250    Properties,
1251    Definition,
1252    Columns,
1253}
1254
1255#[derive(DeriveIden)]
1256enum Index {
1257    Table,
1258    IndexId,
1259    Name,
1260    IndexTableId,
1261    PrimaryTableId,
1262    IndexItems,
1263    IndexColumnsLen,
1264}
1265
1266#[derive(DeriveIden)]
1267enum Function {
1268    Table,
1269    FunctionId,
1270    Name,
1271    ArgNames,
1272    ArgTypes,
1273    ReturnType,
1274    Language,
1275    Link,
1276    Identifier,
1277    Body,
1278    CompressedBinary,
1279    Kind,
1280    AlwaysRetryOnNetworkError,
1281}
1282
1283#[derive(DeriveIden)]
1284pub(crate) enum Object {
1285    Table,
1286    Oid,
1287    ObjType,
1288    OwnerId,
1289    SchemaId,
1290    DatabaseId,
1291    InitializedAt,
1292    CreatedAt,
1293    InitializedAtClusterVersion,
1294    CreatedAtClusterVersion,
1295}
1296
1297#[derive(DeriveIden)]
1298enum ObjectDependency {
1299    Table,
1300    Id,
1301    Oid,
1302    UsedBy,
1303}
1304
1305#[derive(DeriveIden)]
1306enum SystemParameter {
1307    Table,
1308    Name,
1309    Value,
1310    IsMutable,
1311    Description,
1312}
1313
1314#[derive(DeriveIden)]
1315enum CatalogVersion {
1316    Table,
1317    Name,
1318    Version,
1319}