1use std::collections::BTreeMap;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use risingwave_common::bail;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_meta_model::{
23 PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
24 source, subscription, table, view,
25};
26use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
27use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
28use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
29use risingwave_pb::catalog::table::{PbEngine, PbOptionalAssociatedSourceId, PbTableType};
30use risingwave_pb::catalog::{
31 PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
32 PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
33 PbView,
34};
35use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};
36
37use crate::{MetaError, MetaResult, MetaStoreBackend};
38
// Controller submodules — each manages one aspect of the meta store
// (catalog objects, cluster membership, fragments, streaming jobs, etc.).
pub mod catalog;
pub mod cluster;
pub mod fragment;
pub mod id;
pub mod rename;
pub mod scale;
pub mod session_params;
pub mod streaming_job;
pub mod system_param;
pub mod user;
pub mod utils;
50
51impl From<sea_orm::DbErr> for MetaError {
53 fn from(err: sea_orm::DbErr) -> Self {
54 if let Some(err) = err.sql_err() {
55 return anyhow!(err).into();
56 }
57 anyhow!(err).into()
58 }
59}
60
/// A SQL-backed meta store: a live database connection plus the endpoint it
/// was created from.
#[derive(Clone)]
pub struct SqlMetaStore {
    /// The underlying SeaORM database connection.
    pub conn: DatabaseConnection,
    /// The endpoint (URL) the connection was established against; for the
    /// in-memory backend this is the special SQLite in-memory URL.
    pub endpoint: String,
}
66
67impl SqlMetaStore {
68 pub async fn connect(backend: MetaStoreBackend) -> MetaResult<Self> {
70 const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);
71
72 #[easy_ext::ext]
73 impl ConnectOptions {
74 fn sqlite_common(&mut self) -> &mut Self {
76 self
77 .min_connections(1)
80 .max_connections(1)
81 .acquire_timeout(MAX_DURATION)
85 .connect_timeout(MAX_DURATION)
86 }
87 }
88
89 Ok(match backend {
90 MetaStoreBackend::Mem => {
91 const IN_MEMORY_STORE: &str = "sqlite::memory:";
92
93 let mut options = ConnectOptions::new(IN_MEMORY_STORE);
94
95 options
96 .sqlite_common()
97 .idle_timeout(MAX_DURATION)
102 .max_lifetime(MAX_DURATION);
103
104 let conn = sea_orm::Database::connect(options).await?;
105 Self {
106 conn,
107 endpoint: IN_MEMORY_STORE.to_owned(),
108 }
109 }
110 MetaStoreBackend::Sql { endpoint, config } => {
111 let mut options = ConnectOptions::new(endpoint.clone());
112 options
113 .max_connections(config.max_connections)
114 .min_connections(config.min_connections)
115 .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
116 .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
117 .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
118
119 if DbBackend::Sqlite.is_prefix_of(&endpoint) {
120 if endpoint.contains(":memory:") || endpoint.contains("mode=memory") {
121 bail!(
122 "use the `mem` backend instead of specifying a URL of in-memory SQLite"
123 );
124 }
125 options.sqlite_common();
126 }
127
128 let conn = sea_orm::Database::connect(options).await?;
129 Self { conn, endpoint }
130 }
131 })
132 }
133
134 #[cfg(any(test, feature = "test"))]
135 pub async fn for_test() -> Self {
136 let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
137 Migrator::up(&this.conn, None).await.unwrap();
138 this
139 }
140
141 async fn is_first_launch(&self) -> MetaResult<bool> {
147 let migrations = Migrator::get_applied_migrations(&self.conn)
148 .await
149 .context("failed to get applied migrations")?;
150 for migration in migrations {
151 if migration.name() == "m20230908_072257_init"
152 && migration.status() == MigrationStatus::Applied
153 {
154 return Ok(false);
155 }
156 }
157 Ok(true)
158 }
159
160 pub async fn up(&self) -> MetaResult<bool> {
164 let cluster_first_launch = self.is_first_launch().await?;
165 Migrator::up(&self.conn, None)
167 .await
168 .context("failed to upgrade models in meta store")?;
169
170 Ok(cluster_first_launch)
171 }
172}
173
174pub struct ObjectModel<M: ModelTrait>(M, object::Model);
175
176impl From<ObjectModel<database::Model>> for PbDatabase {
177 fn from(value: ObjectModel<database::Model>) -> Self {
178 Self {
179 id: value.0.database_id as _,
180 name: value.0.name,
181 owner: value.1.owner_id as _,
182 resource_group: value.0.resource_group.clone(),
183 barrier_interval_ms: value.0.barrier_interval_ms.map(|v| v as u32),
184 checkpoint_frequency: value.0.checkpoint_frequency.map(|v| v as u64),
185 }
186 }
187}
188
189impl From<ObjectModel<secret::Model>> for PbSecret {
190 fn from(value: ObjectModel<secret::Model>) -> Self {
191 Self {
192 id: value.0.secret_id as _,
193 name: value.0.name,
194 database_id: value.1.database_id.unwrap() as _,
195 value: value.0.value,
196 owner: value.1.owner_id as _,
197 schema_id: value.1.schema_id.unwrap() as _,
198 }
199 }
200}
201
202impl From<ObjectModel<schema::Model>> for PbSchema {
203 fn from(value: ObjectModel<schema::Model>) -> Self {
204 Self {
205 id: value.0.schema_id as _,
206 name: value.0.name,
207 database_id: value.1.database_id.unwrap() as _,
208 owner: value.1.owner_id as _,
209 }
210 }
211}
212
impl From<ObjectModel<table::Model>> for PbTable {
    /// Build the protobuf table catalog entry. Table-specific fields come from
    /// the model (`value.0`); owner, placement and timestamps come from the
    /// shared object row (`value.1`).
    fn from(value: ObjectModel<table::Model>) -> Self {
        Self {
            id: value.0.table_id as _,
            // Tables always live inside a schema and database, so the object
            // row is expected to have both set.
            schema_id: value.1.schema_id.unwrap() as _,
            database_id: value.1.database_id.unwrap() as _,
            name: value.0.name,
            columns: value.0.columns.to_protobuf(),
            pk: value.0.pk.to_protobuf(),
            table_type: PbTableType::from(value.0.table_type) as _,
            distribution_key: value.0.distribution_key.0,
            stream_key: value.0.stream_key.0,
            append_only: value.0.append_only,
            owner: value.1.owner_id as _,
            // NOTE(review): falls back to 0 when the fragment id is unset —
            // presumably only possible before the job is fully created; confirm.
            fragment_id: value.0.fragment_id.unwrap_or_default() as u32,
            vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
            row_id_index: value.0.row_id_index.map(|index| index as _),
            value_indices: value.0.value_indices.0,
            definition: value.0.definition,
            handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
                value.0.handle_pk_conflict_behavior,
            ) as _,
            version_column_index: value.0.version_column_index.map(|x| x as u32),
            read_prefix_len_hint: value.0.read_prefix_len_hint as _,
            watermark_indices: value.0.watermark_indices.0,
            dist_key_in_pk: value.0.dist_key_in_pk.0,
            dml_fragment_id: value.0.dml_fragment_id.map(|id| id as u32),
            cardinality: value
                .0
                .cardinality
                .map(|cardinality| cardinality.to_protobuf()),
            // Epochs are derived from the object row's wall-clock timestamps.
            initialized_at_epoch: Some(
                Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
            ),
            created_at_epoch: Some(
                Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
            ),
            cleaned_by_watermark: value.0.cleaned_by_watermark,
            // NOTE(review): status and create type are hard-coded here —
            // presumably only created, foreground jobs reach this conversion;
            // confirm against the callers.
            stream_job_status: PbStreamJobStatus::Created as _,
            create_type: PbCreateType::Foreground as _,
            version: value.0.version.map(|v| v.to_protobuf()),
            optional_associated_source_id: value
                .0
                .optional_associated_source_id
                .map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)),
            description: value.0.description,
            incoming_sinks: value.0.incoming_sinks.into_u32_array(),
            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
            created_at_cluster_version: value.1.created_at_cluster_version,
            retention_seconds: value.0.retention_seconds.map(|id| id as u32),
            cdc_table_id: value.0.cdc_table_id,
            maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
            webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
            job_id: value.0.belongs_to_job_id.map(|id| id as _),
            engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
            clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
            refreshable: value.0.refreshable,
            vector_index_info: value.0.vector_index_info.map(|index| index.to_protobuf()),
        }
    }
}
274
275impl From<ObjectModel<source::Model>> for PbSource {
276 fn from(value: ObjectModel<source::Model>) -> Self {
277 let mut secret_ref_map = BTreeMap::new();
278 if let Some(secret_ref) = value.0.secret_ref {
279 secret_ref_map = secret_ref.to_protobuf();
280 }
281 Self {
282 id: value.0.source_id as _,
283 schema_id: value.1.schema_id.unwrap() as _,
284 database_id: value.1.database_id.unwrap() as _,
285 name: value.0.name,
286 row_id_index: value.0.row_id_index.map(|id| id as _),
287 columns: value.0.columns.to_protobuf(),
288 pk_column_ids: value.0.pk_column_ids.0,
289 with_properties: value.0.with_properties.0,
290 owner: value.1.owner_id as _,
291 info: value.0.source_info.map(|info| info.to_protobuf()),
292 watermark_descs: value.0.watermark_descs.to_protobuf(),
293 definition: value.0.definition,
294 connection_id: value.0.connection_id.map(|id| id as _),
295 initialized_at_epoch: Some(
297 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
298 ),
299 created_at_epoch: Some(
300 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
301 ),
302 version: value.0.version as _,
303 optional_associated_table_id: value
304 .0
305 .optional_associated_table_id
306 .map(|id| PbOptionalAssociatedTableId::AssociatedTableId(id as _)),
307 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
308 created_at_cluster_version: value.1.created_at_cluster_version,
309 secret_refs: secret_ref_map,
310 rate_limit: value.0.rate_limit.map(|v| v as _),
311 }
312 }
313}
314
315impl From<ObjectModel<sink::Model>> for PbSink {
316 fn from(value: ObjectModel<sink::Model>) -> Self {
317 let mut secret_ref_map = BTreeMap::new();
318 if let Some(secret_ref) = value.0.secret_ref {
319 secret_ref_map = secret_ref.to_protobuf();
320 }
321 Self {
322 id: value.0.sink_id as _,
323 schema_id: value.1.schema_id.unwrap() as _,
324 database_id: value.1.database_id.unwrap() as _,
325 name: value.0.name,
326 columns: value.0.columns.to_protobuf(),
327 plan_pk: value.0.plan_pk.to_protobuf(),
328 distribution_key: value.0.distribution_key.0,
329 downstream_pk: value.0.downstream_pk.0,
330 sink_type: PbSinkType::from(value.0.sink_type) as _,
331 owner: value.1.owner_id as _,
332 properties: value.0.properties.0,
333 definition: value.0.definition,
334 connection_id: value.0.connection_id.map(|id| id as _),
335 initialized_at_epoch: Some(
336 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
337 ),
338 created_at_epoch: Some(
339 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
340 ),
341 db_name: value.0.db_name,
342 sink_from_name: value.0.sink_from_name,
343 stream_job_status: PbStreamJobStatus::Created as _,
344 format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
345 target_table: value.0.target_table.map(|id| id as _),
346 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
347 created_at_cluster_version: value.1.created_at_cluster_version,
348 create_type: PbCreateType::Foreground as _,
349 secret_refs: secret_ref_map,
350 original_target_columns: value
351 .0
352 .original_target_columns
353 .map(|cols| cols.to_protobuf())
354 .unwrap_or_default(),
355 auto_refresh_schema_from_table: value
356 .0
357 .auto_refresh_schema_from_table
358 .map(|id| id as _),
359 }
360 }
361}
362
363impl From<ObjectModel<subscription::Model>> for PbSubscription {
364 fn from(value: ObjectModel<subscription::Model>) -> Self {
365 Self {
366 id: value.0.subscription_id as _,
367 schema_id: value.1.schema_id.unwrap() as _,
368 database_id: value.1.database_id.unwrap() as _,
369 name: value.0.name,
370 owner: value.1.owner_id as _,
371 retention_seconds: value.0.retention_seconds as _,
372 definition: value.0.definition,
373 initialized_at_epoch: Some(
374 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
375 ),
376 created_at_epoch: Some(
377 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
378 ),
379 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
380 created_at_cluster_version: value.1.created_at_cluster_version,
381 dependent_table_id: value.0.dependent_table_id as _,
382 subscription_state: value.0.subscription_state as _,
383 }
384 }
385}
386
387impl From<ObjectModel<index::Model>> for PbIndex {
388 fn from(value: ObjectModel<index::Model>) -> Self {
389 Self {
390 id: value.0.index_id as _,
391 schema_id: value.1.schema_id.unwrap() as _,
392 database_id: value.1.database_id.unwrap() as _,
393 name: value.0.name,
394 owner: value.1.owner_id as _,
395 index_table_id: value.0.index_table_id as _,
396 primary_table_id: value.0.primary_table_id as _,
397 index_item: value.0.index_items.to_protobuf(),
398 index_column_properties: value
399 .0
400 .index_column_properties
401 .map(|p| p.to_protobuf())
402 .unwrap_or_default(),
403 index_columns_len: value.0.index_columns_len as _,
404 initialized_at_epoch: Some(
405 Epoch::from_unix_millis(value.1.initialized_at.and_utc().timestamp_millis() as _).0,
406 ),
407 created_at_epoch: Some(
408 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
409 ),
410 stream_job_status: PbStreamJobStatus::Created as _,
411 initialized_at_cluster_version: value.1.initialized_at_cluster_version,
412 created_at_cluster_version: value.1.created_at_cluster_version,
413 }
414 }
415}
416
417impl From<ObjectModel<view::Model>> for PbView {
418 fn from(value: ObjectModel<view::Model>) -> Self {
419 Self {
420 id: value.0.view_id as _,
421 schema_id: value.1.schema_id.unwrap() as _,
422 database_id: value.1.database_id.unwrap() as _,
423 name: value.0.name,
424 owner: value.1.owner_id as _,
425 properties: value.0.properties.0,
426 sql: value.0.definition,
427 columns: value.0.columns.to_protobuf(),
428 }
429 }
430}
431
432impl From<ObjectModel<connection::Model>> for PbConnection {
433 fn from(value: ObjectModel<connection::Model>) -> Self {
434 let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
435 PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
436 } else {
437 PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
438 };
439 Self {
440 id: value.1.oid as _,
441 schema_id: value.1.schema_id.unwrap() as _,
442 database_id: value.1.database_id.unwrap() as _,
443 name: value.0.name,
444 owner: value.1.owner_id as _,
445 info: Some(info),
446 }
447 }
448}
449
450impl From<ObjectModel<function::Model>> for PbFunction {
451 fn from(value: ObjectModel<function::Model>) -> Self {
452 Self {
453 id: value.0.function_id as _,
454 schema_id: value.1.schema_id.unwrap() as _,
455 database_id: value.1.database_id.unwrap() as _,
456 name: value.0.name,
457 owner: value.1.owner_id as _,
458 arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
459 arg_types: value.0.arg_types.to_protobuf(),
460 return_type: Some(value.0.return_type.to_protobuf()),
461 language: value.0.language,
462 runtime: value.0.runtime,
463 link: value.0.link,
464 name_in_runtime: value.0.name_in_runtime,
465 body: value.0.body,
466 compressed_binary: value.0.compressed_binary,
467 kind: Some(value.0.kind.into()),
468 always_retry_on_network_error: value.0.always_retry_on_network_error,
469 is_async: value
470 .0
471 .options
472 .as_ref()
473 .and_then(|o| o.0.get("async").map(|v| v == "true")),
474 is_batched: value
475 .0
476 .options
477 .as_ref()
478 .and_then(|o| o.0.get("batch").map(|v| v == "true")),
479 created_at_epoch: Some(
480 Epoch::from_unix_millis(value.1.created_at.and_utc().timestamp_millis() as _).0,
481 ),
482 created_at_cluster_version: value.1.created_at_cluster_version,
483 }
484 }
485}