use std::collections::BTreeMap;
use std::time::Duration;

use anyhow::{Context, anyhow};
use risingwave_common::bail;
use risingwave_common::cast::datetime_to_timestamp_millis;
use risingwave_common::hash::VnodeCount;
use risingwave_common::util::epoch::Epoch;
use risingwave_meta_model::{
    PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
    source, streaming_job as streaming_job_model, subscription, table, view,
};
use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
use risingwave_pb::catalog::table::{CdcTableType as PbCdcTableType, PbEngine, PbTableType};
use risingwave_pb::catalog::{
    PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
    PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
    PbView,
};
use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};

use crate::{MetaError, MetaResult, MetaStoreBackend};

pub mod catalog;
pub mod cluster;
pub mod fragment;
pub mod id;
pub mod rename;
pub mod scale;
pub mod session_params;
pub mod streaming_job;
pub mod system_param;
pub mod user;
pub mod utils;

impl From<sea_orm::DbErr> for MetaError {
    fn from(err: sea_orm::DbErr) -> Self {
        // Prefer the parsed SQL error (e.g., a unique or foreign key constraint violation)
        // when available, as it is more specific than the raw database error.
        if let Some(err) = err.sql_err() {
            return anyhow!(err).into();
        }
        anyhow!(err).into()
    }
}

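/// Handle to the SQL meta store, wrapping the database connection together with the
/// endpoint it was established against.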
#[derive(Clone)]
pub struct SqlMetaStore {
    pub conn: DatabaseConnection,
    pub endpoint: String,
}

impl SqlMetaStore {
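    /// Connect to the meta store with the given backend.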
    pub async fn connect(backend: MetaStoreBackend) -> MetaResult<Self> {
        // An effectively unlimited duration, used to disable timeouts where needed.
        const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);

        #[easy_ext::ext]
        impl ConnectOptions {
            /// Apply common settings for SQLite connections.
            fn sqlite_common(&mut self) -> &mut Self {
                self
                    // SQLite is prone to "database is locked" errors under concurrent
                    // access, so restrict the pool to a single connection.
                    .min_connections(1)
                    .max_connections(1)
                    // Never time out when establishing or acquiring that connection.
                    .acquire_timeout(MAX_DURATION)
                    .connect_timeout(MAX_DURATION)
            }
        }

        Ok(match backend {
            MetaStoreBackend::Mem => {
                const IN_MEMORY_STORE: &str = "sqlite::memory:";

                let mut options = ConnectOptions::new(IN_MEMORY_STORE);

                options
                    .sqlite_common()
                    // Releasing the only connection to an in-memory SQLite database would
                    // drop all of its contents, so keep the connection alive indefinitely.
                    .idle_timeout(MAX_DURATION)
                    .max_lifetime(MAX_DURATION);

                let conn = sea_orm::Database::connect(options).await?;
                Self {
                    conn,
                    endpoint: IN_MEMORY_STORE.to_owned(),
                }
            }
            MetaStoreBackend::Sql { endpoint, config } => {
                let mut options = ConnectOptions::new(endpoint.clone());
                options
                    .max_connections(config.max_connections)
                    .min_connections(config.min_connections)
                    .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
                    .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
                    .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));

                if DbBackend::Sqlite.is_prefix_of(&endpoint) {
                    if endpoint.contains(":memory:") || endpoint.contains("mode=memory") {
                        bail!(
                            "use the `mem` backend instead of specifying a URL of in-memory SQLite"
                        );
                    }
                    options.sqlite_common();
                }

                let conn = sea_orm::Database::connect(options).await?;
                Self { conn, endpoint }
            }
        })
    }

    #[cfg(any(test, feature = "test"))]
    pub async fn for_test() -> Self {
        let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
        Migrator::up(&this.conn, None).await.unwrap();
        this
    }

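    /// Check whether this is the first launch of the cluster, i.e. whether the initial
    /// migration has not yet been applied to the backing database.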
    async fn is_first_launch(&self) -> MetaResult<bool> {
        let migrations = Migrator::get_applied_migrations(&self.conn)
            .await
            .context("failed to get applied migrations")?;
        for migration in migrations {
            if migration.name() == "m20230908_072257_init"
                && migration.status() == MigrationStatus::Applied
            {
                return Ok(false);
            }
        }
        Ok(true)
    }

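    /// Apply all pending migrations to bring the meta store schema up to date.
    ///
    /// Returns whether this is the first launch of the cluster.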
    pub async fn up(&self) -> MetaResult<bool> {
        let cluster_first_launch = self.is_first_launch().await?;
        // Apply any pending migrations introduced by model changes.
        Migrator::up(&self.conn, None)
            .await
            .context("failed to upgrade models in meta store")?;

        Ok(cluster_first_launch)
    }
}

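/// A catalog model paired with its corresponding `object` row and, if the object is a
/// streaming job, its `streaming_job` row. Used below to convert database models into
/// protobuf catalog messages.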
pub struct ObjectModel<M: ModelTrait>(M, object::Model, Option<streaming_job_model::Model>);

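/// Derive the protobuf stream job status and create type from an optional streaming job
/// model, defaulting to `Created` / `Foreground` when the object is not a streaming job.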
fn stream_job_status_and_create_type(
    streaming_job: &Option<streaming_job_model::Model>,
) -> (i32, i32) {
    streaming_job
        .as_ref()
        .map(|job| {
            (
                PbStreamJobStatus::from(job.job_status) as i32,
                PbCreateType::from(job.create_type.clone()) as i32,
            )
        })
        .unwrap_or((
            PbStreamJobStatus::Created as i32,
            PbCreateType::Foreground as i32,
        ))
}

impl From<ObjectModel<database::Model>> for PbDatabase {
    fn from(value: ObjectModel<database::Model>) -> Self {
        Self {
            id: value.0.database_id,
            name: value.0.name,
            owner: value.1.owner_id as _,
            resource_group: value.0.resource_group.clone(),
            barrier_interval_ms: value.0.barrier_interval_ms.map(|v| v as u32),
            checkpoint_frequency: value.0.checkpoint_frequency.map(|v| v as u64),
        }
    }
}

impl From<ObjectModel<secret::Model>> for PbSecret {
    fn from(value: ObjectModel<secret::Model>) -> Self {
        Self {
            id: value.0.secret_id,
            name: value.0.name,
            database_id: value.1.database_id.unwrap(),
            value: value.0.value,
            owner: value.1.owner_id as _,
            schema_id: value.1.schema_id.unwrap(),
        }
    }
}

impl From<ObjectModel<schema::Model>> for PbSchema {
    fn from(value: ObjectModel<schema::Model>) -> Self {
        Self {
            id: value.0.schema_id,
            name: value.0.name,
            database_id: value.1.database_id.unwrap(),
            owner: value.1.owner_id as _,
        }
    }
}

impl From<ObjectModel<table::Model>> for PbTable {
    fn from(value: ObjectModel<table::Model>) -> Self {
        let (stream_job_status, create_type) = stream_job_status_and_create_type(&value.2);
        Self {
            id: value.0.table_id,
            schema_id: value.1.schema_id.unwrap(),
            database_id: value.1.database_id.unwrap(),
            name: value.0.name,
            columns: value.0.columns.to_protobuf(),
            pk: value.0.pk.to_protobuf(),
            table_type: PbTableType::from(value.0.table_type) as _,
            distribution_key: value.0.distribution_key.0,
            stream_key: value.0.stream_key.0,
            append_only: value.0.append_only,
            owner: value.1.owner_id as _,
            fragment_id: value.0.fragment_id.unwrap_or_default(),
            vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
            row_id_index: value.0.row_id_index.map(|index| index as _),
            value_indices: value.0.value_indices.0,
            definition: value.0.definition,
            handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
                value.0.handle_pk_conflict_behavior,
            ) as _,
            version_column_indices: value
                .0
                .version_column_indices
                .unwrap_or_default()
                .0
                .iter()
                .map(|&idx| idx as u32)
                .collect(),
            read_prefix_len_hint: value.0.read_prefix_len_hint as _,
            watermark_indices: value.0.watermark_indices.0,
            dist_key_in_pk: value.0.dist_key_in_pk.0,
            dml_fragment_id: value.0.dml_fragment_id,
            cardinality: value
                .0
                .cardinality
                .map(|cardinality| cardinality.to_protobuf()),
            initialized_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
                    .0,
            ),
            created_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
            ),
            #[expect(deprecated)]
            cleaned_by_watermark: value.0.cleaned_by_watermark,
            stream_job_status,
            create_type,
            version: value.0.version.map(|v| v.to_protobuf()),
            optional_associated_source_id: value.0.optional_associated_source_id.map(Into::into),
            description: value.0.description,
            #[expect(deprecated)]
            incoming_sinks: vec![],
            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
            created_at_cluster_version: value.1.created_at_cluster_version,
            retention_seconds: value.0.retention_seconds.map(|id| id as u32),
            cdc_table_id: value.0.cdc_table_id,
            maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
            webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
            job_id: value.0.belongs_to_job_id,
            engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
            #[expect(deprecated)]
            clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
            clean_watermark_indices: value
                .0
                .clean_watermark_indices
                .map(|indices| indices.0.iter().map(|&x| x as u32).collect())
                .unwrap_or_default(),
            refreshable: value.0.refreshable,
            vector_index_info: value.0.vector_index_info.map(|index| index.to_protobuf()),
            cdc_table_type: value
                .0
                .cdc_table_type
                .map(|cdc_type| PbCdcTableType::from(cdc_type) as i32),
        }
    }
}

impl From<ObjectModel<source::Model>> for PbSource {
    fn from(value: ObjectModel<source::Model>) -> Self {
        let mut secret_ref_map = BTreeMap::new();
        if let Some(secret_ref) = value.0.secret_ref {
            secret_ref_map = secret_ref.to_protobuf();
        }
        Self {
            id: value.0.source_id as _,
            schema_id: value.1.schema_id.unwrap(),
            database_id: value.1.database_id.unwrap(),
            name: value.0.name,
            row_id_index: value.0.row_id_index.map(|id| id as _),
            columns: value.0.columns.to_protobuf(),
            pk_column_ids: value.0.pk_column_ids.0,
            with_properties: value.0.with_properties.0,
            owner: value.1.owner_id as _,
            info: value.0.source_info.map(|info| info.to_protobuf()),
            watermark_descs: value.0.watermark_descs.to_protobuf(),
            definition: value.0.definition,
            connection_id: value.0.connection_id,
            initialized_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
                    .0,
            ),
            created_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
            ),
            version: value.0.version as _,
            optional_associated_table_id: value.0.optional_associated_table_id.map(Into::into),
            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
            created_at_cluster_version: value.1.created_at_cluster_version,
            secret_refs: secret_ref_map,
            rate_limit: value.0.rate_limit.map(|v| v as _),
            refresh_mode: value
                .0
                .refresh_mode
                .map(|refresh_mode| refresh_mode.to_protobuf()),
        }
    }
}

impl From<ObjectModel<sink::Model>> for PbSink {
    fn from(value: ObjectModel<sink::Model>) -> Self {
        let (stream_job_status, create_type) = stream_job_status_and_create_type(&value.2);
        let mut secret_ref_map = BTreeMap::new();
        if let Some(secret_ref) = value.0.secret_ref {
            secret_ref_map = secret_ref.to_protobuf();
        }
        Self {
            id: value.0.sink_id as _,
            schema_id: value.1.schema_id.unwrap(),
            database_id: value.1.database_id.unwrap(),
            name: value.0.name,
            columns: value.0.columns.to_protobuf(),
            plan_pk: value.0.plan_pk.to_protobuf(),
            distribution_key: value.0.distribution_key.0,
            downstream_pk: value.0.downstream_pk.0,
            sink_type: PbSinkType::from(value.0.sink_type) as _,
            raw_ignore_delete: value.0.ignore_delete,
            owner: value.1.owner_id as _,
            properties: value.0.properties.0,
            definition: value.0.definition,
            connection_id: value.0.connection_id,
            initialized_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
                    .0,
            ),
            created_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
            ),
            db_name: value.0.db_name,
            sink_from_name: value.0.sink_from_name,
            stream_job_status,
            format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
            target_table: value.0.target_table,
            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
            created_at_cluster_version: value.1.created_at_cluster_version,
            create_type,
            secret_refs: secret_ref_map,
            original_target_columns: value
                .0
                .original_target_columns
                .map(|cols| cols.to_protobuf())
                .unwrap_or_default(),
            auto_refresh_schema_from_table: value.0.auto_refresh_schema_from_table,
        }
    }
}

impl From<ObjectModel<subscription::Model>> for PbSubscription {
    fn from(value: ObjectModel<subscription::Model>) -> Self {
        Self {
            id: value.0.subscription_id as _,
            schema_id: value.1.schema_id.unwrap(),
            database_id: value.1.database_id.unwrap(),
            name: value.0.name,
            owner: value.1.owner_id as _,
            retention_seconds: value.0.retention_seconds as _,
            definition: value.0.definition,
            initialized_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
                    .0,
            ),
            created_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
            ),
            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
            created_at_cluster_version: value.1.created_at_cluster_version,
            dependent_table_id: value.0.dependent_table_id,
            subscription_state: value.0.subscription_state as _,
        }
    }
}

impl From<ObjectModel<index::Model>> for PbIndex {
    fn from(value: ObjectModel<index::Model>) -> Self {
        let (stream_job_status, create_type) = stream_job_status_and_create_type(&value.2);
        Self {
            id: value.0.index_id as _,
            schema_id: value.1.schema_id.unwrap(),
            database_id: value.1.database_id.unwrap(),
            name: value.0.name,
            owner: value.1.owner_id as _,
            index_table_id: value.0.index_table_id,
            primary_table_id: value.0.primary_table_id,
            index_item: value.0.index_items.to_protobuf(),
            index_column_properties: value
                .0
                .index_column_properties
                .map(|p| p.to_protobuf())
                .unwrap_or_default(),
            index_columns_len: value.0.index_columns_len as _,
            initialized_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
                    .0,
            ),
            created_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
            ),
            stream_job_status,
            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
            created_at_cluster_version: value.1.created_at_cluster_version,
            create_type,
        }
    }
}

impl From<ObjectModel<view::Model>> for PbView {
    fn from(value: ObjectModel<view::Model>) -> Self {
        Self {
            id: value.0.view_id as _,
            schema_id: value.1.schema_id.unwrap(),
            database_id: value.1.database_id.unwrap(),
            name: value.0.name,
            owner: value.1.owner_id as _,
            properties: value.0.properties.0,
            sql: value.0.definition,
            columns: value.0.columns.to_protobuf(),
            created_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
            ),
            created_at_cluster_version: value.1.created_at_cluster_version,
        }
    }
}

impl From<ObjectModel<connection::Model>> for PbConnection {
    fn from(value: ObjectModel<connection::Model>) -> Self {
        // A default (empty) private link service means the connection is described by its
        // connection params instead.
        let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
            PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
        } else {
            PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
        };
        Self {
            id: value.1.oid.as_connection_id(),
            schema_id: value.1.schema_id.unwrap(),
            database_id: value.1.database_id.unwrap(),
            name: value.0.name,
            owner: value.1.owner_id as _,
            info: Some(info),
        }
    }
}

impl From<ObjectModel<function::Model>> for PbFunction {
    fn from(value: ObjectModel<function::Model>) -> Self {
        Self {
            id: value.0.function_id as _,
            schema_id: value.1.schema_id.unwrap(),
            database_id: value.1.database_id.unwrap(),
            name: value.0.name,
            owner: value.1.owner_id as _,
            arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
            arg_types: value.0.arg_types.to_protobuf(),
            return_type: Some(value.0.return_type.to_protobuf()),
            language: value.0.language,
            runtime: value.0.runtime,
            link: value.0.link,
            name_in_runtime: value.0.name_in_runtime,
            body: value.0.body,
            compressed_binary: value.0.compressed_binary,
            kind: Some(value.0.kind.into()),
            always_retry_on_network_error: value.0.always_retry_on_network_error,
            is_async: value
                .0
                .options
                .as_ref()
                .and_then(|o| o.0.get("async").map(|v| v == "true")),
            is_batched: value
                .0
                .options
                .as_ref()
                .and_then(|o| o.0.get("batch").map(|v| v == "true")),
            created_at_epoch: Some(
                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
            ),
            created_at_cluster_version: value.1.created_at_cluster_version,
        }
    }
}