1use sea_orm_migration::prelude::{Index as MigrationIndex, Table as MigrationTable, *};
2
3use crate::sea_orm::{DatabaseBackend, DbBackend, Statement};
4use crate::utils::ColumnDefExt;
5use crate::{assert_not_has_tables, drop_tables};
6
7#[derive(DeriveMigrationName)]
8pub struct Migration;
9
10#[async_trait::async_trait]
11impl MigrationTrait for Migration {
12 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
13 assert_not_has_tables!(
15 manager,
16 Cluster,
17 Worker,
18 WorkerProperty,
19 User,
20 UserPrivilege,
21 Database,
22 Schema,
23 StreamingJob,
24 Fragment,
25 Actor,
26 ActorDispatcher,
27 Table,
28 Source,
29 Sink,
30 Connection,
31 View,
32 Index,
33 Function,
34 Object,
35 ObjectDependency,
36 SystemParameter,
37 CatalogVersion
38 );
39
40 if manager.get_database_backend() == DbBackend::MySql {
43 manager
44 .get_connection()
45 .execute_unprepared("ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_bin")
46 .await
47 .expect("failed to set database collate");
48 }
49
50 manager
52 .create_table(
53 MigrationTable::create()
54 .table(Cluster::Table)
55 .col(
56 ColumnDef::new(Cluster::ClusterId)
57 .uuid()
58 .not_null()
59 .primary_key(),
60 )
61 .col(
62 ColumnDef::new(Cluster::CreatedAt)
63 .date_time()
64 .default(Expr::current_timestamp())
65 .not_null(),
66 )
67 .to_owned(),
68 )
69 .await?;
70 manager
71 .create_table(
72 MigrationTable::create()
73 .table(Worker::Table)
74 .col(
75 ColumnDef::new(Worker::WorkerId)
76 .integer()
77 .not_null()
78 .auto_increment()
79 .primary_key(),
80 )
81 .col(ColumnDef::new(Worker::WorkerType).string().not_null())
82 .col(ColumnDef::new(Worker::Host).string().not_null())
83 .col(ColumnDef::new(Worker::Port).integer().not_null())
84 .col(ColumnDef::new(Worker::Status).string().not_null())
85 .col(ColumnDef::new(Worker::TransactionId).integer())
86 .to_owned(),
87 )
88 .await?;
89 manager
90 .create_table(
91 MigrationTable::create()
92 .table(WorkerProperty::Table)
93 .col(
94 ColumnDef::new(WorkerProperty::WorkerId)
95 .integer()
96 .primary_key(),
97 )
98 .col(
99 ColumnDef::new(WorkerProperty::ParallelUnitIds)
100 .json_binary()
101 .not_null(),
102 )
103 .col(
104 ColumnDef::new(WorkerProperty::IsStreaming)
105 .boolean()
106 .not_null(),
107 )
108 .col(
109 ColumnDef::new(WorkerProperty::IsServing)
110 .boolean()
111 .not_null(),
112 )
113 .col(
114 ColumnDef::new(WorkerProperty::IsUnschedulable)
115 .boolean()
116 .not_null(),
117 )
118 .foreign_key(
119 &mut ForeignKey::create()
120 .name("FK_worker_property_worker_id")
121 .from(WorkerProperty::Table, WorkerProperty::WorkerId)
122 .to(Worker::Table, Worker::WorkerId)
123 .on_delete(ForeignKeyAction::Cascade)
124 .to_owned(),
125 )
126 .to_owned(),
127 )
128 .await?;
129 manager
130 .create_table(
131 MigrationTable::create()
132 .table(User::Table)
133 .col(
134 ColumnDef::new(User::UserId)
135 .integer()
136 .primary_key()
137 .auto_increment(),
138 )
139 .col(ColumnDef::new(User::Name).string().unique_key().not_null())
140 .col(ColumnDef::new(User::IsSuper).boolean().not_null())
141 .col(ColumnDef::new(User::CanCreateDb).boolean().not_null())
142 .col(ColumnDef::new(User::CanCreateUser).boolean().not_null())
143 .col(ColumnDef::new(User::CanLogin).boolean().not_null())
144 .col(ColumnDef::new(User::AuthInfo).rw_binary(manager))
145 .to_owned(),
146 )
147 .await?;
148 manager
149 .create_table(
150 MigrationTable::create()
151 .table(Object::Table)
152 .col(
153 ColumnDef::new(Object::Oid)
154 .integer()
155 .auto_increment()
156 .primary_key(),
157 )
158 .col(ColumnDef::new(Object::ObjType).string().not_null())
159 .col(ColumnDef::new(Object::OwnerId).integer().not_null())
160 .col(ColumnDef::new(Object::SchemaId).integer())
161 .col(ColumnDef::new(Object::DatabaseId).integer())
162 .col(
163 ColumnDef::new(Object::InitializedAt)
164 .date_time()
165 .default(Expr::current_timestamp())
166 .not_null(),
167 )
168 .col(
169 ColumnDef::new(Object::CreatedAt)
170 .date_time()
171 .default(Expr::current_timestamp())
172 .not_null(),
173 )
174 .col(ColumnDef::new(Object::InitializedAtClusterVersion).string())
175 .col(ColumnDef::new(Object::CreatedAtClusterVersion).string())
176 .foreign_key(
177 &mut ForeignKey::create()
178 .name("FK_object_owner_id")
179 .from(Object::Table, Object::OwnerId)
180 .to(User::Table, User::UserId)
181 .on_delete(ForeignKeyAction::Cascade)
182 .to_owned(),
183 )
184 .foreign_key(
185 &mut ForeignKey::create()
186 .name("FK_object_database_id")
187 .from(Object::Table, Object::DatabaseId)
188 .to(Object::Table, Object::Oid)
189 .on_delete(ForeignKeyAction::Cascade)
190 .to_owned(),
191 )
192 .foreign_key(
193 &mut ForeignKey::create()
194 .name("FK_object_schema_id")
195 .from(Object::Table, Object::SchemaId)
196 .to(Object::Table, Object::Oid)
197 .on_delete(ForeignKeyAction::Cascade)
198 .to_owned(),
199 )
200 .to_owned(),
201 )
202 .await?;
203 manager
204 .create_table(
205 MigrationTable::create()
206 .table(UserPrivilege::Table)
207 .col(
208 ColumnDef::new(UserPrivilege::Id)
209 .integer()
210 .primary_key()
211 .auto_increment(),
212 )
213 .col(ColumnDef::new(UserPrivilege::DependentId).integer())
214 .col(ColumnDef::new(UserPrivilege::UserId).integer().not_null())
215 .col(ColumnDef::new(UserPrivilege::Oid).integer().not_null())
216 .col(
217 ColumnDef::new(UserPrivilege::GrantedBy)
218 .integer()
219 .not_null(),
220 )
221 .col(ColumnDef::new(UserPrivilege::Action).string().not_null())
222 .col(
223 ColumnDef::new(UserPrivilege::WithGrantOption)
224 .boolean()
225 .not_null(),
226 )
227 .foreign_key(
228 &mut ForeignKey::create()
229 .name("FK_user_privilege_dependent_id")
230 .from(UserPrivilege::Table, UserPrivilege::DependentId)
231 .to(UserPrivilege::Table, UserPrivilege::Id)
232 .on_delete(ForeignKeyAction::Cascade)
233 .to_owned(),
234 )
235 .foreign_key(
236 &mut ForeignKey::create()
237 .name("FK_user_privilege_user_id")
238 .from(UserPrivilege::Table, UserPrivilege::UserId)
239 .to(User::Table, User::UserId)
240 .on_delete(ForeignKeyAction::Cascade)
241 .to_owned(),
242 )
243 .foreign_key(
244 &mut ForeignKey::create()
245 .name("FK_user_privilege_granted_by")
246 .from(UserPrivilege::Table, UserPrivilege::GrantedBy)
247 .to(User::Table, User::UserId)
248 .to_owned(),
249 )
250 .foreign_key(
251 &mut ForeignKey::create()
252 .name("FK_user_privilege_oid")
253 .from(UserPrivilege::Table, UserPrivilege::Oid)
254 .to(Object::Table, Object::Oid)
255 .on_delete(ForeignKeyAction::Cascade)
256 .to_owned(),
257 )
258 .to_owned(),
259 )
260 .await?;
261 manager
262 .create_table(
263 MigrationTable::create()
264 .table(ObjectDependency::Table)
265 .col(
266 ColumnDef::new(ObjectDependency::Id)
267 .integer()
268 .auto_increment()
269 .primary_key(),
270 )
271 .col(ColumnDef::new(ObjectDependency::Oid).integer().not_null())
272 .col(
273 ColumnDef::new(ObjectDependency::UsedBy)
274 .integer()
275 .not_null(),
276 )
277 .foreign_key(
278 &mut ForeignKey::create()
279 .name("FK_object_dependency_oid")
280 .from(ObjectDependency::Table, ObjectDependency::Oid)
281 .to(Object::Table, Object::Oid)
282 .on_delete(ForeignKeyAction::Cascade)
283 .to_owned(),
284 )
285 .foreign_key(
286 &mut ForeignKey::create()
287 .name("FK_object_dependency_used_by")
288 .from(ObjectDependency::Table, ObjectDependency::UsedBy)
289 .to(Object::Table, Object::Oid)
290 .on_delete(ForeignKeyAction::Cascade)
291 .to_owned(),
292 )
293 .to_owned(),
294 )
295 .await?;
296 manager
297 .create_table(
298 MigrationTable::create()
299 .table(Database::Table)
300 .col(ColumnDef::new(Database::DatabaseId).integer().primary_key())
301 .col(
302 ColumnDef::new(Database::Name)
303 .string()
304 .unique_key()
305 .not_null(),
306 )
307 .foreign_key(
308 &mut ForeignKey::create()
309 .name("FK_database_object_id")
310 .from(Database::Table, Database::DatabaseId)
311 .to(Object::Table, Object::Oid)
312 .on_delete(ForeignKeyAction::Cascade)
313 .to_owned(),
314 )
315 .to_owned(),
316 )
317 .await?;
318 manager
319 .create_table(
320 MigrationTable::create()
321 .table(Schema::Table)
322 .col(ColumnDef::new(Schema::SchemaId).integer().primary_key())
323 .col(ColumnDef::new(Schema::Name).string().not_null())
324 .foreign_key(
325 &mut ForeignKey::create()
326 .name("FK_schema_object_id")
327 .from(Schema::Table, Schema::SchemaId)
328 .to(Object::Table, Object::Oid)
329 .on_delete(ForeignKeyAction::Cascade)
330 .to_owned(),
331 )
332 .to_owned(),
333 )
334 .await?;
335 manager
336 .create_table(
337 MigrationTable::create()
338 .table(StreamingJob::Table)
339 .col(ColumnDef::new(StreamingJob::JobId).integer().primary_key())
340 .col(ColumnDef::new(StreamingJob::JobStatus).string().not_null())
341 .col(ColumnDef::new(StreamingJob::CreateType).string().not_null())
342 .col(ColumnDef::new(StreamingJob::Timezone).string())
343 .col(
344 ColumnDef::new(StreamingJob::Parallelism)
345 .json_binary()
346 .not_null(),
347 )
348 .foreign_key(
349 &mut ForeignKey::create()
350 .name("FK_streaming_job_object_id")
351 .from(StreamingJob::Table, StreamingJob::JobId)
352 .to(Object::Table, Object::Oid)
353 .on_delete(ForeignKeyAction::Cascade)
354 .to_owned(),
355 )
356 .to_owned(),
357 )
358 .await?;
359 manager
360 .create_table(
361 MigrationTable::create()
362 .table(Fragment::Table)
363 .col(
364 ColumnDef::new(Fragment::FragmentId)
365 .integer()
366 .primary_key()
367 .auto_increment(),
368 )
369 .col(ColumnDef::new(Fragment::JobId).integer().not_null())
370 .col(
371 ColumnDef::new(Fragment::FragmentTypeMask)
372 .integer()
373 .not_null(),
374 )
375 .col(
376 ColumnDef::new(Fragment::DistributionType)
377 .string()
378 .not_null(),
379 )
380 .col(
381 ColumnDef::new(Fragment::StreamNode)
382 .rw_binary(manager)
383 .not_null(),
384 )
385 .col(
386 ColumnDef::new(Fragment::VnodeMapping)
387 .rw_binary(manager)
388 .not_null(),
389 )
390 .col(ColumnDef::new(Fragment::StateTableIds).json_binary())
391 .col(ColumnDef::new(Fragment::UpstreamFragmentId).json_binary())
392 .foreign_key(
393 &mut ForeignKey::create()
394 .name("FK_fragment_table_id")
395 .from(Fragment::Table, Fragment::JobId)
396 .to(Object::Table, Object::Oid)
397 .on_delete(ForeignKeyAction::Cascade)
398 .to_owned(),
399 )
400 .to_owned(),
401 )
402 .await?;
403 manager
404 .create_table(
405 MigrationTable::create()
406 .table(Actor::Table)
407 .col(
408 ColumnDef::new(Actor::ActorId)
409 .integer()
410 .primary_key()
411 .auto_increment(),
412 )
413 .col(ColumnDef::new(Actor::FragmentId).integer().not_null())
414 .col(ColumnDef::new(Actor::Status).string().not_null())
415 .col(ColumnDef::new(Actor::Splits).rw_binary(manager))
416 .col(ColumnDef::new(Actor::ParallelUnitId).integer().not_null())
417 .col(ColumnDef::new(Actor::WorkerId).integer().not_null())
418 .col(ColumnDef::new(Actor::UpstreamActorIds).json_binary())
419 .col(ColumnDef::new(Actor::VnodeBitmap).rw_binary(manager))
420 .col(
421 ColumnDef::new(Actor::ExprContext)
422 .rw_binary(manager)
423 .not_null(),
424 )
425 .foreign_key(
426 &mut ForeignKey::create()
427 .name("FK_actor_fragment_id")
428 .from(Actor::Table, Actor::FragmentId)
429 .to(Fragment::Table, Fragment::FragmentId)
430 .on_delete(ForeignKeyAction::Cascade)
431 .to_owned(),
432 )
433 .to_owned(),
434 )
435 .await?;
436 manager
437 .create_table(
438 MigrationTable::create()
439 .table(ActorDispatcher::Table)
440 .col(
441 ColumnDef::new(ActorDispatcher::Id)
442 .integer()
443 .primary_key()
444 .auto_increment(),
445 )
446 .col(
447 ColumnDef::new(ActorDispatcher::ActorId)
448 .integer()
449 .not_null(),
450 )
451 .col(
452 ColumnDef::new(ActorDispatcher::DispatcherType)
453 .string()
454 .not_null(),
455 )
456 .col(
457 ColumnDef::new(ActorDispatcher::DistKeyIndices)
458 .json_binary()
459 .not_null(),
460 )
461 .col(
462 ColumnDef::new(ActorDispatcher::OutputIndices)
463 .json_binary()
464 .not_null(),
465 )
466 .col(ColumnDef::new(ActorDispatcher::HashMapping).rw_binary(manager))
467 .col(
468 ColumnDef::new(ActorDispatcher::DispatcherId)
469 .integer()
470 .not_null(),
471 )
472 .col(
473 ColumnDef::new(ActorDispatcher::DownstreamActorIds)
474 .json_binary()
475 .not_null(),
476 )
477 .col(ColumnDef::new(ActorDispatcher::DownstreamTableName).string())
478 .foreign_key(
479 &mut ForeignKey::create()
480 .name("FK_actor_dispatcher_actor_id")
481 .from(ActorDispatcher::Table, ActorDispatcher::ActorId)
482 .to(Actor::Table, Actor::ActorId)
483 .on_delete(ForeignKeyAction::Cascade)
484 .to_owned(),
485 )
486 .foreign_key(
487 &mut ForeignKey::create()
488 .name("FK_actor_dispatcher_dispatcher_id")
489 .from(ActorDispatcher::Table, ActorDispatcher::DispatcherId)
490 .to(Fragment::Table, Fragment::FragmentId)
491 .on_delete(ForeignKeyAction::Cascade)
492 .to_owned(),
493 )
494 .to_owned(),
495 )
496 .await?;
497 manager
498 .create_table(
499 MigrationTable::create()
500 .table(Connection::Table)
501 .col(
502 ColumnDef::new(Connection::ConnectionId)
503 .integer()
504 .primary_key(),
505 )
506 .col(ColumnDef::new(Connection::Name).string().not_null())
507 .col(
508 ColumnDef::new(Connection::Info)
509 .rw_binary(manager)
510 .not_null(),
511 )
512 .foreign_key(
513 &mut ForeignKey::create()
514 .name("FK_connection_object_id")
515 .from(Connection::Table, Connection::ConnectionId)
516 .to(Object::Table, Object::Oid)
517 .on_delete(ForeignKeyAction::Cascade)
518 .to_owned(),
519 )
520 .to_owned(),
521 )
522 .await?;
523 manager
524 .create_table(
525 MigrationTable::create()
526 .table(Source::Table)
527 .col(ColumnDef::new(Source::SourceId).integer().primary_key())
528 .col(ColumnDef::new(Source::Name).string().not_null())
529 .col(ColumnDef::new(Source::RowIdIndex).integer())
530 .col(
531 ColumnDef::new(Source::Columns)
532 .rw_binary(manager)
533 .not_null(),
534 )
535 .col(ColumnDef::new(Source::PkColumnIds).json_binary().not_null())
536 .col(
537 ColumnDef::new(Source::WithProperties)
538 .json_binary()
539 .not_null(),
540 )
541 .col(
542 ColumnDef::new(Source::Definition)
543 .rw_long_text(manager)
544 .not_null(),
545 )
546 .col(ColumnDef::new(Source::SourceInfo).rw_binary(manager))
547 .col(
548 ColumnDef::new(Source::WatermarkDescs)
549 .rw_binary(manager)
550 .not_null(),
551 )
552 .col(ColumnDef::new(Source::OptionalAssociatedTableId).integer())
553 .col(ColumnDef::new(Source::ConnectionId).integer())
554 .col(ColumnDef::new(Source::Version).big_integer().not_null())
555 .foreign_key(
556 &mut ForeignKey::create()
557 .name("FK_source_object_id")
558 .from(Source::Table, Source::SourceId)
559 .to(Object::Table, Object::Oid)
560 .on_delete(ForeignKeyAction::Cascade)
561 .to_owned(),
562 )
563 .foreign_key(
564 &mut ForeignKey::create()
565 .name("FK_source_connection_id")
566 .from(Source::Table, Source::ConnectionId)
567 .to(Connection::Table, Connection::ConnectionId)
568 .to_owned(),
569 )
570 .foreign_key(
571 &mut ForeignKey::create()
572 .name("FK_source_optional_associated_table_id")
573 .from(Source::Table, Source::OptionalAssociatedTableId)
574 .to(Object::Table, Object::Oid)
575 .on_delete(ForeignKeyAction::Cascade)
576 .to_owned(),
577 )
578 .to_owned(),
579 )
580 .await?;
581 manager
582 .create_table(
583 MigrationTable::create()
584 .table(Table::Table)
585 .col(ColumnDef::new(Table::TableId).integer().primary_key())
586 .col(ColumnDef::new(Table::Name).string().not_null())
587 .col(ColumnDef::new(Table::OptionalAssociatedSourceId).integer())
588 .col(ColumnDef::new(Table::TableType).string().not_null())
589 .col(ColumnDef::new(Table::BelongsToJobId).integer())
590 .col(ColumnDef::new(Table::Columns).rw_binary(manager).not_null())
591 .col(ColumnDef::new(Table::Pk).rw_binary(manager).not_null())
592 .col(
593 ColumnDef::new(Table::DistributionKey)
594 .json_binary()
595 .not_null(),
596 )
597 .col(ColumnDef::new(Table::StreamKey).json_binary().not_null())
598 .col(ColumnDef::new(Table::AppendOnly).boolean().not_null())
599 .col(ColumnDef::new(Table::FragmentId).integer())
600 .col(ColumnDef::new(Table::VnodeColIndex).integer())
601 .col(ColumnDef::new(Table::RowIdIndex).integer())
602 .col(ColumnDef::new(Table::ValueIndices).json_binary().not_null())
603 .col(
604 ColumnDef::new(Table::Definition)
605 .rw_long_text(manager)
606 .not_null(),
607 )
608 .col(
609 ColumnDef::new(Table::HandlePkConflictBehavior)
610 .string()
611 .not_null(),
612 )
613 .col(
614 ColumnDef::new(Table::ReadPrefixLenHint)
615 .integer()
616 .not_null(),
617 )
618 .col(
619 ColumnDef::new(Table::WatermarkIndices)
620 .json_binary()
621 .not_null(),
622 )
623 .col(ColumnDef::new(Table::DistKeyInPk).json_binary().not_null())
624 .col(ColumnDef::new(Table::DmlFragmentId).integer())
625 .col(ColumnDef::new(Table::Cardinality).rw_binary(manager))
626 .col(
627 ColumnDef::new(Table::CleanedByWatermark)
628 .boolean()
629 .not_null(),
630 )
631 .col(ColumnDef::new(Table::Description).string())
632 .col(ColumnDef::new(Table::Version).rw_binary(manager))
633 .col(ColumnDef::new(Table::RetentionSeconds).integer())
634 .col(
635 ColumnDef::new(Table::IncomingSinks)
636 .json_binary()
637 .not_null(),
638 )
639 .foreign_key(
640 &mut ForeignKey::create()
641 .name("FK_table_object_id")
642 .from(Table::Table, Table::TableId)
643 .to(Object::Table, Object::Oid)
644 .on_delete(ForeignKeyAction::Cascade)
645 .to_owned(),
646 )
647 .foreign_key(
648 &mut ForeignKey::create()
649 .name("FK_table_belongs_to_job_id")
650 .from(Table::Table, Table::BelongsToJobId)
651 .to(Object::Table, Object::Oid)
652 .on_delete(ForeignKeyAction::Cascade)
653 .to_owned(),
654 )
655 .foreign_key(
656 &mut ForeignKey::create()
657 .name("FK_table_fragment_id")
658 .from(Table::Table, Table::FragmentId)
659 .to(Fragment::Table, Fragment::FragmentId)
660 .on_delete(ForeignKeyAction::Cascade)
661 .to_owned(),
662 )
663 .foreign_key(
664 &mut ForeignKey::create()
665 .name("FK_table_dml_fragment_id")
666 .from(Table::Table, Table::DmlFragmentId)
667 .to(Fragment::Table, Fragment::FragmentId)
668 .to_owned(),
669 )
670 .foreign_key(
671 &mut ForeignKey::create()
672 .name("FK_table_optional_associated_source_id")
673 .from(Table::Table, Table::OptionalAssociatedSourceId)
674 .to(Object::Table, Object::Oid)
675 .on_delete(ForeignKeyAction::Cascade)
676 .to_owned(),
677 )
678 .to_owned(),
679 )
680 .await?;
681 manager
682 .create_table(
683 MigrationTable::create()
684 .table(Sink::Table)
685 .col(ColumnDef::new(Sink::SinkId).integer().primary_key())
686 .col(ColumnDef::new(Sink::Name).string().not_null())
687 .col(ColumnDef::new(Sink::Columns).rw_binary(manager).not_null())
688 .col(ColumnDef::new(Sink::PlanPk).rw_binary(manager).not_null())
689 .col(
690 ColumnDef::new(Sink::DistributionKey)
691 .json_binary()
692 .not_null(),
693 )
694 .col(ColumnDef::new(Sink::DownstreamPk).json_binary().not_null())
695 .col(ColumnDef::new(Sink::SinkType).string().not_null())
696 .col(ColumnDef::new(Sink::Properties).json_binary().not_null())
697 .col(
698 ColumnDef::new(Sink::Definition)
699 .rw_long_text(manager)
700 .not_null(),
701 )
702 .col(ColumnDef::new(Sink::ConnectionId).integer())
703 .col(ColumnDef::new(Sink::DbName).string().not_null())
704 .col(ColumnDef::new(Sink::SinkFromName).string().not_null())
705 .col(ColumnDef::new(Sink::SinkFormatDesc).rw_binary(manager))
706 .col(ColumnDef::new(Sink::TargetTable).integer())
707 .foreign_key(
708 &mut ForeignKey::create()
709 .name("FK_sink_object_id")
710 .from(Sink::Table, Sink::SinkId)
711 .to(Object::Table, Object::Oid)
712 .on_delete(ForeignKeyAction::Cascade)
713 .to_owned(),
714 )
715 .foreign_key(
716 &mut ForeignKey::create()
717 .name("FK_sink_connection_id")
718 .from(Sink::Table, Sink::ConnectionId)
719 .to(Connection::Table, Connection::ConnectionId)
720 .to_owned(),
721 )
722 .foreign_key(
723 &mut ForeignKey::create()
724 .name("FK_sink_target_table_id")
725 .from(Sink::Table, Sink::TargetTable)
726 .to(Table::Table, Table::TableId)
727 .to_owned(),
728 )
729 .to_owned(),
730 )
731 .await?;
732 manager
733 .create_table(
734 MigrationTable::create()
735 .table(View::Table)
736 .col(ColumnDef::new(View::ViewId).integer().primary_key())
737 .col(ColumnDef::new(View::Name).string().not_null())
738 .col(ColumnDef::new(View::Properties).json_binary().not_null())
739 .col(
740 ColumnDef::new(View::Definition)
741 .rw_long_text(manager)
742 .not_null(),
743 )
744 .col(ColumnDef::new(View::Columns).rw_binary(manager).not_null())
745 .foreign_key(
746 &mut ForeignKey::create()
747 .name("FK_view_object_id")
748 .from(View::Table, View::ViewId)
749 .to(Object::Table, Object::Oid)
750 .on_delete(ForeignKeyAction::Cascade)
751 .to_owned(),
752 )
753 .to_owned(),
754 )
755 .await?;
756 manager
757 .create_table(
758 MigrationTable::create()
759 .table(Index::Table)
760 .col(ColumnDef::new(Index::IndexId).integer().primary_key())
761 .col(ColumnDef::new(Index::Name).string().not_null())
762 .col(ColumnDef::new(Index::IndexTableId).integer().not_null())
763 .col(ColumnDef::new(Index::PrimaryTableId).integer().not_null())
764 .col(
765 ColumnDef::new(Index::IndexItems)
766 .rw_binary(manager)
767 .not_null(),
768 )
769 .col(ColumnDef::new(Index::IndexColumnsLen).integer().not_null())
770 .foreign_key(
771 &mut ForeignKey::create()
772 .name("FK_index_object_id")
773 .from(Index::Table, Index::IndexId)
774 .to(Object::Table, Object::Oid)
775 .on_delete(ForeignKeyAction::Cascade)
776 .to_owned(),
777 )
778 .foreign_key(
779 &mut ForeignKey::create()
780 .name("FK_index_index_table_id")
781 .from(Index::Table, Index::IndexTableId)
782 .to(Table::Table, Table::TableId)
783 .on_delete(ForeignKeyAction::Cascade)
784 .to_owned(),
785 )
786 .foreign_key(
787 &mut ForeignKey::create()
788 .name("FK_index_primary_table_id")
789 .from(Index::Table, Index::PrimaryTableId)
790 .to(Table::Table, Table::TableId)
791 .on_delete(ForeignKeyAction::Cascade)
792 .to_owned(),
793 )
794 .to_owned(),
795 )
796 .await?;
797 manager
798 .create_table(
799 MigrationTable::create()
800 .table(Function::Table)
801 .col(ColumnDef::new(Function::FunctionId).integer().primary_key())
802 .col(ColumnDef::new(Function::Name).string().not_null())
803 .col(ColumnDef::new(Function::ArgNames).string().not_null())
804 .col(
805 ColumnDef::new(Function::ArgTypes)
806 .rw_binary(manager)
807 .not_null(),
808 )
809 .col(
810 ColumnDef::new(Function::ReturnType)
811 .rw_binary(manager)
812 .not_null(),
813 )
814 .col(ColumnDef::new(Function::Language).string().not_null())
815 .col(ColumnDef::new(Function::Link).string())
816 .col(ColumnDef::new(Function::Identifier).string())
817 .col(ColumnDef::new(Function::Body).rw_long_text(manager))
818 .col(ColumnDef::new(Function::CompressedBinary).rw_long_text(manager))
820 .col(ColumnDef::new(Function::Kind).string().not_null())
821 .col(
822 ColumnDef::new(Function::AlwaysRetryOnNetworkError)
823 .boolean()
824 .not_null(),
825 )
826 .foreign_key(
827 &mut ForeignKey::create()
828 .name("FK_function_object_id")
829 .from(Function::Table, Function::FunctionId)
830 .to(Object::Table, Object::Oid)
831 .on_delete(ForeignKeyAction::Cascade)
832 .to_owned(),
833 )
834 .to_owned(),
835 )
836 .await?;
837 manager
838 .create_table(
839 MigrationTable::create()
840 .table(SystemParameter::Table)
841 .col(
842 ColumnDef::new(SystemParameter::Name)
843 .string()
844 .primary_key()
845 .not_null(),
846 )
847 .col(ColumnDef::new(SystemParameter::Value).string().not_null())
848 .col(
849 ColumnDef::new(SystemParameter::IsMutable)
850 .boolean()
851 .not_null(),
852 )
853 .col(ColumnDef::new(SystemParameter::Description).string())
854 .to_owned(),
855 )
856 .await?;
857 manager
858 .create_table(
859 crate::Table::create()
860 .table(CatalogVersion::Table)
861 .col(
862 ColumnDef::new(CatalogVersion::Name)
863 .string()
864 .not_null()
865 .primary_key(),
866 )
867 .col(
868 ColumnDef::new(CatalogVersion::Version)
869 .big_integer()
870 .not_null(),
871 )
872 .to_owned(),
873 )
874 .await?;
875
876 manager
878 .create_index(
879 MigrationIndex::create()
880 .table(Worker::Table)
881 .name("idx_worker_host_port")
882 .unique()
883 .col(Worker::Host)
884 .col(Worker::Port)
885 .to_owned(),
886 )
887 .await?;
888 manager
889 .create_index(
890 MigrationIndex::create()
891 .table(UserPrivilege::Table)
892 .name("idx_user_privilege_item")
893 .unique()
894 .col(UserPrivilege::UserId)
895 .col(UserPrivilege::Oid)
896 .col(UserPrivilege::Action)
897 .col(UserPrivilege::GrantedBy)
898 .to_owned(),
899 )
900 .await?;
901
902 let insert_cluster_id = Query::insert()
904 .into_table(Cluster::Table)
905 .columns([Cluster::ClusterId])
906 .values_panic([uuid::Uuid::new_v4().into()])
907 .to_owned();
908 let insert_sys_users = Query::insert()
909 .into_table(User::Table)
910 .columns([
911 User::UserId,
912 User::Name,
913 User::IsSuper,
914 User::CanCreateUser,
915 User::CanCreateDb,
916 User::CanLogin,
917 ])
918 .values_panic([
919 1.into(),
920 "root".into(),
921 true.into(),
922 true.into(),
923 true.into(),
924 true.into(),
925 ])
926 .values_panic([
927 2.into(),
928 "postgres".into(),
929 true.into(),
930 true.into(),
931 true.into(),
932 true.into(),
933 ])
934 .to_owned();
935
936 let insert_objects = Query::insert()
938 .into_table(Object::Table)
939 .columns([
940 Object::Oid,
941 Object::ObjType,
942 Object::OwnerId,
943 Object::DatabaseId,
944 ])
945 .values_panic([1.into(), "DATABASE".into(), 1.into(), None::<i32>.into()])
946 .values_panic([2.into(), "SCHEMA".into(), 1.into(), 1.into()]) .values_panic([3.into(), "SCHEMA".into(), 1.into(), 1.into()]) .values_panic([4.into(), "SCHEMA".into(), 1.into(), 1.into()]) .values_panic([5.into(), "SCHEMA".into(), 1.into(), 1.into()]) .to_owned();
951
952 let insert_sys_database = Query::insert()
953 .into_table(Database::Table)
954 .columns([Database::DatabaseId, Database::Name])
955 .values_panic([1.into(), "dev".into()])
956 .to_owned();
957 let insert_sys_schemas = Query::insert()
958 .into_table(Schema::Table)
959 .columns([Schema::SchemaId, Schema::Name])
960 .values_panic([2.into(), "public".into()])
961 .values_panic([3.into(), "pg_catalog".into()])
962 .values_panic([4.into(), "information_schema".into()])
963 .values_panic([5.into(), "rw_catalog".into()])
964 .to_owned();
965
966 manager.exec_stmt(insert_cluster_id).await?;
967 manager.exec_stmt(insert_sys_users).await?;
968 manager.exec_stmt(insert_objects).await?;
969 manager.exec_stmt(insert_sys_database).await?;
970 manager.exec_stmt(insert_sys_schemas).await?;
971
972 match manager.get_database_backend() {
974 DbBackend::MySql => {
975 manager
976 .get_connection()
977 .execute(Statement::from_string(
978 DatabaseBackend::MySql,
979 "ALTER TABLE object AUTO_INCREMENT = 6",
980 ))
981 .await?;
982 manager
983 .get_connection()
984 .execute(Statement::from_string(
985 DatabaseBackend::MySql,
986 "ALTER TABLE user AUTO_INCREMENT = 3",
987 ))
988 .await?;
989 }
990 DbBackend::Postgres => {
991 manager
992 .get_connection()
993 .execute(Statement::from_string(
994 DatabaseBackend::Postgres,
995 "SELECT setval('object_oid_seq', 5)",
996 ))
997 .await?;
998 manager
999 .get_connection()
1000 .execute(Statement::from_string(
1001 DatabaseBackend::Postgres,
1002 "SELECT setval('user_user_id_seq', 2)",
1003 ))
1004 .await?;
1005 }
1006 DbBackend::Sqlite => {}
1007 }
1008
1009 Ok(())
1010 }
1011
1012 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
1013 drop_tables!(
1015 manager,
1016 Cluster,
1017 WorkerProperty,
1018 Worker,
1019 UserPrivilege,
1020 Database,
1021 Schema,
1022 StreamingJob,
1023 ActorDispatcher,
1024 Actor,
1025 Sink,
1026 Index,
1027 Table,
1028 Fragment,
1029 Source,
1030 Connection,
1031 View,
1032 Function,
1033 ObjectDependency,
1034 Object,
1035 User,
1036 SystemParameter,
1037 CatalogVersion
1038 );
1039 Ok(())
1040 }
1041}
1042
1043#[derive(DeriveIden)]
1044enum Cluster {
1045 Table,
1046 ClusterId,
1047 CreatedAt,
1048}
1049
1050#[derive(DeriveIden)]
1051enum Worker {
1052 Table,
1053 WorkerId,
1054 WorkerType,
1055 Host,
1056 Port,
1057 TransactionId,
1058 Status,
1059}
1060
1061#[derive(DeriveIden)]
1062enum WorkerProperty {
1063 Table,
1064 WorkerId,
1065 ParallelUnitIds,
1066 IsStreaming,
1067 IsServing,
1068 IsUnschedulable,
1069}
1070
1071#[derive(DeriveIden)]
1072enum User {
1073 Table,
1074 UserId,
1075 Name,
1076 IsSuper,
1077 CanCreateDb,
1078 CanCreateUser,
1079 CanLogin,
1080 AuthInfo,
1081}
1082
1083#[derive(DeriveIden)]
1084enum UserPrivilege {
1085 Table,
1086 Id,
1087 DependentId,
1088 UserId,
1089 Oid,
1090 GrantedBy,
1091 Action,
1092 WithGrantOption,
1093}
1094
1095#[derive(DeriveIden)]
1096enum Database {
1097 Table,
1098 DatabaseId,
1099 Name,
1100}
1101
1102#[derive(DeriveIden)]
1103enum Schema {
1104 Table,
1105 SchemaId,
1106 Name,
1107}
1108
1109#[derive(DeriveIden)]
1110enum Fragment {
1111 Table,
1112 FragmentId,
1113 JobId,
1114 FragmentTypeMask,
1115 DistributionType,
1116 StreamNode,
1117 VnodeMapping,
1118 StateTableIds,
1119 UpstreamFragmentId,
1120}
1121
1122#[derive(DeriveIden)]
1123enum Actor {
1124 Table,
1125 ActorId,
1126 FragmentId,
1127 Status,
1128 Splits,
1129 ParallelUnitId,
1130 WorkerId,
1131 UpstreamActorIds,
1132 VnodeBitmap,
1133 ExprContext,
1134}
1135
1136#[derive(DeriveIden)]
1137enum ActorDispatcher {
1138 Table,
1139 Id,
1140 ActorId,
1141 DispatcherType,
1142 DistKeyIndices,
1143 OutputIndices,
1144 HashMapping,
1145 DispatcherId,
1146 DownstreamActorIds,
1147 DownstreamTableName,
1148}
1149
1150#[derive(DeriveIden)]
1151enum StreamingJob {
1152 Table,
1153 JobId,
1154 JobStatus,
1155 Timezone,
1156 CreateType,
1157 Parallelism,
1158}
1159
1160#[derive(DeriveIden)]
1161#[allow(clippy::enum_variant_names)]
1162enum Table {
1163 Table,
1164 TableId,
1165 Name,
1166 OptionalAssociatedSourceId,
1167 TableType,
1168 BelongsToJobId,
1169 Columns,
1170 Pk,
1171 DistributionKey,
1172 StreamKey,
1173 AppendOnly,
1174 FragmentId,
1175 VnodeColIndex,
1176 RowIdIndex,
1177 ValueIndices,
1178 Definition,
1179 HandlePkConflictBehavior,
1180 ReadPrefixLenHint,
1181 WatermarkIndices,
1182 DistKeyInPk,
1183 DmlFragmentId,
1184 Cardinality,
1185 CleanedByWatermark,
1186 Description,
1187 Version,
1188 RetentionSeconds,
1189 IncomingSinks,
1190}
1191
1192#[derive(DeriveIden)]
1193enum Source {
1194 Table,
1195 SourceId,
1196 Name,
1197 RowIdIndex,
1198 Columns,
1199 PkColumnIds,
1200 WithProperties,
1201 Definition,
1202 SourceInfo,
1203 WatermarkDescs,
1204 OptionalAssociatedTableId,
1205 ConnectionId,
1206 Version,
1207}
1208
1209#[derive(DeriveIden)]
1210enum Sink {
1211 Table,
1212 SinkId,
1213 Name,
1214 Columns,
1215 PlanPk,
1216 DistributionKey,
1217 DownstreamPk,
1218 SinkType,
1219 Properties,
1220 Definition,
1221 ConnectionId,
1222 DbName,
1223 SinkFromName,
1224 SinkFormatDesc,
1225 TargetTable,
1226}
1227
1228#[derive(DeriveIden)]
1229enum Connection {
1230 Table,
1231 ConnectionId,
1232 Name,
1233 Info,
1234}
1235
1236#[derive(DeriveIden)]
1237enum View {
1238 Table,
1239 ViewId,
1240 Name,
1241 Properties,
1242 Definition,
1243 Columns,
1244}
1245
1246#[derive(DeriveIden)]
1247enum Index {
1248 Table,
1249 IndexId,
1250 Name,
1251 IndexTableId,
1252 PrimaryTableId,
1253 IndexItems,
1254 IndexColumnsLen,
1255}
1256
1257#[derive(DeriveIden)]
1258enum Function {
1259 Table,
1260 FunctionId,
1261 Name,
1262 ArgNames,
1263 ArgTypes,
1264 ReturnType,
1265 Language,
1266 Link,
1267 Identifier,
1268 Body,
1269 CompressedBinary,
1270 Kind,
1271 AlwaysRetryOnNetworkError,
1272}
1273
1274#[derive(DeriveIden)]
1275pub(crate) enum Object {
1276 Table,
1277 Oid,
1278 ObjType,
1279 OwnerId,
1280 SchemaId,
1281 DatabaseId,
1282 InitializedAt,
1283 CreatedAt,
1284 InitializedAtClusterVersion,
1285 CreatedAtClusterVersion,
1286}
1287
1288#[derive(DeriveIden)]
1289enum ObjectDependency {
1290 Table,
1291 Id,
1292 Oid,
1293 UsedBy,
1294}
1295
1296#[derive(DeriveIden)]
1297enum SystemParameter {
1298 Table,
1299 Name,
1300 Value,
1301 IsMutable,
1302 Description,
1303}
1304
1305#[derive(DeriveIden)]
1306enum CatalogVersion {
1307 Table,
1308 Name,
1309 Version,
1310}