risingwave_frontend/observer/
observer_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::RwLock;
20use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
21use risingwave_common::catalog::CatalogVersion;
22use risingwave_common::hash::WorkerSlotMapping;
23use risingwave_common::license::LicenseManager;
24use risingwave_common::secret::LocalSecretManager;
25use risingwave_common::session_config::SessionConfig;
26use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
27use risingwave_common_service::ObserverState;
28use risingwave_hummock_sdk::FrontendHummockVersion;
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
31use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo};
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
34use risingwave_rpc_client::ComputeClientPoolRef;
35use tokio::sync::watch::Sender;
36
37use crate::catalog::FragmentId;
38use crate::catalog::root_catalog::Catalog;
39use crate::scheduler::HummockSnapshotManagerRef;
40use crate::user::user_manager::UserInfoManager;
41
/// Observer state of a frontend node.
///
/// Holds the handles that are updated when notifications from meta are processed by the
/// `ObserverState` implementation of this type.
pub struct FrontendObserverNode {
    /// Receives worker node additions/removals and the streaming/serving fragment mappings.
    worker_node_manager: WorkerNodeManagerRef,
    /// Catalog version most recently applied by this observer; set from the initial
    /// snapshot and advanced by every catalog or user notification.
    version: CatalogVersion,
    /// Sender on which each newly applied catalog version is published.
    catalog_updated_tx: Sender<CatalogVersion>,
    /// Shared catalog that catalog notifications are applied to.
    catalog: Arc<RwLock<Catalog>>,
    /// Shared user information that user notifications are applied to.
    user_info_manager: Arc<RwLock<UserInfoManager>>,
    /// Initialized from the snapshot's hummock version and fed with hummock version deltas.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Receives system parameter notifications via `try_set_params`.
    system_params_manager: LocalSystemParamsManagerRef,
    /// Shared session configuration; replaced from the snapshot and updated by
    /// session parameter notifications.
    session_params: Arc<RwLock<SessionConfig>>,
    /// Compute client pool; all of its entries are invalidated on a recovery notification.
    compute_client_pool: ComputeClientPoolRef,
}
53
54impl ObserverState for FrontendObserverNode {
55    fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
56        risingwave_pb::meta::SubscribeType::Frontend
57    }
58
59    fn handle_notification(&mut self, resp: SubscribeResponse) {
60        let Some(info) = resp.info.as_ref() else {
61            return;
62        };
63
64        // TODO: this clone can be avoided
65        match info.to_owned() {
66            Info::Database(_)
67            | Info::Schema(_)
68            | Info::ObjectGroup(_)
69            | Info::Function(_)
70            | Info::Connection(_) => {
71                self.handle_catalog_notification(resp);
72            }
73            Info::Secret(_) => {
74                self.handle_catalog_notification(resp.clone());
75                self.handle_secret_notification(resp);
76            }
77            Info::Node(node) => {
78                self.update_worker_node_manager(resp.operation(), node);
79            }
80            Info::User(_) => {
81                self.handle_user_notification(resp);
82            }
83            Info::Snapshot(_) => {
84                panic!(
85                    "receiving a snapshot in the middle is unsupported now {:?}",
86                    resp
87                )
88            }
89            Info::HummockVersionDeltas(deltas) => {
90                self.handle_hummock_snapshot_notification(deltas);
91            }
92            Info::MetaBackupManifestId(_) => {
93                panic!("frontend node should not receive MetaBackupManifestId");
94            }
95            Info::HummockWriteLimits(_) => {
96                panic!("frontend node should not receive HummockWriteLimits");
97            }
98            Info::SystemParams(p) => {
99                self.system_params_manager.try_set_params(p);
100            }
101            Info::SessionParam(p) => {
102                self.session_params
103                    .write()
104                    .set(&p.param, p.value().to_owned(), &mut ())
105                    .unwrap();
106            }
107            Info::HummockStats(stats) => {
108                self.handle_table_stats_notification(stats);
109            }
110            Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp),
111            Info::ServingWorkerSlotMappings(m) => {
112                self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
113            }
114            Info::Recovery(_) => {
115                self.compute_client_pool.invalidate_all();
116            }
117            Info::ClusterResource(resource) => {
118                LicenseManager::get().update_cluster_resource(resource);
119            }
120        }
121    }
122
123    fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
124        let mut catalog_guard = self.catalog.write();
125        let mut user_guard = self.user_info_manager.write();
126        catalog_guard.clear();
127        user_guard.clear();
128
129        let Some(Info::Snapshot(snapshot)) = resp.info else {
130            unreachable!();
131        };
132        let MetaSnapshot {
133            databases,
134            schemas,
135            sources,
136            sinks,
137            tables,
138            indexes,
139            views,
140            subscriptions,
141            functions,
142            connections,
143            users,
144            nodes,
145            hummock_version,
146            meta_backup_manifest_id: _,
147            hummock_write_limits: _,
148            streaming_worker_slot_mappings,
149            serving_worker_slot_mappings,
150            session_params,
151            version,
152            secrets,
153            cluster_resource,
154        } = snapshot;
155
156        for db in databases {
157            catalog_guard.create_database(&db)
158        }
159        for schema in schemas {
160            catalog_guard.create_schema(&schema)
161        }
162        for source in sources {
163            catalog_guard.create_source(&source)
164        }
165        for sink in sinks {
166            catalog_guard.create_sink(&sink)
167        }
168        for subscription in subscriptions {
169            catalog_guard.create_subscription(&subscription)
170        }
171        for table in tables {
172            catalog_guard.create_table(&table)
173        }
174        for index in indexes {
175            catalog_guard.create_index(&index)
176        }
177        for view in views {
178            catalog_guard.create_view(&view)
179        }
180        for function in functions {
181            catalog_guard.create_function(&function)
182        }
183        for connection in connections {
184            catalog_guard.create_connection(&connection)
185        }
186        for secret in &secrets {
187            catalog_guard.create_secret(secret)
188        }
189        for user in users {
190            user_guard.create_user(user)
191        }
192
193        self.worker_node_manager.refresh(
194            nodes,
195            convert_worker_slot_mapping(&streaming_worker_slot_mappings),
196            convert_worker_slot_mapping(&serving_worker_slot_mappings),
197        );
198        self.hummock_snapshot_manager
199            .init(FrontendHummockVersion::from_protobuf(
200                hummock_version.unwrap(),
201            ));
202
203        let snapshot_version = version.unwrap();
204        self.version = snapshot_version.catalog_version;
205        self.catalog_updated_tx
206            .send(snapshot_version.catalog_version)
207            .unwrap();
208        *self.session_params.write() =
209            serde_json::from_str(&session_params.unwrap().params).unwrap();
210        LocalSecretManager::global().init_secrets(secrets);
211        LicenseManager::get().update_cluster_resource(cluster_resource.unwrap());
212    }
213}
214
215impl FrontendObserverNode {
216    pub fn new(
217        worker_node_manager: WorkerNodeManagerRef,
218        catalog: Arc<RwLock<Catalog>>,
219        catalog_updated_tx: Sender<CatalogVersion>,
220        user_info_manager: Arc<RwLock<UserInfoManager>>,
221        hummock_snapshot_manager: HummockSnapshotManagerRef,
222        system_params_manager: LocalSystemParamsManagerRef,
223        session_params: Arc<RwLock<SessionConfig>>,
224        compute_client_pool: ComputeClientPoolRef,
225    ) -> Self {
226        Self {
227            version: 0,
228            worker_node_manager,
229            catalog,
230            catalog_updated_tx,
231            user_info_manager,
232            hummock_snapshot_manager,
233            system_params_manager,
234            session_params,
235            compute_client_pool,
236        }
237    }
238
239    fn handle_table_stats_notification(&mut self, table_stats: HummockVersionStats) {
240        let mut catalog_guard = self.catalog.write();
241        catalog_guard.set_table_stats(table_stats);
242    }
243
244    fn handle_catalog_notification(&mut self, resp: SubscribeResponse) {
245        let Some(info) = resp.info.as_ref() else {
246            return;
247        };
248        tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");
249
250        let mut catalog_guard = self.catalog.write();
251        match info {
252            Info::Database(database) => match resp.operation() {
253                Operation::Add => catalog_guard.create_database(database),
254                Operation::Delete => catalog_guard.drop_database(database.id),
255                Operation::Update => catalog_guard.update_database(database),
256                _ => panic!("receive an unsupported notify {:?}", resp),
257            },
258            Info::Schema(schema) => match resp.operation() {
259                Operation::Add => catalog_guard.create_schema(schema),
260                Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
261                Operation::Update => catalog_guard.update_schema(schema),
262                _ => panic!("receive an unsupported notify {:?}", resp),
263            },
264            Info::ObjectGroup(object_group) => {
265                for object in &object_group.objects {
266                    let Some(obj) = object.object_info.as_ref() else {
267                        continue;
268                    };
269                    match obj {
270                        ObjectInfo::Database(db) => match resp.operation() {
271                            Operation::Add => catalog_guard.create_database(db),
272                            Operation::Delete => catalog_guard.drop_database(db.id),
273                            Operation::Update => catalog_guard.update_database(db),
274                            _ => panic!("receive an unsupported notify {:?}", resp),
275                        },
276                        ObjectInfo::Schema(schema) => match resp.operation() {
277                            Operation::Add => catalog_guard.create_schema(schema),
278                            Operation::Delete => {
279                                catalog_guard.drop_schema(schema.database_id, schema.id)
280                            }
281                            Operation::Update => catalog_guard.update_schema(schema),
282                            _ => panic!("receive an unsupported notify {:?}", resp),
283                        },
284                        PbObjectInfo::Table(table) => match resp.operation() {
285                            Operation::Add => catalog_guard.create_table(table),
286                            Operation::Delete => catalog_guard.drop_table(
287                                table.database_id,
288                                table.schema_id,
289                                table.id,
290                            ),
291                            Operation::Update => {
292                                let old_fragment_id = catalog_guard
293                                    .get_any_table_by_id(table.id)
294                                    .unwrap()
295                                    .fragment_id;
296                                catalog_guard.update_table(table);
297                                if old_fragment_id != table.fragment_id {
298                                    // FIXME: the frontend node delete its fragment for the update
299                                    // operation by itself.
300                                    self.worker_node_manager
301                                        .remove_streaming_fragment_mapping(&old_fragment_id);
302                                }
303                            }
304                            _ => panic!("receive an unsupported notify {:?}", resp),
305                        },
306                        PbObjectInfo::Source(source) => match resp.operation() {
307                            Operation::Add => catalog_guard.create_source(source),
308                            Operation::Delete => catalog_guard.drop_source(
309                                source.database_id,
310                                source.schema_id,
311                                source.id,
312                            ),
313                            Operation::Update => catalog_guard.update_source(source),
314                            _ => panic!("receive an unsupported notify {:?}", resp),
315                        },
316                        PbObjectInfo::Sink(sink) => match resp.operation() {
317                            Operation::Add => catalog_guard.create_sink(sink),
318                            Operation::Delete => {
319                                catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
320                            }
321                            Operation::Update => catalog_guard.update_sink(sink),
322                            _ => panic!("receive an unsupported notify {:?}", resp),
323                        },
324                        PbObjectInfo::Subscription(subscription) => match resp.operation() {
325                            Operation::Add => catalog_guard.create_subscription(subscription),
326                            Operation::Delete => catalog_guard.drop_subscription(
327                                subscription.database_id,
328                                subscription.schema_id,
329                                subscription.id,
330                            ),
331                            Operation::Update => catalog_guard.update_subscription(subscription),
332                            _ => panic!("receive an unsupported notify {:?}", resp),
333                        },
334                        PbObjectInfo::Index(index) => match resp.operation() {
335                            Operation::Add => catalog_guard.create_index(index),
336                            Operation::Delete => catalog_guard.drop_index(
337                                index.database_id,
338                                index.schema_id,
339                                index.id,
340                            ),
341                            Operation::Update => catalog_guard.update_index(index),
342                            _ => panic!("receive an unsupported notify {:?}", resp),
343                        },
344                        PbObjectInfo::View(view) => match resp.operation() {
345                            Operation::Add => catalog_guard.create_view(view),
346                            Operation::Delete => {
347                                catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
348                            }
349                            Operation::Update => catalog_guard.update_view(view),
350                            _ => panic!("receive an unsupported notify {:?}", resp),
351                        },
352                        ObjectInfo::Function(function) => match resp.operation() {
353                            Operation::Add => catalog_guard.create_function(function),
354                            Operation::Delete => catalog_guard.drop_function(
355                                function.database_id,
356                                function.schema_id,
357                                function.id,
358                            ),
359                            Operation::Update => catalog_guard.update_function(function),
360                            _ => panic!("receive an unsupported notify {:?}", resp),
361                        },
362                        ObjectInfo::Connection(connection) => match resp.operation() {
363                            Operation::Add => catalog_guard.create_connection(connection),
364                            Operation::Delete => catalog_guard.drop_connection(
365                                connection.database_id,
366                                connection.schema_id,
367                                connection.id,
368                            ),
369                            Operation::Update => catalog_guard.update_connection(connection),
370                            _ => panic!("receive an unsupported notify {:?}", resp),
371                        },
372                        ObjectInfo::Secret(secret) => {
373                            let mut secret = secret.clone();
374                            // The secret value should not be revealed to users. So mask it in the frontend catalog.
375                            secret.value =
376                                "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
377                            match resp.operation() {
378                                Operation::Add => catalog_guard.create_secret(&secret),
379                                Operation::Delete => catalog_guard.drop_secret(
380                                    secret.database_id,
381                                    secret.schema_id,
382                                    secret.id,
383                                ),
384                                Operation::Update => catalog_guard.update_secret(&secret),
385                                _ => panic!("receive an unsupported notify {:?}", resp),
386                            }
387                        }
388                    }
389                }
390            }
391            Info::Function(function) => match resp.operation() {
392                Operation::Add => catalog_guard.create_function(function),
393                Operation::Delete => catalog_guard.drop_function(
394                    function.database_id,
395                    function.schema_id,
396                    function.id,
397                ),
398                Operation::Update => catalog_guard.update_function(function),
399                _ => panic!("receive an unsupported notify {:?}", resp),
400            },
401            Info::Connection(connection) => match resp.operation() {
402                Operation::Add => catalog_guard.create_connection(connection),
403                Operation::Delete => catalog_guard.drop_connection(
404                    connection.database_id,
405                    connection.schema_id,
406                    connection.id,
407                ),
408                Operation::Update => catalog_guard.update_connection(connection),
409                _ => panic!("receive an unsupported notify {:?}", resp),
410            },
411            Info::Secret(secret) => {
412                let mut secret = secret.clone();
413                // The secret value should not be revealed to users. So mask it in the frontend catalog.
414                secret.value = "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
415                match resp.operation() {
416                    Operation::Add => catalog_guard.create_secret(&secret),
417                    Operation::Delete => {
418                        catalog_guard.drop_secret(secret.database_id, secret.schema_id, secret.id)
419                    }
420                    Operation::Update => catalog_guard.update_secret(&secret),
421                    _ => panic!("receive an unsupported notify {:?}", resp),
422                }
423            }
424            _ => unreachable!(),
425        }
426        assert!(
427            resp.version > self.version,
428            "resp version={:?}, current version={:?}",
429            resp.version,
430            self.version
431        );
432        self.version = resp.version;
433        self.catalog_updated_tx.send(resp.version).unwrap();
434    }
435
436    fn handle_user_notification(&mut self, resp: SubscribeResponse) {
437        let Some(info) = resp.info.as_ref() else {
438            return;
439        };
440
441        let mut user_guard = self.user_info_manager.write();
442        match info {
443            Info::User(user) => match resp.operation() {
444                Operation::Add => user_guard.create_user(user.clone()),
445                Operation::Delete => user_guard.drop_user(user.id),
446                Operation::Update => user_guard.update_user(user.clone()),
447                _ => panic!("receive an unsupported notify {:?}", resp),
448            },
449            _ => unreachable!(),
450        }
451        assert!(
452            resp.version > self.version,
453            "resp version={:?}, current version={:?}",
454            resp.version,
455            self.version
456        );
457        self.version = resp.version;
458        self.catalog_updated_tx.send(resp.version).unwrap();
459    }
460
461    fn handle_fragment_mapping_notification(&mut self, resp: SubscribeResponse) {
462        let Some(info) = resp.info.as_ref() else {
463            return;
464        };
465        match info {
466            Info::StreamingWorkerSlotMapping(streaming_worker_slot_mapping) => {
467                let fragment_id = streaming_worker_slot_mapping.fragment_id;
468                let mapping = || {
469                    WorkerSlotMapping::from_protobuf(
470                        streaming_worker_slot_mapping.mapping.as_ref().unwrap(),
471                    )
472                };
473
474                match resp.operation() {
475                    Operation::Add => {
476                        self.worker_node_manager
477                            .insert_streaming_fragment_mapping(fragment_id, mapping());
478                    }
479                    Operation::Delete => {
480                        self.worker_node_manager
481                            .remove_streaming_fragment_mapping(&fragment_id);
482                    }
483                    Operation::Update => {
484                        self.worker_node_manager
485                            .update_streaming_fragment_mapping(fragment_id, mapping());
486                    }
487                    _ => panic!("receive an unsupported notify {:?}", resp),
488                }
489            }
490            _ => unreachable!(),
491        }
492    }
493
494    fn handle_fragment_serving_mapping_notification(
495        &mut self,
496        mappings: Vec<FragmentWorkerSlotMapping>,
497        op: Operation,
498    ) {
499        match op {
500            Operation::Add | Operation::Update => {
501                self.worker_node_manager
502                    .upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
503            }
504            Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
505                mappings
506                    .into_iter()
507                    .map(|m| m.fragment_id)
508                    .collect_vec()
509                    .as_slice(),
510            ),
511            Operation::Snapshot => {
512                self.worker_node_manager
513                    .set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
514            }
515            _ => panic!("receive an unsupported notify {:?}", op),
516        }
517    }
518
519    /// Update max committed epoch in `HummockSnapshotManager`.
520    fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) {
521        self.hummock_snapshot_manager.update(deltas);
522    }
523
524    fn handle_secret_notification(&mut self, resp: SubscribeResponse) {
525        let resp_op = resp.operation();
526        let Some(Info::Secret(secret)) = resp.info else {
527            unreachable!();
528        };
529        match resp_op {
530            Operation::Add => {
531                LocalSecretManager::global().add_secret(secret.id, secret.value);
532            }
533            Operation::Delete => {
534                LocalSecretManager::global().remove_secret(secret.id);
535            }
536            Operation::Update => {
537                LocalSecretManager::global().update_secret(secret.id, secret.value);
538            }
539            _ => {
540                panic!("invalid notification operation: {resp_op:?}");
541            }
542        }
543    }
544
545    /// `update_worker_node_manager` is called in `start` method.
546    /// It calls `add_worker_node` and `remove_worker_node` of `WorkerNodeManager`.
547    fn update_worker_node_manager(&self, operation: Operation, node: WorkerNode) {
548        tracing::debug!(
549            "Update worker nodes, operation: {:?}, node: {:?}",
550            operation,
551            node
552        );
553
554        match operation {
555            Operation::Add => self.worker_node_manager.add_worker_node(node),
556            Operation::Delete => self.worker_node_manager.remove_worker_node(node),
557            _ => (),
558        }
559    }
560}
561
562fn convert_worker_slot_mapping(
563    worker_slot_mappings: &[FragmentWorkerSlotMapping],
564) -> HashMap<FragmentId, WorkerSlotMapping> {
565    worker_slot_mappings
566        .iter()
567        .map(
568            |FragmentWorkerSlotMapping {
569                 fragment_id,
570                 mapping,
571             }| {
572                let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap());
573                (*fragment_id, mapping)
574            },
575        )
576        .collect()
577}