risingwave_frontend/observer/
observer_manager.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::RwLock;
20use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
21use risingwave_common::catalog::CatalogVersion;
22use risingwave_common::hash::WorkerSlotMapping;
23use risingwave_common::license::LicenseManager;
24use risingwave_common::secret::LocalSecretManager;
25use risingwave_common::session_config::SessionConfig;
26use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
27use risingwave_common_service::ObserverState;
28use risingwave_hummock_sdk::FrontendHummockVersion;
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
31use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo};
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
34use risingwave_rpc_client::ComputeClientPoolRef;
35use tokio::sync::watch::Sender;
36
37use crate::catalog::FragmentId;
38use crate::catalog::root_catalog::Catalog;
39use crate::scheduler::HummockSnapshotManagerRef;
40use crate::user::user_manager::UserInfoManager;
41
/// Frontend-side observer state: applies notifications received from the meta
/// service to the frontend's local managers and caches.
pub struct FrontendObserverNode {
    /// Receives worker node changes and the streaming/serving fragment mappings.
    worker_node_manager: WorkerNodeManagerRef,
    /// Version of the last catalog or user notification applied. Each incoming
    /// catalog/user notification must carry a strictly greater version.
    version: CatalogVersion,
    /// Watch-channel sender on which every newly applied version is published.
    catalog_updated_tx: Sender<CatalogVersion>,
    /// Local catalog; rebuilt from the snapshot and patched by catalog notifications.
    catalog: Arc<RwLock<Catalog>>,
    /// Local user info; rebuilt from the snapshot and patched by user notifications.
    user_info_manager: Arc<RwLock<UserInfoManager>>,
    /// Initialized from the snapshot's Hummock version and fed version deltas afterwards.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Receives the payload of `SystemParams` notifications.
    system_params_manager: LocalSystemParamsManagerRef,
    /// Session parameters; replaced on snapshot and updated by `SessionParam` notifications.
    session_params: Arc<RwLock<SessionConfig>>,
    /// Compute client pool; fully invalidated when a `Recovery` notification arrives.
    compute_client_pool: ComputeClientPoolRef,
}
53
54impl ObserverState for FrontendObserverNode {
55    fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
56        risingwave_pb::meta::SubscribeType::Frontend
57    }
58
59    fn handle_notification(&mut self, resp: SubscribeResponse) {
60        let Some(info) = resp.info.as_ref() else {
61            return;
62        };
63
64        // TODO: this clone can be avoided
65        match info.to_owned() {
66            Info::Database(_)
67            | Info::Schema(_)
68            | Info::ObjectGroup(_)
69            | Info::Function(_)
70            | Info::Connection(_) => {
71                self.handle_catalog_notification(resp);
72            }
73            Info::Secret(_) => {
74                self.handle_catalog_notification(resp.clone());
75                self.handle_secret_notification(resp);
76            }
77            Info::Node(node) => {
78                self.update_worker_node_manager(resp.operation(), node);
79            }
80            Info::User(_) => {
81                self.handle_user_notification(resp);
82            }
83            Info::Snapshot(_) => {
84                panic!(
85                    "receiving a snapshot in the middle is unsupported now {:?}",
86                    resp
87                )
88            }
89            Info::HummockVersionDeltas(deltas) => {
90                self.handle_hummock_snapshot_notification(deltas);
91            }
92            Info::MetaBackupManifestId(_) => {
93                panic!("frontend node should not receive MetaBackupManifestId");
94            }
95            Info::HummockWriteLimits(_) => {
96                panic!("frontend node should not receive HummockWriteLimits");
97            }
98            Info::SystemParams(p) => {
99                self.system_params_manager.try_set_params(p);
100            }
101            Info::SessionParam(p) => {
102                self.session_params
103                    .write()
104                    .set(&p.param, p.value().to_owned(), &mut ())
105                    .unwrap();
106            }
107            Info::HummockStats(stats) => {
108                self.handle_table_stats_notification(stats);
109            }
110            Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp),
111            Info::ServingWorkerSlotMappings(m) => {
112                self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
113            }
114            Info::Recovery(_) => {
115                self.compute_client_pool.invalidate_all();
116            }
117            Info::ClusterResource(resource) => {
118                LicenseManager::get().update_cluster_resource(resource);
119            }
120        }
121    }
122
123    fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
124        let mut catalog_guard = self.catalog.write();
125        let mut user_guard = self.user_info_manager.write();
126        catalog_guard.clear();
127        user_guard.clear();
128
129        let Some(Info::Snapshot(snapshot)) = resp.info else {
130            unreachable!();
131        };
132        let MetaSnapshot {
133            databases,
134            schemas,
135            sources,
136            sinks,
137            tables,
138            indexes,
139            views,
140            subscriptions,
141            functions,
142            connections,
143            users,
144            nodes,
145            hummock_version,
146            meta_backup_manifest_id: _,
147            hummock_write_limits: _,
148            streaming_worker_slot_mappings,
149            serving_worker_slot_mappings,
150            session_params,
151            version,
152            secrets,
153            cluster_resource,
154            object_dependencies,
155        } = snapshot;
156
157        for db in databases {
158            catalog_guard.create_database(&db)
159        }
160        for schema in schemas {
161            catalog_guard.create_schema(&schema)
162        }
163        for source in sources {
164            catalog_guard.create_source(&source)
165        }
166        for sink in sinks {
167            catalog_guard.create_sink(&sink)
168        }
169        for subscription in subscriptions {
170            catalog_guard.create_subscription(&subscription)
171        }
172        for table in tables {
173            catalog_guard.create_table(&table)
174        }
175        for index in indexes {
176            catalog_guard.create_index(&index)
177        }
178        for view in views {
179            catalog_guard.create_view(&view)
180        }
181        for function in functions {
182            catalog_guard.create_function(&function)
183        }
184        for connection in connections {
185            catalog_guard.create_connection(&connection)
186        }
187        for secret in &secrets {
188            catalog_guard.create_secret(secret)
189        }
190        catalog_guard.set_object_dependencies(object_dependencies);
191        for user in users {
192            user_guard.create_user(user)
193        }
194
195        self.worker_node_manager.refresh(
196            nodes,
197            convert_worker_slot_mapping(&streaming_worker_slot_mappings),
198            convert_worker_slot_mapping(&serving_worker_slot_mappings),
199        );
200        self.hummock_snapshot_manager
201            .init(FrontendHummockVersion::from_protobuf(
202                hummock_version.unwrap(),
203            ));
204
205        let snapshot_version = version.unwrap();
206        self.version = snapshot_version.catalog_version;
207        self.catalog_updated_tx
208            .send(snapshot_version.catalog_version)
209            .unwrap();
210        *self.session_params.write() =
211            serde_json::from_str(&session_params.unwrap().params).unwrap();
212        LocalSecretManager::global().init_secrets(secrets);
213        LicenseManager::get().update_cluster_resource(cluster_resource.unwrap());
214    }
215}
216
217impl FrontendObserverNode {
218    pub fn new(
219        worker_node_manager: WorkerNodeManagerRef,
220        catalog: Arc<RwLock<Catalog>>,
221        catalog_updated_tx: Sender<CatalogVersion>,
222        user_info_manager: Arc<RwLock<UserInfoManager>>,
223        hummock_snapshot_manager: HummockSnapshotManagerRef,
224        system_params_manager: LocalSystemParamsManagerRef,
225        session_params: Arc<RwLock<SessionConfig>>,
226        compute_client_pool: ComputeClientPoolRef,
227    ) -> Self {
228        Self {
229            version: 0,
230            worker_node_manager,
231            catalog,
232            catalog_updated_tx,
233            user_info_manager,
234            hummock_snapshot_manager,
235            system_params_manager,
236            session_params,
237            compute_client_pool,
238        }
239    }
240
241    fn handle_table_stats_notification(&mut self, table_stats: HummockVersionStats) {
242        let mut catalog_guard = self.catalog.write();
243        catalog_guard.set_table_stats(table_stats);
244    }
245
246    fn handle_catalog_notification(&mut self, resp: SubscribeResponse) {
247        let Some(info) = resp.info.as_ref() else {
248            return;
249        };
250        tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");
251
252        let mut catalog_guard = self.catalog.write();
253        match info {
254            Info::Database(database) => match resp.operation() {
255                Operation::Add => catalog_guard.create_database(database),
256                Operation::Delete => catalog_guard.drop_database(database.id),
257                Operation::Update => catalog_guard.update_database(database),
258                _ => panic!("receive an unsupported notify {:?}", resp),
259            },
260            Info::Schema(schema) => match resp.operation() {
261                Operation::Add => catalog_guard.create_schema(schema),
262                Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
263                Operation::Update => catalog_guard.update_schema(schema),
264                _ => panic!("receive an unsupported notify {:?}", resp),
265            },
266            Info::ObjectGroup(object_group) => {
267                if !object_group.dependencies.is_empty() {
268                    catalog_guard.insert_object_dependencies(object_group.dependencies.clone());
269                }
270                for object in &object_group.objects {
271                    let Some(obj) = object.object_info.as_ref() else {
272                        continue;
273                    };
274                    match obj {
275                        ObjectInfo::Database(db) => match resp.operation() {
276                            Operation::Add => catalog_guard.create_database(db),
277                            Operation::Delete => catalog_guard.drop_database(db.id),
278                            Operation::Update => catalog_guard.update_database(db),
279                            _ => panic!("receive an unsupported notify {:?}", resp),
280                        },
281                        ObjectInfo::Schema(schema) => match resp.operation() {
282                            Operation::Add => catalog_guard.create_schema(schema),
283                            Operation::Delete => {
284                                catalog_guard.drop_schema(schema.database_id, schema.id)
285                            }
286                            Operation::Update => catalog_guard.update_schema(schema),
287                            _ => panic!("receive an unsupported notify {:?}", resp),
288                        },
289                        PbObjectInfo::Table(table) => match resp.operation() {
290                            Operation::Add => catalog_guard.create_table(table),
291                            Operation::Delete => catalog_guard.drop_table(
292                                table.database_id,
293                                table.schema_id,
294                                table.id,
295                            ),
296                            Operation::Update => {
297                                let old_fragment_id = catalog_guard
298                                    .get_any_table_by_id(table.id)
299                                    .unwrap()
300                                    .fragment_id;
301                                catalog_guard.update_table(table);
302                                if old_fragment_id != table.fragment_id {
303                                    // FIXME: the frontend node delete its fragment for the update
304                                    // operation by itself.
305                                    self.worker_node_manager
306                                        .remove_streaming_fragment_mapping(&old_fragment_id);
307                                }
308                            }
309                            _ => panic!("receive an unsupported notify {:?}", resp),
310                        },
311                        PbObjectInfo::Source(source) => match resp.operation() {
312                            Operation::Add => catalog_guard.create_source(source),
313                            Operation::Delete => catalog_guard.drop_source(
314                                source.database_id,
315                                source.schema_id,
316                                source.id,
317                            ),
318                            Operation::Update => catalog_guard.update_source(source),
319                            _ => panic!("receive an unsupported notify {:?}", resp),
320                        },
321                        PbObjectInfo::Sink(sink) => match resp.operation() {
322                            Operation::Add => catalog_guard.create_sink(sink),
323                            Operation::Delete => {
324                                catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
325                            }
326                            Operation::Update => catalog_guard.update_sink(sink),
327                            _ => panic!("receive an unsupported notify {:?}", resp),
328                        },
329                        PbObjectInfo::Subscription(subscription) => match resp.operation() {
330                            Operation::Add => catalog_guard.create_subscription(subscription),
331                            Operation::Delete => catalog_guard.drop_subscription(
332                                subscription.database_id,
333                                subscription.schema_id,
334                                subscription.id,
335                            ),
336                            Operation::Update => catalog_guard.update_subscription(subscription),
337                            _ => panic!("receive an unsupported notify {:?}", resp),
338                        },
339                        PbObjectInfo::Index(index) => match resp.operation() {
340                            Operation::Add => catalog_guard.create_index(index),
341                            Operation::Delete => catalog_guard.drop_index(
342                                index.database_id,
343                                index.schema_id,
344                                index.id,
345                            ),
346                            Operation::Update => catalog_guard.update_index(index),
347                            _ => panic!("receive an unsupported notify {:?}", resp),
348                        },
349                        PbObjectInfo::View(view) => match resp.operation() {
350                            Operation::Add => catalog_guard.create_view(view),
351                            Operation::Delete => {
352                                catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
353                            }
354                            Operation::Update => catalog_guard.update_view(view),
355                            _ => panic!("receive an unsupported notify {:?}", resp),
356                        },
357                        ObjectInfo::Function(function) => match resp.operation() {
358                            Operation::Add => catalog_guard.create_function(function),
359                            Operation::Delete => catalog_guard.drop_function(
360                                function.database_id,
361                                function.schema_id,
362                                function.id,
363                            ),
364                            Operation::Update => catalog_guard.update_function(function),
365                            _ => panic!("receive an unsupported notify {:?}", resp),
366                        },
367                        ObjectInfo::Connection(connection) => match resp.operation() {
368                            Operation::Add => catalog_guard.create_connection(connection),
369                            Operation::Delete => catalog_guard.drop_connection(
370                                connection.database_id,
371                                connection.schema_id,
372                                connection.id,
373                            ),
374                            Operation::Update => catalog_guard.update_connection(connection),
375                            _ => panic!("receive an unsupported notify {:?}", resp),
376                        },
377                        ObjectInfo::Secret(secret) => {
378                            let mut secret = secret.clone();
379                            // The secret value should not be revealed to users. So mask it in the frontend catalog.
380                            secret.value =
381                                "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
382                            match resp.operation() {
383                                Operation::Add => catalog_guard.create_secret(&secret),
384                                Operation::Delete => catalog_guard.drop_secret(
385                                    secret.database_id,
386                                    secret.schema_id,
387                                    secret.id,
388                                ),
389                                Operation::Update => catalog_guard.update_secret(&secret),
390                                _ => panic!("receive an unsupported notify {:?}", resp),
391                            }
392                        }
393                    }
394                }
395            }
396            Info::Function(function) => match resp.operation() {
397                Operation::Add => catalog_guard.create_function(function),
398                Operation::Delete => catalog_guard.drop_function(
399                    function.database_id,
400                    function.schema_id,
401                    function.id,
402                ),
403                Operation::Update => catalog_guard.update_function(function),
404                _ => panic!("receive an unsupported notify {:?}", resp),
405            },
406            Info::Connection(connection) => match resp.operation() {
407                Operation::Add => catalog_guard.create_connection(connection),
408                Operation::Delete => catalog_guard.drop_connection(
409                    connection.database_id,
410                    connection.schema_id,
411                    connection.id,
412                ),
413                Operation::Update => catalog_guard.update_connection(connection),
414                _ => panic!("receive an unsupported notify {:?}", resp),
415            },
416            Info::Secret(secret) => {
417                let mut secret = secret.clone();
418                // The secret value should not be revealed to users. So mask it in the frontend catalog.
419                secret.value = "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
420                match resp.operation() {
421                    Operation::Add => catalog_guard.create_secret(&secret),
422                    Operation::Delete => {
423                        catalog_guard.drop_secret(secret.database_id, secret.schema_id, secret.id)
424                    }
425                    Operation::Update => catalog_guard.update_secret(&secret),
426                    _ => panic!("receive an unsupported notify {:?}", resp),
427                }
428            }
429            _ => unreachable!(),
430        }
431        assert!(
432            resp.version > self.version,
433            "resp version={:?}, current version={:?}",
434            resp.version,
435            self.version
436        );
437        self.version = resp.version;
438        self.catalog_updated_tx.send(resp.version).unwrap();
439    }
440
441    fn handle_user_notification(&mut self, resp: SubscribeResponse) {
442        let Some(info) = resp.info.as_ref() else {
443            return;
444        };
445
446        let mut user_guard = self.user_info_manager.write();
447        match info {
448            Info::User(user) => match resp.operation() {
449                Operation::Add => user_guard.create_user(user.clone()),
450                Operation::Delete => user_guard.drop_user(user.id),
451                Operation::Update => user_guard.update_user(user.clone()),
452                _ => panic!("receive an unsupported notify {:?}", resp),
453            },
454            _ => unreachable!(),
455        }
456        assert!(
457            resp.version > self.version,
458            "resp version={:?}, current version={:?}",
459            resp.version,
460            self.version
461        );
462        self.version = resp.version;
463        self.catalog_updated_tx.send(resp.version).unwrap();
464    }
465
466    fn handle_fragment_mapping_notification(&mut self, resp: SubscribeResponse) {
467        let Some(info) = resp.info.as_ref() else {
468            return;
469        };
470        match info {
471            Info::StreamingWorkerSlotMapping(streaming_worker_slot_mapping) => {
472                let fragment_id = streaming_worker_slot_mapping.fragment_id;
473                let mapping = || {
474                    WorkerSlotMapping::from_protobuf(
475                        streaming_worker_slot_mapping.mapping.as_ref().unwrap(),
476                    )
477                };
478
479                match resp.operation() {
480                    Operation::Add => {
481                        self.worker_node_manager
482                            .insert_streaming_fragment_mapping(fragment_id, mapping());
483                    }
484                    Operation::Delete => {
485                        self.worker_node_manager
486                            .remove_streaming_fragment_mapping(&fragment_id);
487                    }
488                    Operation::Update => {
489                        self.worker_node_manager
490                            .update_streaming_fragment_mapping(fragment_id, mapping());
491                    }
492                    _ => panic!("receive an unsupported notify {:?}", resp),
493                }
494            }
495            _ => unreachable!(),
496        }
497    }
498
499    fn handle_fragment_serving_mapping_notification(
500        &mut self,
501        mappings: Vec<FragmentWorkerSlotMapping>,
502        op: Operation,
503    ) {
504        match op {
505            Operation::Add | Operation::Update => {
506                self.worker_node_manager
507                    .upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
508            }
509            Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
510                mappings
511                    .into_iter()
512                    .map(|m| m.fragment_id)
513                    .collect_vec()
514                    .as_slice(),
515            ),
516            Operation::Snapshot => {
517                self.worker_node_manager
518                    .set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
519            }
520            _ => panic!("receive an unsupported notify {:?}", op),
521        }
522    }
523
524    /// Update max committed epoch in `HummockSnapshotManager`.
525    fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) {
526        self.hummock_snapshot_manager.update(deltas);
527    }
528
529    fn handle_secret_notification(&mut self, resp: SubscribeResponse) {
530        let resp_op = resp.operation();
531        let Some(Info::Secret(secret)) = resp.info else {
532            unreachable!();
533        };
534        match resp_op {
535            Operation::Add => {
536                LocalSecretManager::global().add_secret(secret.id, secret.value);
537            }
538            Operation::Delete => {
539                LocalSecretManager::global().remove_secret(secret.id);
540            }
541            Operation::Update => {
542                LocalSecretManager::global().update_secret(secret.id, secret.value);
543            }
544            _ => {
545                panic!("invalid notification operation: {resp_op:?}");
546            }
547        }
548    }
549
550    /// `update_worker_node_manager` is called in `start` method.
551    /// It calls `add_worker_node` and `remove_worker_node` of `WorkerNodeManager`.
552    fn update_worker_node_manager(&self, operation: Operation, node: WorkerNode) {
553        tracing::debug!(
554            "Update worker nodes, operation: {:?}, node: {:?}",
555            operation,
556            node
557        );
558
559        match operation {
560            Operation::Add => self.worker_node_manager.add_worker_node(node),
561            Operation::Delete => self.worker_node_manager.remove_worker_node(node),
562            _ => (),
563        }
564    }
565}
566
567fn convert_worker_slot_mapping(
568    worker_slot_mappings: &[FragmentWorkerSlotMapping],
569) -> HashMap<FragmentId, WorkerSlotMapping> {
570    worker_slot_mappings
571        .iter()
572        .map(
573            |FragmentWorkerSlotMapping {
574                 fragment_id,
575                 mapping,
576             }| {
577                let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap());
578                (*fragment_id, mapping)
579            },
580        )
581        .collect()
582}