risingwave_meta/controller/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::time::Duration;
17
18use anyhow::{Context, anyhow};
19use risingwave_common::bail;
20use risingwave_common::cast::datetime_to_timestamp_millis;
21use risingwave_common::hash::VnodeCount;
22use risingwave_common::util::epoch::Epoch;
23use risingwave_meta_model::{
24    PrivateLinkService, connection, database, function, index, object, schema, secret, sink,
25    source, streaming_job as streaming_job_model, subscription, table, view,
26};
27use risingwave_meta_model_migration::{MigrationStatus, Migrator, MigratorTrait};
28use risingwave_pb::catalog::connection::PbInfo as PbConnectionInfo;
29use risingwave_pb::catalog::table::{CdcTableType as PbCdcTableType, PbEngine, PbTableType};
30use risingwave_pb::catalog::{
31    PbConnection, PbCreateType, PbDatabase, PbFunction, PbHandleConflictBehavior, PbIndex,
32    PbSchema, PbSecret, PbSink, PbSinkType, PbSource, PbStreamJobStatus, PbSubscription, PbTable,
33    PbView,
34};
35use sea_orm::{ConnectOptions, DatabaseConnection, DbBackend, ModelTrait};
36
37use crate::{MetaError, MetaResult, MetaStoreBackend};
38
39pub mod catalog;
40pub mod cluster;
41pub mod fragment;
42pub mod id;
43pub mod rename;
44pub mod scale;
45pub mod session_params;
46pub mod streaming_job;
47pub mod system_param;
48pub mod user;
49pub mod utils;
50
51// todo: refine the error transform.
52impl From<sea_orm::DbErr> for MetaError {
53    fn from(err: sea_orm::DbErr) -> Self {
54        if let Some(err) = err.sql_err() {
55            return anyhow!(err).into();
56        }
57        anyhow!(err).into()
58    }
59}
60
/// Handle to the SQL-backed meta store: a `sea-orm` database connection plus the
/// endpoint it was established against.
#[derive(Clone)]
pub struct SqlMetaStore {
    // The underlying database connection used for all meta-store operations.
    pub conn: DatabaseConnection,
    // The endpoint URL this connection was created from (e.g. `sqlite::memory:`).
    pub endpoint: String,
}
66
67impl SqlMetaStore {
68    /// Connect to the SQL meta store based on the given configuration.
69    pub async fn connect(backend: MetaStoreBackend) -> MetaResult<Self> {
70        const MAX_DURATION: Duration = Duration::new(u64::MAX / 4, 0);
71
72        #[easy_ext::ext]
73        impl ConnectOptions {
74            /// Apply common settings for `SQLite` connections.
75            fn sqlite_common(&mut self) -> &mut Self {
76                self
77                    // Since Sqlite is prone to the error "(code: 5) database is locked" under concurrent access,
78                    // here we forcibly specify the number of connections as 1.
79                    .min_connections(1)
80                    .max_connections(1)
81                    // Workaround for https://github.com/risingwavelabs/risingwave/issues/18966.
82                    // Note: don't quite get the point but `acquire_timeout` and `connect_timeout` maps to the
83                    //       same underlying setting in `sqlx` under current implementation.
84                    .acquire_timeout(MAX_DURATION)
85                    .connect_timeout(MAX_DURATION)
86            }
87        }
88
89        Ok(match backend {
90            MetaStoreBackend::Mem => {
91                const IN_MEMORY_STORE: &str = "sqlite::memory:";
92
93                let mut options = ConnectOptions::new(IN_MEMORY_STORE);
94
95                options
96                    .sqlite_common()
97                    // Releasing the connection to in-memory SQLite database is unacceptable
98                    // because it will clear the database. Set a large enough timeout to prevent it.
99                    // `sqlx` actually supports disabling these timeouts by passing a `None`, but
100                    // `sea-orm` does not expose this option.
101                    .idle_timeout(MAX_DURATION)
102                    .max_lifetime(MAX_DURATION);
103
104                let conn = sea_orm::Database::connect(options).await?;
105                Self {
106                    conn,
107                    endpoint: IN_MEMORY_STORE.to_owned(),
108                }
109            }
110            MetaStoreBackend::Sql { endpoint, config } => {
111                let mut options = ConnectOptions::new(endpoint.clone());
112                options
113                    .max_connections(config.max_connections)
114                    .min_connections(config.min_connections)
115                    .connect_timeout(Duration::from_secs(config.connection_timeout_sec))
116                    .idle_timeout(Duration::from_secs(config.idle_timeout_sec))
117                    .acquire_timeout(Duration::from_secs(config.acquire_timeout_sec));
118
119                if DbBackend::Sqlite.is_prefix_of(&endpoint) {
120                    if endpoint.contains(":memory:") || endpoint.contains("mode=memory") {
121                        bail!(
122                            "use the `mem` backend instead of specifying a URL of in-memory SQLite"
123                        );
124                    }
125                    options.sqlite_common();
126                }
127
128                let conn = sea_orm::Database::connect(options).await?;
129                Self { conn, endpoint }
130            }
131        })
132    }
133
134    #[cfg(any(test, feature = "test"))]
135    pub async fn for_test() -> Self {
136        let this = Self::connect(MetaStoreBackend::Mem).await.unwrap();
137        Migrator::up(&this.conn, None).await.unwrap();
138        this
139    }
140
141    /// Check whether the cluster, which uses SQL as the backend, is a new cluster.
142    /// It determines this by inspecting the applied migrations. If the migration `m20230908_072257_init` has been applied,
143    /// then it is considered an old cluster.
144    ///
145    /// Note: this check should be performed before [`Self::up()`].
146    async fn is_first_launch(&self) -> MetaResult<bool> {
147        let migrations = Migrator::get_applied_migrations(&self.conn)
148            .await
149            .context("failed to get applied migrations")?;
150        for migration in migrations {
151            if migration.name() == "m20230908_072257_init"
152                && migration.status() == MigrationStatus::Applied
153            {
154                return Ok(false);
155            }
156        }
157        Ok(true)
158    }
159
160    /// Apply all the migrations to the meta store before starting the service.
161    ///
162    /// Returns whether the cluster is the first launch.
163    pub async fn up(&self) -> MetaResult<bool> {
164        let cluster_first_launch = self.is_first_launch().await?;
165        // Try to upgrade if any new model changes are added.
166        Migrator::up(&self.conn, None)
167            .await
168            .context("failed to upgrade models in meta store")?;
169
170        Ok(cluster_first_launch)
171    }
172}
173
174pub struct ObjectModel<M: ModelTrait>(M, object::Model, Option<streaming_job_model::Model>);
175
176fn stream_job_status_and_create_type(
177    streaming_job: &Option<streaming_job_model::Model>,
178) -> (i32, i32) {
179    streaming_job
180        .as_ref()
181        .map(|job| {
182            (
183                PbStreamJobStatus::from(job.job_status) as i32,
184                PbCreateType::from(job.create_type.clone()) as i32,
185            )
186        })
187        .unwrap_or((
188            PbStreamJobStatus::Created as i32,
189            PbCreateType::Foreground as i32,
190        ))
191}
192
193impl From<ObjectModel<database::Model>> for PbDatabase {
194    fn from(value: ObjectModel<database::Model>) -> Self {
195        Self {
196            id: value.0.database_id,
197            name: value.0.name,
198            owner: value.1.owner_id as _,
199            resource_group: value.0.resource_group.clone(),
200            barrier_interval_ms: value.0.barrier_interval_ms.map(|v| v as u32),
201            checkpoint_frequency: value.0.checkpoint_frequency.map(|v| v as u64),
202        }
203    }
204}
205
206impl From<ObjectModel<secret::Model>> for PbSecret {
207    fn from(value: ObjectModel<secret::Model>) -> Self {
208        Self {
209            id: value.0.secret_id,
210            name: value.0.name,
211            database_id: value.1.database_id.unwrap(),
212            value: value.0.value,
213            owner: value.1.owner_id as _,
214            schema_id: value.1.schema_id.unwrap(),
215        }
216    }
217}
218
219impl From<ObjectModel<schema::Model>> for PbSchema {
220    fn from(value: ObjectModel<schema::Model>) -> Self {
221        Self {
222            id: value.0.schema_id,
223            name: value.0.name,
224            database_id: value.1.database_id.unwrap(),
225            owner: value.1.owner_id as _,
226        }
227    }
228}
229
230impl From<ObjectModel<table::Model>> for PbTable {
231    fn from(value: ObjectModel<table::Model>) -> Self {
232        let (stream_job_status, create_type) = stream_job_status_and_create_type(&value.2);
233        Self {
234            id: value.0.table_id,
235            schema_id: value.1.schema_id.unwrap(),
236            database_id: value.1.database_id.unwrap(),
237            name: value.0.name,
238            columns: value.0.columns.to_protobuf(),
239            pk: value.0.pk.to_protobuf(),
240            table_type: PbTableType::from(value.0.table_type) as _,
241            distribution_key: value.0.distribution_key.0,
242            stream_key: value.0.stream_key.0,
243            append_only: value.0.append_only,
244            owner: value.1.owner_id as _,
245            fragment_id: value.0.fragment_id.unwrap_or_default(),
246            vnode_col_index: value.0.vnode_col_index.map(|index| index as _),
247            row_id_index: value.0.row_id_index.map(|index| index as _),
248            value_indices: value.0.value_indices.0,
249            definition: value.0.definition,
250            handle_pk_conflict_behavior: PbHandleConflictBehavior::from(
251                value.0.handle_pk_conflict_behavior,
252            ) as _,
253            version_column_indices: value
254                .0
255                .version_column_indices
256                .unwrap_or_default()
257                .0
258                .iter()
259                .map(|&idx| idx as u32)
260                .collect(),
261            read_prefix_len_hint: value.0.read_prefix_len_hint as _,
262            watermark_indices: value.0.watermark_indices.0,
263            dist_key_in_pk: value.0.dist_key_in_pk.0,
264            dml_fragment_id: value.0.dml_fragment_id,
265            cardinality: value
266                .0
267                .cardinality
268                .map(|cardinality| cardinality.to_protobuf()),
269            initialized_at_epoch: Some(
270                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
271                    .0,
272            ),
273            created_at_epoch: Some(
274                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
275            ),
276            #[expect(deprecated)]
277            cleaned_by_watermark: value.0.cleaned_by_watermark,
278            stream_job_status,
279            create_type,
280            version: value.0.version.map(|v| v.to_protobuf()),
281            optional_associated_source_id: value.0.optional_associated_source_id.map(Into::into),
282            description: value.0.description,
283            #[expect(deprecated)]
284            incoming_sinks: vec![],
285            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
286            created_at_cluster_version: value.1.created_at_cluster_version,
287            retention_seconds: value.0.retention_seconds.map(|id| id as u32),
288            cdc_table_id: value.0.cdc_table_id,
289            maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(),
290            webhook_info: value.0.webhook_info.map(|info| info.to_protobuf()),
291            job_id: value.0.belongs_to_job_id,
292            engine: value.0.engine.map(|engine| PbEngine::from(engine) as i32),
293            #[expect(deprecated)]
294            clean_watermark_index_in_pk: value.0.clean_watermark_index_in_pk,
295            clean_watermark_indices: value
296                .0
297                .clean_watermark_indices
298                .map(|indices| indices.0.iter().map(|&x| x as u32).collect())
299                .unwrap_or_default(),
300            refreshable: value.0.refreshable,
301            vector_index_info: value.0.vector_index_info.map(|index| index.to_protobuf()),
302            cdc_table_type: value
303                .0
304                .cdc_table_type
305                .map(|cdc_type| PbCdcTableType::from(cdc_type) as i32),
306        }
307    }
308}
309
310impl From<ObjectModel<source::Model>> for PbSource {
311    fn from(value: ObjectModel<source::Model>) -> Self {
312        let mut secret_ref_map = BTreeMap::new();
313        if let Some(secret_ref) = value.0.secret_ref {
314            secret_ref_map = secret_ref.to_protobuf();
315        }
316        Self {
317            id: value.0.source_id as _,
318            schema_id: value.1.schema_id.unwrap(),
319            database_id: value.1.database_id.unwrap(),
320            name: value.0.name,
321            row_id_index: value.0.row_id_index.map(|id| id as _),
322            columns: value.0.columns.to_protobuf(),
323            pk_column_ids: value.0.pk_column_ids.0,
324            with_properties: value.0.with_properties.0,
325            owner: value.1.owner_id as _,
326            info: value.0.source_info.map(|info| info.to_protobuf()),
327            watermark_descs: value.0.watermark_descs.to_protobuf(),
328            definition: value.0.definition,
329            connection_id: value.0.connection_id,
330            // todo: using the timestamp from the database directly.
331            initialized_at_epoch: Some(
332                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
333                    .0,
334            ),
335            created_at_epoch: Some(
336                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
337            ),
338            version: value.0.version as _,
339            optional_associated_table_id: value.0.optional_associated_table_id.map(Into::into),
340            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
341            created_at_cluster_version: value.1.created_at_cluster_version,
342            secret_refs: secret_ref_map,
343            rate_limit: value.0.rate_limit.map(|v| v as _),
344            refresh_mode: value
345                .0
346                .refresh_mode
347                .map(|refresh_mode| refresh_mode.to_protobuf()),
348        }
349    }
350}
351
352impl From<ObjectModel<sink::Model>> for PbSink {
353    fn from(value: ObjectModel<sink::Model>) -> Self {
354        let (stream_job_status, create_type) = stream_job_status_and_create_type(&value.2);
355        let mut secret_ref_map = BTreeMap::new();
356        if let Some(secret_ref) = value.0.secret_ref {
357            secret_ref_map = secret_ref.to_protobuf();
358        }
359        Self {
360            id: value.0.sink_id as _,
361            schema_id: value.1.schema_id.unwrap(),
362            database_id: value.1.database_id.unwrap(),
363            name: value.0.name,
364            columns: value.0.columns.to_protobuf(),
365            plan_pk: value.0.plan_pk.to_protobuf(),
366            distribution_key: value.0.distribution_key.0,
367            downstream_pk: value.0.downstream_pk.0,
368            sink_type: PbSinkType::from(value.0.sink_type) as _,
369            raw_ignore_delete: value.0.ignore_delete,
370            owner: value.1.owner_id as _,
371            properties: value.0.properties.0,
372            definition: value.0.definition,
373            connection_id: value.0.connection_id,
374            initialized_at_epoch: Some(
375                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
376                    .0,
377            ),
378            created_at_epoch: Some(
379                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
380            ),
381            db_name: value.0.db_name,
382            sink_from_name: value.0.sink_from_name,
383            stream_job_status,
384            format_desc: value.0.sink_format_desc.map(|desc| desc.to_protobuf()),
385            target_table: value.0.target_table,
386            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
387            created_at_cluster_version: value.1.created_at_cluster_version,
388            create_type,
389            secret_refs: secret_ref_map,
390            original_target_columns: value
391                .0
392                .original_target_columns
393                .map(|cols| cols.to_protobuf())
394                .unwrap_or_default(),
395            auto_refresh_schema_from_table: value.0.auto_refresh_schema_from_table,
396        }
397    }
398}
399
400impl From<ObjectModel<subscription::Model>> for PbSubscription {
401    fn from(value: ObjectModel<subscription::Model>) -> Self {
402        Self {
403            id: value.0.subscription_id as _,
404            schema_id: value.1.schema_id.unwrap(),
405            database_id: value.1.database_id.unwrap(),
406            name: value.0.name,
407            owner: value.1.owner_id as _,
408            retention_seconds: value.0.retention_seconds as _,
409            definition: value.0.definition,
410            initialized_at_epoch: Some(
411                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
412                    .0,
413            ),
414            created_at_epoch: Some(
415                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
416            ),
417            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
418            created_at_cluster_version: value.1.created_at_cluster_version,
419            dependent_table_id: value.0.dependent_table_id,
420            subscription_state: value.0.subscription_state as _,
421        }
422    }
423}
424
425impl From<ObjectModel<index::Model>> for PbIndex {
426    fn from(value: ObjectModel<index::Model>) -> Self {
427        let (stream_job_status, create_type) = stream_job_status_and_create_type(&value.2);
428        Self {
429            id: value.0.index_id as _,
430            schema_id: value.1.schema_id.unwrap(),
431            database_id: value.1.database_id.unwrap(),
432            name: value.0.name,
433            owner: value.1.owner_id as _,
434            index_table_id: value.0.index_table_id,
435            primary_table_id: value.0.primary_table_id,
436            index_item: value.0.index_items.to_protobuf(),
437            index_column_properties: value
438                .0
439                .index_column_properties
440                .map(|p| p.to_protobuf())
441                .unwrap_or_default(),
442            index_columns_len: value.0.index_columns_len as _,
443            initialized_at_epoch: Some(
444                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.initialized_at) as _)
445                    .0,
446            ),
447            created_at_epoch: Some(
448                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
449            ),
450            stream_job_status,
451            initialized_at_cluster_version: value.1.initialized_at_cluster_version,
452            created_at_cluster_version: value.1.created_at_cluster_version,
453            create_type,
454        }
455    }
456}
457
458impl From<ObjectModel<view::Model>> for PbView {
459    fn from(value: ObjectModel<view::Model>) -> Self {
460        Self {
461            id: value.0.view_id as _,
462            schema_id: value.1.schema_id.unwrap(),
463            database_id: value.1.database_id.unwrap(),
464            name: value.0.name,
465            owner: value.1.owner_id as _,
466            properties: value.0.properties.0,
467            sql: value.0.definition,
468            columns: value.0.columns.to_protobuf(),
469            created_at_epoch: Some(
470                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
471            ),
472            created_at_cluster_version: value.1.created_at_cluster_version,
473        }
474    }
475}
476
477impl From<ObjectModel<connection::Model>> for PbConnection {
478    fn from(value: ObjectModel<connection::Model>) -> Self {
479        let info: PbConnectionInfo = if value.0.info == PrivateLinkService::default() {
480            PbConnectionInfo::ConnectionParams(value.0.params.to_protobuf())
481        } else {
482            PbConnectionInfo::PrivateLinkService(value.0.info.to_protobuf())
483        };
484        Self {
485            id: value.1.oid.as_connection_id(),
486            schema_id: value.1.schema_id.unwrap(),
487            database_id: value.1.database_id.unwrap(),
488            name: value.0.name,
489            owner: value.1.owner_id as _,
490            info: Some(info),
491        }
492    }
493}
494
495impl From<ObjectModel<function::Model>> for PbFunction {
496    fn from(value: ObjectModel<function::Model>) -> Self {
497        Self {
498            id: value.0.function_id as _,
499            schema_id: value.1.schema_id.unwrap(),
500            database_id: value.1.database_id.unwrap(),
501            name: value.0.name,
502            owner: value.1.owner_id as _,
503            arg_names: value.0.arg_names.split(',').map(|s| s.to_owned()).collect(),
504            arg_types: value.0.arg_types.to_protobuf(),
505            return_type: Some(value.0.return_type.to_protobuf()),
506            language: value.0.language,
507            runtime: value.0.runtime,
508            link: value.0.link,
509            name_in_runtime: value.0.name_in_runtime,
510            body: value.0.body,
511            compressed_binary: value.0.compressed_binary,
512            kind: Some(value.0.kind.into()),
513            always_retry_on_network_error: value.0.always_retry_on_network_error,
514            is_async: value
515                .0
516                .options
517                .as_ref()
518                .and_then(|o| o.0.get("async").map(|v| v == "true")),
519            is_batched: value
520                .0
521                .options
522                .as_ref()
523                .and_then(|o| o.0.get("batch").map(|v| v == "true")),
524            created_at_epoch: Some(
525                Epoch::from_unix_millis(datetime_to_timestamp_millis(value.1.created_at) as _).0,
526            ),
527            created_at_cluster_version: value.1.created_at_cluster_version,
528        }
529    }
530}