1use sea_orm_migration::prelude::{Index as MigrationIndex, Table as MigrationTable, *};
2
3use crate::sea_orm::{DatabaseBackend, DbBackend, Statement};
4use crate::utils::ColumnDefExt;
5use crate::{assert_not_has_tables, drop_tables};
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[async_trait::async_trait]
11impl MigrationTrait for Migration {
12 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
13 assert_not_has_tables!(
15 manager,
16 Cluster,
17 Worker,
18 WorkerProperty,
19 User,
20 UserPrivilege,
21 Database,
22 Schema,
23 StreamingJob,
24 Fragment,
25 Actor,
26 ActorDispatcher,
27 Table,
28 Source,
29 Sink,
30 Connection,
31 View,
32 Index,
33 Function,
34 Object,
35 ObjectDependency,
36 SystemParameter,
37 CatalogVersion
38 );
39
40 if manager.get_database_backend() == DbBackend::MySql {
43 manager
44 .get_connection()
45 .execute_unprepared("ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_bin")
46 .await
47 .expect("failed to set database collate");
48 }
49
50 manager
52 .create_table(
53 MigrationTable::create()
54 .table(Cluster::Table)
55 .col(
56 ColumnDef::new(Cluster::ClusterId)
57 .uuid()
58 .not_null()
59 .primary_key(),
60 )
61 .col(
62 ColumnDef::new(Cluster::CreatedAt)
63 .date_time()
64 .default(Expr::current_timestamp())
65 .not_null(),
66 )
67 .to_owned(),
68 )
69 .await?;
70 manager
71 .create_table(
72 MigrationTable::create()
73 .table(Worker::Table)
74 .col(
75 ColumnDef::new(Worker::WorkerId)
76 .integer()
77 .not_null()
78 .auto_increment()
79 .primary_key(),
80 )
81 .col(ColumnDef::new(Worker::WorkerType).string().not_null())
82 .col(ColumnDef::new(Worker::Host).string().not_null())
83 .col(ColumnDef::new(Worker::Port).integer().not_null())
84 .col(ColumnDef::new(Worker::Status).string().not_null())
85 .col(ColumnDef::new(Worker::TransactionId).integer())
86 .to_owned(),
87 )
88 .await?;
89 manager
90 .create_table(
91 MigrationTable::create()
92 .table(WorkerProperty::Table)
93 .col(
94 ColumnDef::new(WorkerProperty::WorkerId)
95 .integer()
96 .primary_key(),
97 )
98 .col(
99 ColumnDef::new(WorkerProperty::ParallelUnitIds)
100 .json_binary()
101 .not_null(),
102 )
103 .col(
104 ColumnDef::new(WorkerProperty::IsStreaming)
105 .boolean()
106 .not_null(),
107 )
108 .col(
109 ColumnDef::new(WorkerProperty::IsServing)
110 .boolean()
111 .not_null(),
112 )
113 .col(
114 ColumnDef::new(WorkerProperty::IsUnschedulable)
115 .boolean()
116 .not_null(),
117 )
118 .foreign_key(
119 &mut ForeignKey::create()
120 .name("FK_worker_property_worker_id")
121 .from(WorkerProperty::Table, WorkerProperty::WorkerId)
122 .to(Worker::Table, Worker::WorkerId)
123 .on_delete(ForeignKeyAction::Cascade)
124 .to_owned(),
125 )
126 .to_owned(),
127 )
128 .await?;
129 manager
130 .create_table(
131 MigrationTable::create()
132 .table(User::Table)
133 .col(
134 ColumnDef::new(User::UserId)
135 .integer()
136 .primary_key()
137 .auto_increment(),
138 )
139 .col(ColumnDef::new(User::Name).string().unique_key().not_null())
140 .col(ColumnDef::new(User::IsSuper).boolean().not_null())
141 .col(ColumnDef::new(User::CanCreateDb).boolean().not_null())
142 .col(ColumnDef::new(User::CanCreateUser).boolean().not_null())
143 .col(ColumnDef::new(User::CanLogin).boolean().not_null())
144 .col(ColumnDef::new(User::AuthInfo).rw_binary(manager))
145 .to_owned(),
146 )
147 .await?;
148 manager
149 .create_table(
150 MigrationTable::create()
151 .table(Object::Table)
152 .col(
153 ColumnDef::new(Object::Oid)
154 .integer()
155 .auto_increment()
156 .primary_key(),
157 )
158 .col(ColumnDef::new(Object::ObjType).string().not_null())
159 .col(ColumnDef::new(Object::OwnerId).integer().not_null())
160 .col(ColumnDef::new(Object::SchemaId).integer())
161 .col(ColumnDef::new(Object::DatabaseId).integer())
162 .col(
163 ColumnDef::new(Object::InitializedAt)
164 .date_time()
165 .default(Expr::current_timestamp())
166 .not_null(),
167 )
168 .col(
169 ColumnDef::new(Object::CreatedAt)
170 .date_time()
171 .default(Expr::current_timestamp())
172 .not_null(),
173 )
174 .col(ColumnDef::new(Object::InitializedAtClusterVersion).string())
175 .col(ColumnDef::new(Object::CreatedAtClusterVersion).string())
176 .foreign_key(
177 &mut ForeignKey::create()
178 .name("FK_object_owner_id")
179 .from(Object::Table, Object::OwnerId)
180 .to(User::Table, User::UserId)
181 .on_delete(ForeignKeyAction::Cascade)
182 .to_owned(),
183 )
184 .foreign_key(
185 &mut ForeignKey::create()
186 .name("FK_object_database_id")
187 .from(Object::Table, Object::DatabaseId)
188 .to(Object::Table, Object::Oid)
189 .on_delete(ForeignKeyAction::Cascade)
190 .to_owned(),
191 )
192 .foreign_key(
193 &mut ForeignKey::create()
194 .name("FK_object_schema_id")
195 .from(Object::Table, Object::SchemaId)
196 .to(Object::Table, Object::Oid)
197 .on_delete(ForeignKeyAction::Cascade)
198 .to_owned(),
199 )
200 .to_owned(),
201 )
202 .await?;
203 manager
204 .create_table(
205 MigrationTable::create()
206 .table(UserPrivilege::Table)
207 .col(
208 ColumnDef::new(UserPrivilege::Id)
209 .integer()
210 .primary_key()
211 .auto_increment(),
212 )
213 .col(ColumnDef::new(UserPrivilege::DependentId).integer())
214 .col(ColumnDef::new(UserPrivilege::UserId).integer().not_null())
215 .col(ColumnDef::new(UserPrivilege::Oid).integer().not_null())
216 .col(
217 ColumnDef::new(UserPrivilege::GrantedBy)
218 .integer()
219 .not_null(),
220 )
221 .col(ColumnDef::new(UserPrivilege::Action).string().not_null())
222 .col(
223 ColumnDef::new(UserPrivilege::WithGrantOption)
224 .boolean()
225 .not_null(),
226 )
227 .foreign_key(
228 &mut ForeignKey::create()
229 .name("FK_user_privilege_dependent_id")
230 .from(UserPrivilege::Table, UserPrivilege::DependentId)
231 .to(UserPrivilege::Table, UserPrivilege::Id)
232 .on_delete(ForeignKeyAction::Cascade)
233 .to_owned(),
234 )
235 .foreign_key(
236 &mut ForeignKey::create()
237 .name("FK_user_privilege_user_id")
238 .from(UserPrivilege::Table, UserPrivilege::UserId)
239 .to(User::Table, User::UserId)
240 .on_delete(ForeignKeyAction::Cascade)
241 .to_owned(),
242 )
243 .foreign_key(
244 &mut ForeignKey::create()
245 .name("FK_user_privilege_granted_by")
246 .from(UserPrivilege::Table, UserPrivilege::GrantedBy)
247 .to(User::Table, User::UserId)
248 .to_owned(),
249 )
250 .foreign_key(
251 &mut ForeignKey::create()
252 .name("FK_user_privilege_oid")
253 .from(UserPrivilege::Table, UserPrivilege::Oid)
254 .to(Object::Table, Object::Oid)
255 .on_delete(ForeignKeyAction::Cascade)
256 .to_owned(),
257 )
258 .to_owned(),
259 )
260 .await?;
261 manager
262 .create_table(
263 MigrationTable::create()
264 .table(ObjectDependency::Table)
265 .col(
266 ColumnDef::new(ObjectDependency::Id)
267 .integer()
268 .auto_increment()
269 .primary_key(),
270 )
271 .col(ColumnDef::new(ObjectDependency::Oid).integer().not_null())
272 .col(
273 ColumnDef::new(ObjectDependency::UsedBy)
274 .integer()
275 .not_null(),
276 )
277 .foreign_key(
278 &mut ForeignKey::create()
279 .name("FK_object_dependency_oid")
280 .from(ObjectDependency::Table, ObjectDependency::Oid)
281 .to(Object::Table, Object::Oid)
282 .on_delete(ForeignKeyAction::Cascade)
283 .to_owned(),
284 )
285 .foreign_key(
286 &mut ForeignKey::create()
287 .name("FK_object_dependency_used_by")
288 .from(ObjectDependency::Table, ObjectDependency::UsedBy)
289 .to(Object::Table, Object::Oid)
290 .on_delete(ForeignKeyAction::Cascade)
291 .to_owned(),
292 )
293 .to_owned(),
294 )
295 .await?;
296 manager
297 .create_table(
298 MigrationTable::create()
299 .table(Database::Table)
300 .col(ColumnDef::new(Database::DatabaseId).integer().primary_key())
301 .col(
302 ColumnDef::new(Database::Name)
303 .string()
304 .unique_key()
305 .not_null(),
306 )
307 .foreign_key(
308 &mut ForeignKey::create()
309 .name("FK_database_object_id")
310 .from(Database::Table, Database::DatabaseId)
311 .to(Object::Table, Object::Oid)
312 .on_delete(ForeignKeyAction::Cascade)
313 .to_owned(),
314 )
315 .to_owned(),
316 )
317 .await?;
318 manager
319 .create_table(
320 MigrationTable::create()
321 .table(Schema::Table)
322 .col(ColumnDef::new(Schema::SchemaId).integer().primary_key())
323 .col(ColumnDef::new(Schema::Name).string().not_null())
324 .foreign_key(
325 &mut ForeignKey::create()
326 .name("FK_schema_object_id")
327 .from(Schema::Table, Schema::SchemaId)
328 .to(Object::Table, Object::Oid)
329 .on_delete(ForeignKeyAction::Cascade)
330 .to_owned(),
331 )
332 .to_owned(),
333 )
334 .await?;
335 manager
336 .create_table(
337 MigrationTable::create()
338 .table(StreamingJob::Table)
339 .col(ColumnDef::new(StreamingJob::JobId).integer().primary_key())
340 .col(ColumnDef::new(StreamingJob::JobStatus).string().not_null())
341 .col(ColumnDef::new(StreamingJob::CreateType).string().not_null())
342 .col(ColumnDef::new(StreamingJob::Timezone).string())
343 .col(
344 ColumnDef::new(StreamingJob::Parallelism)
345 .json_binary()
346 .not_null(),
347 )
348 .foreign_key(
349 &mut ForeignKey::create()
350 .name("FK_streaming_job_object_id")
351 .from(StreamingJob::Table, StreamingJob::JobId)
352 .to(Object::Table, Object::Oid)
353 .on_delete(ForeignKeyAction::Cascade)
354 .to_owned(),
355 )
356 .to_owned(),
357 )
358 .await?;
359 manager
360 .create_table(
361 MigrationTable::create()
362 .table(Fragment::Table)
363 .col(
364 ColumnDef::new(Fragment::FragmentId)
365 .integer()
366 .primary_key()
367 .auto_increment(),
368 )
369 .col(ColumnDef::new(Fragment::JobId).integer().not_null())
370 .col(
371 ColumnDef::new(Fragment::FragmentTypeMask)
372 .integer()
373 .not_null(),
374 )
375 .col(
376 ColumnDef::new(Fragment::DistributionType)
377 .string()
378 .not_null(),
379 )
380 .col(
381 ColumnDef::new(Fragment::StreamNode)
382 .rw_binary(manager)
383 .not_null(),
384 )
385 .col(
386 ColumnDef::new(Fragment::VnodeMapping)
387 .rw_binary(manager)
388 .not_null(),
389 )
390 .col(ColumnDef::new(Fragment::StateTableIds).json_binary())
391 .col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary())
392 .foreign_key(
393 &mut ForeignKey::create()
394 .name("FK_fragment_table_id")
395 .from(Fragment::Table, Fragment::JobId)
396 .to(Object::Table, Object::Oid)
397 .on_delete(ForeignKeyAction::Cascade)
398 .to_owned(),
399 )
400 .to_owned(),
401 )
402 .await?;
403 manager
404 .create_table(
405 MigrationTable::create()
406 .table(Actor::Table)
407 .col(
408 ColumnDef::new(Actor::ActorId)
409 .integer()
410 .primary_key()
411 .auto_increment(),
412 )
413 .col(ColumnDef::new(Actor::FragmentId).integer().not_null())
414 .col(ColumnDef::new(Actor::Status).string().not_null())
415 .col(ColumnDef::new(Actor::Splits).rw_binary(manager))
416 .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
417 .col(ColumnDef::new(Actor::WorkerId).integer().not_null())
418 .col(ColumnDef::new(Actor::UpstreamActorIds).json_binary())
419 .col(ColumnDef::new(Actor::VnodeBitmap).rw_binary(manager))
420 .col(
421 ColumnDef::new(Actor::ExprContext)
422 .rw_binary(manager)
423 .not_null(),
424 )
425 .foreign_key(
426 &mut ForeignKey::create()
427 .name("FK_actor_fragment_id")
428 .from(Actor::Table, Actor::FragmentId)
429 .to(Fragment::Table, Fragment::FragmentId)
430 .on_delete(ForeignKeyAction::Cascade)
431 .to_owned(),
432 )
433 .to_owned(),
434 )
435 .await?;
436 manager
437 .create_table(
438 MigrationTable::create()
439 .table(ActorDispatcher::Table)
440 .col(
441 ColumnDef::new(ActorDispatcher::Id)
442 .integer()
443 .primary_key()
444 .auto_increment(),
445 )
446 .col(
447 ColumnDef::new(ActorDispatcher::ActorId)
448 .integer()
449 .not_null(),
450 )
451 .col(
452 ColumnDef::new(ActorDispatcher::DispatcherType)
453 .string()
454 .not_null(),
455 )
456 .col(
457 ColumnDef::new(ActorDispatcher::DistKeyIndices)
458 .json_binary()
459 .not_null(),
460 )
461 .col(
462 ColumnDef::new(ActorDispatcher::OutputIndices)
463 .json_binary()
464 .not_null(),
465 )
466 .col(ColumnDef::new(ActorDispatcher::HashMapping).rw_binary(manager))
467 .col(
468 ColumnDef::new(ActorDispatcher::DispatcherId)
469 .integer()
470 .not_null(),
471 )
472 .col(
473 ColumnDef::new(ActorDispatcher::DownstreamActorIds)
474 .json_binary()
475 .not_null(),
476 )
477 .col(ColumnDef::new(ActorDispatcher::DownstreamTableName).string())
478 .foreign_key(
479 &mut ForeignKey::create()
480 .name("FK_actor_dispatcher_actor_id")
481 .from(ActorDispatcher::Table, ActorDispatcher::ActorId)
482 .to(Actor::Table, Actor::ActorId)
483 .on_delete(ForeignKeyAction::Cascade)
484 .to_owned(),
485 )
486 .foreign_key(
487 &mut ForeignKey::create()
488 .name("FK_actor_dispatcher_dispatcher_id")
489 .from(ActorDispatcher::Table, ActorDispatcher::DispatcherId)
490 .to(Fragment::Table, Fragment::FragmentId)
491 .on_delete(ForeignKeyAction::Cascade)
492 .to_owned(),
493 )
494 .to_owned(),
495 )
496 .await?;
497 manager
498 .create_table(
499 MigrationTable::create()
500 .table(Connection::Table)
501 .col(
502 ColumnDef::new(Connection::ConnectionId)
503 .integer()
504 .primary_key(),
505 )
506 .col(ColumnDef::new(Connection::Name).string().not_null())
507 .col(
508 ColumnDef::new(Connection::Info)
509 .rw_binary(manager)
510 .not_null(),
511 )
512 .foreign_key(
513 &mut ForeignKey::create()
514 .name("FK_connection_object_id")
515 .from(Connection::Table, Connection::ConnectionId)
516 .to(Object::Table, Object::Oid)
517 .on_delete(ForeignKeyAction::Cascade)
518 .to_owned(),
519 )
520 .to_owned(),
521 )
522 .await?;
523 manager
524 .create_table(
525 MigrationTable::create()
526 .table(Source::Table)
527 .col(ColumnDef::new(Source::SourceId).integer().primary_key())
528 .col(ColumnDef::new(Source::Name).string().not_null())
529 .col(ColumnDef::new(Source::RowIdIndex).integer())
530 .col(
531 ColumnDef::new(Source::Columns)
532 .rw_binary(manager)
533 .not_null(),
534 )
535 .col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null())
536 .col(
537 ColumnDef::new(Source::WithProperties)
538 .json_binary()
539 .not_null(),
540 )
541 .col(
542 ColumnDef::new(Source::Definition)
543 .rw_long_text(manager)
544 .not_null(),
545 )
546 .col(ColumnDef::new(Source::SourceInfo).rw_binary(manager))
547 .col(
548 ColumnDef::new(Source::WatermarkDescs)
549 .rw_binary(manager)
550 .not_null(),
551 )
552 .col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
553 .col(ColumnDef::new(Source::ConnectionId).integer())
554 .col(ColumnDef::new(Source::Version).big_integer().not_null())
555 .foreign_key(
556 &mut ForeignKey::create()
557 .name("FK_source_object_id")
558 .from(Source::Table, Source::SourceId)
559 .to(Object::Table, Object::Oid)
560 .on_delete(ForeignKeyAction::Cascade)
561 .to_owned(),
562 )
563 .foreign_key(
564 &mut ForeignKey::create()
565 .name("FK_source_connection_id")
566 .from(Source::Table, Source::ConnectionId)
567 .to(Connection::Table, Connection::ConnectionId)
568 .to_owned(),
569 )
570 .foreign_key(
571 &mut ForeignKey::create()
572 .name("FK_source_optional_associated_table_id")
573 .from(Source::Table, Source::OptionalAssociatedTableId)
574 .to(Object::Table, Object::Oid)
575 .on_delete(ForeignKeyAction::Cascade)
576 .to_owned(),
577 )
578 .to_owned(),
579 )
580 .await?;
581 manager
582 .create_table(
583 MigrationTable::create()
584 .table(Table::Table)
585 .col(ColumnDef::new(Table::TableId).integer().primary_key())
586 .col(ColumnDef::new(Table::Name).string().not_null())
587 .col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer())
588 .col(ColumnDef::new(Table::TableType).string().not_null())
589 .col(ColumnDef::new(Table::BelongsToJobId).integer())
590 .col(ColumnDef::new(Table::Columns).rw_binary(manager).not_null())
591 .col(ColumnDef::new(Table::Pk).rw_binary(manager).not_null())
592 .col(
593 ColumnDef::new(Table::DistributionKey)
594 .json_binary()
595 .not_null(),
596 )
597 .col(ColumnDef::new(Table::StreamKey).json_binary().not_null())
598 .col(ColumnDef::new(Table::AppendOnly).boolean().not_null())
599 .col(ColumnDef::new(Table::FragmentId).integer())
600 .col(ColumnDef::new(Table::VnodeColIndex).integer())
601 .col(ColumnDef::new(Table::RowIdIndex).integer())
602 .col(ColumnDef::new(Table::ValueIndices).json_binary().not_null())
603 .col(
604 ColumnDef::new(Table::Definition)
605 .rw_long_text(manager)
606 .not_null(),
607 )
608 .col(
609 ColumnDef::new(Table::HandlePkConflictBehavior)
610 .string()
611 .not_null(),
612 )
613 .col(
614 ColumnDef::new(Table::ReadPrefixLenHint)
615 .integer()
616 .not_null(),
617 )
618 .col(
619 ColumnDef::new(Table::WatermarkIndices)
620 .json_binary()
621 .not_null(),
622 )
623 .col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null())
624 .col(ColumnDef::new(Table::DmlFragmentId).integer())
625 .col(ColumnDef::new(Table::Cardinality).rw_binary(manager))
626 .col(
627 ColumnDef::new(Table::CleanedByWatermark)
628 .boolean()
629 .not_null(),
630 )
631 .col(ColumnDef::new(Table::Description).string())
632 .col(ColumnDef::new(Table::Version).rw_binary(manager))
633 .col(ColumnDef::new(Table::RetentionSeconds).integer())
634 .col(
635 ColumnDef::new(Table::IncomingSinks)
636 .json_binary()
637 .not_null(),
638 )
639 .foreign_key(
640 &mut ForeignKey::create()
641 .name("FK_table_object_id")
642 .from(Table::Table, Table::TableId)
643 .to(Object::Table, Object::Oid)
644 .on_delete(ForeignKeyAction::Cascade)
645 .to_owned(),
646 )
647 .foreign_key(
648 &mut ForeignKey::create()
649 .name("FK_table_belongs_to_job_id")
650 .from(Table::Table, Table::BelongsToJobId)
651 .to(Object::Table, Object::Oid)
652 .on_delete(ForeignKeyAction::Cascade)
653 .to_owned(),
654 )
655 .foreign_key(
656 &mut ForeignKey::create()
657 .name("FK_table_fragment_id")
658 .from(Table::Table, Table::FragmentId)
659 .to(Fragment::Table, Fragment::FragmentId)
660 .on_delete(ForeignKeyAction::Cascade)
661 .to_owned(),
662 )
663 .foreign_key(
664 &mut ForeignKey::create()
665 .name("FK_table_dml_fragment_id")
666 .from(Table::Table, Table::DmlFragmentId)
667 .to(Fragment::Table, Fragment::FragmentId)
668 .to_owned(),
669 )
670 .foreign_key(
671 &mut ForeignKey::create()
672 .name("FK_table_optional_associated_source_id")
673 .from(Table::Table, Table::OptionalAssociatedSourceId)
674 .to(Object::Table, Object::Oid)
675 .on_delete(ForeignKeyAction::Cascade)
676 .to_owned(),
677 )
678 .to_owned(),
679 )
680 .await?;
681 manager
682 .create_table(
683 MigrationTable::create()
684 .table(Sink::Table)
685 .col(ColumnDef::new(Sink::SinkId).integer().primary_key())
686 .col(ColumnDef::new(Sink::Name).string().not_null())
687 .col(ColumnDef::new(Sink::Columns).rw_binary(manager).not_null())
688 .col(ColumnDef::new(Sink::PlanPk).rw_binary(manager).not_null())
689 .col(
690 ColumnDef::new(Sink::DistributionKey)
691 .json_binary()
692 .not_null(),
693 )
694 .col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null())
695 .col(ColumnDef::new(Sink::SinkType).string().not_null())
696 .col(ColumnDef::new(Sink::Properties).json_binary().not_null())
697 .col(
698 ColumnDef::new(Sink::Definition)
699 .rw_long_text(manager)
700 .not_null(),
701 )
702 .col(ColumnDef::new(Sink::ConnectionId).integer())
703 .col(ColumnDef::new(Sink::DbName).string().not_null())
704 .col(ColumnDef::new(Sink::SinkFromName).string().not_null())
705 .col(ColumnDef::new(Sink::SinkFormatDesc).rw_binary(manager))
706 .col(ColumnDef::new(Sink::TargetTable).integer())
707 .foreign_key(
708 &mut ForeignKey::create()
709 .name("FK_sink_object_id")
710 .from(Sink::Table, Sink::SinkId)
711 .to(Object::Table, Object::Oid)
712 .on_delete(ForeignKeyAction::Cascade)
713 .to_owned(),
714 )
715 .foreign_key(
716 &mut ForeignKey::create()
717 .name("FK_sink_connection_id")
718 .from(Sink::Table, Sink::ConnectionId)
719 .to(Connection::Table, Connection::ConnectionId)
720 .to_owned(),
721 )
722 .foreign_key(
723 &mut ForeignKey::create()
724 .name("FK_sink_target_table_id")
725 .from(Sink::Table, Sink::TargetTable)
726 .to(Table::Table, Table::TableId)
727 .to_owned(),
728 )
729 .to_owned(),
730 )
731 .await?;
732 manager
733 .create_table(
734 MigrationTable::create()
735 .table(View::Table)
736 .col(ColumnDef::new(View::ViewId).integer().primary_key())
737 .col(ColumnDef::new(View::Name).string().not_null())
738 .col(ColumnDef::new(View::Properties).json_binary().not_null())
739 .col(
740 ColumnDef::new(View::Definition)
741 .rw_long_text(manager)
742 .not_null(),
743 )
744 .col(ColumnDef::new(View::Columns).rw_binary(manager).not_null())
745 .foreign_key(
746 &mut ForeignKey::create()
747 .name("FK_view_object_id")
748 .from(View::Table, View::ViewId)
749 .to(Object::Table, Object::Oid)
750 .on_delete(ForeignKeyAction::Cascade)
751 .to_owned(),
752 )
753 .to_owned(),
754 )
755 .await?;
756 manager
757 .create_table(
758 MigrationTable::create()
759 .table(Index::Table)
760 .col(ColumnDef::new(Index::IndexId).integer().primary_key())
761 .col(ColumnDef::new(Index::Name).string().not_null())
762 .col(ColumnDef::new(Index::IndexTableId).integer().not_null())
763 .col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
764 .col(
765 ColumnDef::new(Index::IndexItems)
766 .rw_binary(manager)
767 .not_null(),
768 )
769 .col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
770 .foreign_key(
771 &mut ForeignKey::create()
772 .name("FK_index_object_id")
773 .from(Index::Table, Index::IndexId)
774 .to(Object::Table, Object::Oid)
775 .on_delete(ForeignKeyAction::Cascade)
776 .to_owned(),
777 )
778 .foreign_key(
779 &mut ForeignKey::create()
780 .name("FK_index_index_table_id")
781 .from(Index::Table, Index::IndexTableId)
782 .to(Table::Table, Table::TableId)
783 .on_delete(ForeignKeyAction::Cascade)
784 .to_owned(),
785 )
786 .foreign_key(
787 &mut ForeignKey::create()
788 .name("FK_index_primary_table_id")
789 .from(Index::Table, Index::PrimaryTableId)
790 .to(Table::Table, Table::TableId)
791 .on_delete(ForeignKeyAction::Cascade)
792 .to_owned(),
793 )
794 .to_owned(),
795 )
796 .await?;
797 manager
798 .create_table(
799 MigrationTable::create()
800 .table(Function::Table)
801 .col(ColumnDef::new(Function::FunctionId).integer().primary_key())
802 .col(ColumnDef::new(Function::Name).string().not_null())
803 .col(ColumnDef::new(Function::ArgNames).string().not_null())
804 .col(
805 ColumnDef::new(Function::ArgTypes)
806 .rw_binary(manager)
807 .not_null(),
808 )
809 .col(
810 ColumnDef::new(Function::ReturnType)
811 .rw_binary(manager)
812 .not_null(),
813 )
814 .col(ColumnDef::new(Function::Language).string().not_null())
815 .col(ColumnDef::new(Function::Link).string())
816 .col(ColumnDef::new(Function::Identifier).string())
817 .col(ColumnDef::new(Function::Body).rw_long_text(manager))
818 .col(ColumnDef::new(Function::CompressedBinary).rw_long_text(manager))
820 .col(ColumnDef::new(Function::Kind).string().not_null())
821 .col(
822 ColumnDef::new(Function::AlwaysRetryOnNetworkError)
823 .boolean()
824 .not_null(),
825 )
826 .foreign_key(
827 &mut ForeignKey::create()
828 .name("FK_function_object_id")
829 .from(Function::Table, Function::FunctionId)
830 .to(Object::Table, Object::Oid)
831 .on_delete(ForeignKeyAction::Cascade)
832 .to_owned(),
833 )
834 .to_owned(),
835 )
836 .await?;
837 manager
838 .create_table(
839 MigrationTable::create()
840 .table(SystemParameter::Table)
841 .col(
842 ColumnDef::new(SystemParameter::Name)
843 .string()
844 .primary_key()
845 .not_null(),
846 )
847 .col(ColumnDef::new(SystemParameter::Value).string().not_null())
848 .col(
849 ColumnDef::new(SystemParameter::IsMutable)
850 .boolean()
851 .not_null(),
852 )
853 .col(ColumnDef::new(SystemParameter::Description).string())
854 .to_owned(),
855 )
856 .await?;
857 manager
858 .create_table(
859 crate::Table::create()
860 .table(CatalogVersion::Table)
861 .col(
862 ColumnDef::new(CatalogVersion::Name)
863 .string()
864 .not_null()
865 .primary_key(),
866 )
867 .col(
868 ColumnDef::new(CatalogVersion::Version)
869 .big_integer()
870 .not_null(),
871 )
872 .to_owned(),
873 )
874 .await?;
875
876 manager
878 .create_index(
879 MigrationIndex::create()
880 .table(Worker::Table)
881 .name("idx_worker_host_port")
882 .unique()
883 .col(Worker::Host)
884 .col(Worker::Port)
885 .to_owned(),
886 )
887 .await?;
888 manager
889 .create_index(
890 MigrationIndex::create()
891 .table(UserPrivilege::Table)
892 .name("idx_user_privilege_item")
893 .unique()
894 .col(UserPrivilege::UserId)
895 .col(UserPrivilege::Oid)
896 .col(UserPrivilege::Action)
897 .col(UserPrivilege::GrantedBy)
898 .to_owned(),
899 )
900 .await?;
901
902 let insert_cluster_id = Query::insert()
904 .into_table(Cluster::Table)
905 .columns([Cluster::ClusterId])
906 .values_panic([uuid::Uuid::new_v4().into()])
907 .to_owned();
908 let insert_sys_users = Query::insert()
909 .into_table(User::Table)
910 .columns([
911 User::UserId,
912 User::Name,
913 User::IsSuper,
914 User::CanCreateUser,
915 User::CanCreateDb,
916 User::CanLogin,
917 ])
918 .values_panic([
919 1.into(),
920 "root".into(),
921 true.into(),
922 true.into(),
923 true.into(),
924 true.into(),
925 ])
926 .values_panic([
927 2.into(),
928 "postgres".into(),
929 true.into(),
930 true.into(),
931 true.into(),
932 true.into(),
933 ])
934 .values_panic([
935 3.into(),
936 "rwadmin".into(),
937 true.into(),
938 true.into(),
939 true.into(),
940 true.into(),
941 ])
942 .to_owned();
943
944 let insert_objects = Query::insert()
947 .into_table(Object::Table)
948 .columns([
949 Object::Oid,
950 Object::ObjType,
951 Object::OwnerId,
952 Object::DatabaseId,
953 ])
954 .values_panic([1.into(), "DATABASE".into(), 1.into(), None::<i32>.into()])
955 .values_panic([2.into(), "SCHEMA".into(), 1.into(), 1.into()]) .values_panic([3.into(), "SCHEMA".into(), 1.into(), 1.into()]) .values_panic([4.into(), "SCHEMA".into(), 1.into(), 1.into()]) .values_panic([5.into(), "SCHEMA".into(), 1.into(), 1.into()]) .to_owned();
960
961 let insert_sys_database = Query::insert()
962 .into_table(Database::Table)
963 .columns([Database::DatabaseId, Database::Name])
964 .values_panic([1.into(), "dev".into()])
965 .to_owned();
966 let insert_sys_schemas = Query::insert()
967 .into_table(Schema::Table)
968 .columns([Schema::SchemaId, Schema::Name])
969 .values_panic([2.into(), "public".into()])
970 .values_panic([3.into(), "pg_catalog".into()])
971 .values_panic([4.into(), "information_schema".into()])
972 .values_panic([5.into(), "rw_catalog".into()])
973 .to_owned();
974
975 manager.exec_stmt(insert_cluster_id).await?;
976 manager.exec_stmt(insert_sys_users).await?;
977 manager.exec_stmt(insert_objects).await?;
978 manager.exec_stmt(insert_sys_database).await?;
979 manager.exec_stmt(insert_sys_schemas).await?;
980
981 match manager.get_database_backend() {
983 DbBackend::MySql => {
984 manager
985 .get_connection()
986 .execute(Statement::from_string(
987 DatabaseBackend::MySql,
988 "ALTER TABLE object AUTO_INCREMENT = 6",
989 ))
990 .await?;
991 manager
992 .get_connection()
993 .execute(Statement::from_string(
994 DatabaseBackend::MySql,
995 "ALTER TABLE user AUTO_INCREMENT = 3",
996 ))
997 .await?;
998 }
999 DbBackend::Postgres => {
1000 manager
1001 .get_connection()
1002 .execute(Statement::from_string(
1003 DatabaseBackend::Postgres,
1004 "SELECT setval('object_oid_seq', 5)",
1005 ))
1006 .await?;
1007 manager
1008 .get_connection()
1009 .execute(Statement::from_string(
1010 DatabaseBackend::Postgres,
1011 "SELECT setval('user_user_id_seq', 3)",
1012 ))
1013 .await?;
1014 }
1015 DbBackend::Sqlite => {}
1016 }
1017
1018 Ok(())
1019 }
1020
1021 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
1022 drop_tables!(
1024 manager,
1025 Cluster,
1026 WorkerProperty,
1027 Worker,
1028 UserPrivilege,
1029 Database,
1030 Schema,
1031 StreamingJob,
1032 ActorDispatcher,
1033 Actor,
1034 Sink,
1035 Index,
1036 Table,
1037 Fragment,
1038 Source,
1039 Connection,
1040 View,
1041 Function,
1042 ObjectDependency,
1043 Object,
1044 User,
1045 SystemParameter,
1046 CatalogVersion
1047 );
1048 Ok(())
1049 }
1050}
1051
1052#[derive(DeriveIden)]
1053enum Cluster {
1054 Table,
1055 ClusterId,
1056 CreatedAt,
1057}
1058
1059#[derive(DeriveIden)]
1060enum Worker {
1061 Table,
1062 WorkerId,
1063 WorkerType,
1064 Host,
1065 Port,
1066 TransactionId,
1067 Status,
1068}
1069
1070#[derive(DeriveIden)]
1071enum WorkerProperty {
1072 Table,
1073 WorkerId,
1074 ParallelUnitIds,
1075 IsStreaming,
1076 IsServing,
1077 IsUnschedulable,
1078}
1079
1080#[derive(DeriveIden)]
1081enum User {
1082 Table,
1083 UserId,
1084 Name,
1085 IsSuper,
1086 CanCreateDb,
1087 CanCreateUser,
1088 CanLogin,
1089 AuthInfo,
1090}
1091
1092#[derive(DeriveIden)]
1093enum UserPrivilege {
1094 Table,
1095 Id,
1096 DependentId,
1097 UserId,
1098 Oid,
1099 GrantedBy,
1100 Action,
1101 WithGrantOption,
1102}
1103
1104#[derive(DeriveIden)]
1105enum Database {
1106 Table,
1107 DatabaseId,
1108 Name,
1109}
1110
1111#[derive(DeriveIden)]
1112enum Schema {
1113 Table,
1114 SchemaId,
1115 Name,
1116}
1117
1118#[derive(DeriveIden)]
1119enum Fragment {
1120 Table,
1121 FragmentId,
1122 JobId,
1123 FragmentTypeMask,
1124 DistributionType,
1125 StreamNode,
1126 VnodeMapping,
1127 StateTableIds,
1128 UpstreamFragmentId,
1129}
1130
1131#[derive(DeriveIden)]
1132enum Actor {
1133 Table,
1134 ActorId,
1135 FragmentId,
1136 Status,
1137 Splits,
1138 ParallelUnitId,
1139 WorkerId,
1140 UpstreamActorIds,
1141 VnodeBitmap,
1142 ExprContext,
1143}
1144
1145#[derive(DeriveIden)]
1146enum ActorDispatcher {
1147 Table,
1148 Id,
1149 ActorId,
1150 DispatcherType,
1151 DistKeyIndices,
1152 OutputIndices,
1153 HashMapping,
1154 DispatcherId,
1155 DownstreamActorIds,
1156 DownstreamTableName,
1157}
1158
1159#[derive(DeriveIden)]
1160enum StreamingJob {
1161 Table,
1162 JobId,
1163 JobStatus,
1164 Timezone,
1165 CreateType,
1166 Parallelism,
1167}
1168
1169#[derive(DeriveIden)]
1170#[allow(clippy::enum_variant_names)]
1171enum Table {
1172 Table,
1173 TableId,
1174 Name,
1175 OptionalAssociatedSourceId,
1176 TableType,
1177 BelongsToJobId,
1178 Columns,
1179 Pk,
1180 DistributionKey,
1181 StreamKey,
1182 AppendOnly,
1183 FragmentId,
1184 VnodeColIndex,
1185 RowIdIndex,
1186 ValueIndices,
1187 Definition,
1188 HandlePkConflictBehavior,
1189 ReadPrefixLenHint,
1190 WatermarkIndices,
1191 DistKeyInPk,
1192 DmlFragmentId,
1193 Cardinality,
1194 CleanedByWatermark,
1195 Description,
1196 Version,
1197 RetentionSeconds,
1198 IncomingSinks,
1199}
1200
1201#[derive(DeriveIden)]
1202enum Source {
1203 Table,
1204 SourceId,
1205 Name,
1206 RowIdIndex,
1207 Columns,
1208 PkColumnIds,
1209 WithProperties,
1210 Definition,
1211 SourceInfo,
1212 WatermarkDescs,
1213 OptionalAssociatedTableId,
1214 ConnectionId,
1215 Version,
1216}
1217
1218#[derive(DeriveIden)]
1219enum Sink {
1220 Table,
1221 SinkId,
1222 Name,
1223 Columns,
1224 PlanPk,
1225 DistributionKey,
1226 DownstreamPk,
1227 SinkType,
1228 Properties,
1229 Definition,
1230 ConnectionId,
1231 DbName,
1232 SinkFromName,
1233 SinkFormatDesc,
1234 TargetTable,
1235}
1236
1237#[derive(DeriveIden)]
1238enum Connection {
1239 Table,
1240 ConnectionId,
1241 Name,
1242 Info,
1243}
1244
1245#[derive(DeriveIden)]
1246enum View {
1247 Table,
1248 ViewId,
1249 Name,
1250 Properties,
1251 Definition,
1252 Columns,
1253}
1254
1255#[derive(DeriveIden)]
1256enum Index {
1257 Table,
1258 IndexId,
1259 Name,
1260 IndexTableId,
1261 PrimaryTableId,
1262 IndexItems,
1263 IndexColumnsLen,
1264}
1265
1266#[derive(DeriveIden)]
1267enum Function {
1268 Table,
1269 FunctionId,
1270 Name,
1271 ArgNames,
1272 ArgTypes,
1273 ReturnType,
1274 Language,
1275 Link,
1276 Identifier,
1277 Body,
1278 CompressedBinary,
1279 Kind,
1280 AlwaysRetryOnNetworkError,
1281}
1282
1283#[derive(DeriveIden)]
1284pub(crate) enum Object {
1285 Table,
1286 Oid,
1287 ObjType,
1288 OwnerId,
1289 SchemaId,
1290 DatabaseId,
1291 InitializedAt,
1292 CreatedAt,
1293 InitializedAtClusterVersion,
1294 CreatedAtClusterVersion,
1295}
1296
1297#[derive(DeriveIden)]
1298enum ObjectDependency {
1299 Table,
1300 Id,
1301 Oid,
1302 UsedBy,
1303}
1304
1305#[derive(DeriveIden)]
1306enum SystemParameter {
1307 Table,
1308 Name,
1309 Value,
1310 IsMutable,
1311 Description,
1312}
1313
1314#[derive(DeriveIden)]
1315enum CatalogVersion {
1316 Table,
1317 Name,
1318 Version,
1319}