1use std::collections::HashMap;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use parking_lot::RwLock;
20use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
21use risingwave_common::catalog::CatalogVersion;
22use risingwave_common::hash::WorkerSlotMapping;
23use risingwave_common::license::LicenseManager;
24use risingwave_common::secret::LocalSecretManager;
25use risingwave_common::session_config::SessionConfig;
26use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
27use risingwave_common_service::ObserverState;
28use risingwave_hummock_sdk::FrontendHummockVersion;
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
31use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo};
32use risingwave_pb::meta::subscribe_response::{Info, Operation};
33use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
34use risingwave_rpc_client::ComputeClientPoolRef;
35use tokio::sync::watch::Sender;
36
37use crate::catalog::FragmentId;
38use crate::catalog::root_catalog::Catalog;
39use crate::scheduler::HummockSnapshotManagerRef;
40use crate::user::user_manager::UserInfoManager;
41
42pub struct FrontendObserverNode {
43 worker_node_manager: WorkerNodeManagerRef,
44 version: CatalogVersion,
45 catalog_updated_tx: Sender<CatalogVersion>,
46 catalog: Arc<RwLock<Catalog>>,
47 user_info_manager: Arc<RwLock<UserInfoManager>>,
48 hummock_snapshot_manager: HummockSnapshotManagerRef,
49 system_params_manager: LocalSystemParamsManagerRef,
50 session_params: Arc<RwLock<SessionConfig>>,
51 compute_client_pool: ComputeClientPoolRef,
52}
53
54impl ObserverState for FrontendObserverNode {
55 fn subscribe_type() -> risingwave_pb::meta::SubscribeType {
56 risingwave_pb::meta::SubscribeType::Frontend
57 }
58
59 fn handle_notification(&mut self, resp: SubscribeResponse) {
60 let Some(info) = resp.info.as_ref() else {
61 return;
62 };
63
64 match info.to_owned() {
66 Info::Database(_)
67 | Info::Schema(_)
68 | Info::ObjectGroup(_)
69 | Info::Function(_)
70 | Info::Connection(_) => {
71 self.handle_catalog_notification(resp);
72 }
73 Info::Secret(_) => {
74 self.handle_catalog_notification(resp.clone());
75 self.handle_secret_notification(resp);
76 }
77 Info::Node(node) => {
78 self.update_worker_node_manager(resp.operation(), node);
79 }
80 Info::User(_) => {
81 self.handle_user_notification(resp);
82 }
83 Info::Snapshot(_) => {
84 panic!(
85 "receiving a snapshot in the middle is unsupported now {:?}",
86 resp
87 )
88 }
89 Info::HummockVersionDeltas(deltas) => {
90 self.handle_hummock_snapshot_notification(deltas);
91 }
92 Info::MetaBackupManifestId(_) => {
93 panic!("frontend node should not receive MetaBackupManifestId");
94 }
95 Info::HummockWriteLimits(_) => {
96 panic!("frontend node should not receive HummockWriteLimits");
97 }
98 Info::SystemParams(p) => {
99 self.system_params_manager.try_set_params(p);
100 }
101 Info::SessionParam(p) => {
102 self.session_params
103 .write()
104 .set(&p.param, p.value().to_owned(), &mut ())
105 .unwrap();
106 }
107 Info::HummockStats(stats) => {
108 self.handle_table_stats_notification(stats);
109 }
110 Info::StreamingWorkerSlotMapping(_) => self.handle_fragment_mapping_notification(resp),
111 Info::ServingWorkerSlotMappings(m) => {
112 self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation())
113 }
114 Info::Recovery(_) => {
115 self.compute_client_pool.invalidate_all();
116 }
117 Info::ClusterResource(resource) => {
118 LicenseManager::get().update_cluster_resource(resource);
119 }
120 }
121 }
122
123 fn handle_initialization_notification(&mut self, resp: SubscribeResponse) {
124 let mut catalog_guard = self.catalog.write();
125 let mut user_guard = self.user_info_manager.write();
126 catalog_guard.clear();
127 user_guard.clear();
128
129 let Some(Info::Snapshot(snapshot)) = resp.info else {
130 unreachable!();
131 };
132 let MetaSnapshot {
133 databases,
134 schemas,
135 sources,
136 sinks,
137 tables,
138 indexes,
139 views,
140 subscriptions,
141 functions,
142 connections,
143 users,
144 nodes,
145 hummock_version,
146 meta_backup_manifest_id: _,
147 hummock_write_limits: _,
148 streaming_worker_slot_mappings,
149 serving_worker_slot_mappings,
150 session_params,
151 version,
152 secrets,
153 cluster_resource,
154 object_dependencies,
155 } = snapshot;
156
157 for db in databases {
158 catalog_guard.create_database(&db)
159 }
160 for schema in schemas {
161 catalog_guard.create_schema(&schema)
162 }
163 for source in sources {
164 catalog_guard.create_source(&source)
165 }
166 for sink in sinks {
167 catalog_guard.create_sink(&sink)
168 }
169 for subscription in subscriptions {
170 catalog_guard.create_subscription(&subscription)
171 }
172 for table in tables {
173 catalog_guard.create_table(&table)
174 }
175 for index in indexes {
176 catalog_guard.create_index(&index)
177 }
178 for view in views {
179 catalog_guard.create_view(&view)
180 }
181 for function in functions {
182 catalog_guard.create_function(&function)
183 }
184 for connection in connections {
185 catalog_guard.create_connection(&connection)
186 }
187 for secret in &secrets {
188 catalog_guard.create_secret(secret)
189 }
190 catalog_guard.set_object_dependencies(object_dependencies);
191 for user in users {
192 user_guard.create_user(user)
193 }
194
195 self.worker_node_manager.refresh(
196 nodes,
197 convert_worker_slot_mapping(&streaming_worker_slot_mappings),
198 convert_worker_slot_mapping(&serving_worker_slot_mappings),
199 );
200 self.hummock_snapshot_manager
201 .init(FrontendHummockVersion::from_protobuf(
202 hummock_version.unwrap(),
203 ));
204
205 let snapshot_version = version.unwrap();
206 self.version = snapshot_version.catalog_version;
207 self.catalog_updated_tx
208 .send(snapshot_version.catalog_version)
209 .unwrap();
210 *self.session_params.write() =
211 serde_json::from_str(&session_params.unwrap().params).unwrap();
212 LocalSecretManager::global().init_secrets(secrets);
213 LicenseManager::get().update_cluster_resource(cluster_resource.unwrap());
214 }
215}
216
217impl FrontendObserverNode {
218 pub fn new(
219 worker_node_manager: WorkerNodeManagerRef,
220 catalog: Arc<RwLock<Catalog>>,
221 catalog_updated_tx: Sender<CatalogVersion>,
222 user_info_manager: Arc<RwLock<UserInfoManager>>,
223 hummock_snapshot_manager: HummockSnapshotManagerRef,
224 system_params_manager: LocalSystemParamsManagerRef,
225 session_params: Arc<RwLock<SessionConfig>>,
226 compute_client_pool: ComputeClientPoolRef,
227 ) -> Self {
228 Self {
229 version: 0,
230 worker_node_manager,
231 catalog,
232 catalog_updated_tx,
233 user_info_manager,
234 hummock_snapshot_manager,
235 system_params_manager,
236 session_params,
237 compute_client_pool,
238 }
239 }
240
241 fn handle_table_stats_notification(&mut self, table_stats: HummockVersionStats) {
242 let mut catalog_guard = self.catalog.write();
243 catalog_guard.set_table_stats(table_stats);
244 }
245
246 fn handle_catalog_notification(&mut self, resp: SubscribeResponse) {
247 let Some(info) = resp.info.as_ref() else {
248 return;
249 };
250 tracing::trace!(op = ?resp.operation(), ?info, "handle catalog notification");
251
252 let mut catalog_guard = self.catalog.write();
253 match info {
254 Info::Database(database) => match resp.operation() {
255 Operation::Add => catalog_guard.create_database(database),
256 Operation::Delete => catalog_guard.drop_database(database.id),
257 Operation::Update => catalog_guard.update_database(database),
258 _ => panic!("receive an unsupported notify {:?}", resp),
259 },
260 Info::Schema(schema) => match resp.operation() {
261 Operation::Add => catalog_guard.create_schema(schema),
262 Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
263 Operation::Update => catalog_guard.update_schema(schema),
264 _ => panic!("receive an unsupported notify {:?}", resp),
265 },
266 Info::ObjectGroup(object_group) => {
267 if !object_group.dependencies.is_empty() {
268 catalog_guard.insert_object_dependencies(object_group.dependencies.clone());
269 }
270 for object in &object_group.objects {
271 let Some(obj) = object.object_info.as_ref() else {
272 continue;
273 };
274 match obj {
275 ObjectInfo::Database(db) => match resp.operation() {
276 Operation::Add => catalog_guard.create_database(db),
277 Operation::Delete => catalog_guard.drop_database(db.id),
278 Operation::Update => catalog_guard.update_database(db),
279 _ => panic!("receive an unsupported notify {:?}", resp),
280 },
281 ObjectInfo::Schema(schema) => match resp.operation() {
282 Operation::Add => catalog_guard.create_schema(schema),
283 Operation::Delete => {
284 catalog_guard.drop_schema(schema.database_id, schema.id)
285 }
286 Operation::Update => catalog_guard.update_schema(schema),
287 _ => panic!("receive an unsupported notify {:?}", resp),
288 },
289 PbObjectInfo::Table(table) => match resp.operation() {
290 Operation::Add => catalog_guard.create_table(table),
291 Operation::Delete => catalog_guard.drop_table(
292 table.database_id,
293 table.schema_id,
294 table.id,
295 ),
296 Operation::Update => {
297 let old_fragment_id = catalog_guard
298 .get_any_table_by_id(table.id)
299 .unwrap()
300 .fragment_id;
301 catalog_guard.update_table(table);
302 if old_fragment_id != table.fragment_id {
303 self.worker_node_manager
306 .remove_streaming_fragment_mapping(&old_fragment_id);
307 }
308 }
309 _ => panic!("receive an unsupported notify {:?}", resp),
310 },
311 PbObjectInfo::Source(source) => match resp.operation() {
312 Operation::Add => catalog_guard.create_source(source),
313 Operation::Delete => catalog_guard.drop_source(
314 source.database_id,
315 source.schema_id,
316 source.id,
317 ),
318 Operation::Update => catalog_guard.update_source(source),
319 _ => panic!("receive an unsupported notify {:?}", resp),
320 },
321 PbObjectInfo::Sink(sink) => match resp.operation() {
322 Operation::Add => catalog_guard.create_sink(sink),
323 Operation::Delete => {
324 catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
325 }
326 Operation::Update => catalog_guard.update_sink(sink),
327 _ => panic!("receive an unsupported notify {:?}", resp),
328 },
329 PbObjectInfo::Subscription(subscription) => match resp.operation() {
330 Operation::Add => catalog_guard.create_subscription(subscription),
331 Operation::Delete => catalog_guard.drop_subscription(
332 subscription.database_id,
333 subscription.schema_id,
334 subscription.id,
335 ),
336 Operation::Update => catalog_guard.update_subscription(subscription),
337 _ => panic!("receive an unsupported notify {:?}", resp),
338 },
339 PbObjectInfo::Index(index) => match resp.operation() {
340 Operation::Add => catalog_guard.create_index(index),
341 Operation::Delete => catalog_guard.drop_index(
342 index.database_id,
343 index.schema_id,
344 index.id,
345 ),
346 Operation::Update => catalog_guard.update_index(index),
347 _ => panic!("receive an unsupported notify {:?}", resp),
348 },
349 PbObjectInfo::View(view) => match resp.operation() {
350 Operation::Add => catalog_guard.create_view(view),
351 Operation::Delete => {
352 catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
353 }
354 Operation::Update => catalog_guard.update_view(view),
355 _ => panic!("receive an unsupported notify {:?}", resp),
356 },
357 ObjectInfo::Function(function) => match resp.operation() {
358 Operation::Add => catalog_guard.create_function(function),
359 Operation::Delete => catalog_guard.drop_function(
360 function.database_id,
361 function.schema_id,
362 function.id,
363 ),
364 Operation::Update => catalog_guard.update_function(function),
365 _ => panic!("receive an unsupported notify {:?}", resp),
366 },
367 ObjectInfo::Connection(connection) => match resp.operation() {
368 Operation::Add => catalog_guard.create_connection(connection),
369 Operation::Delete => catalog_guard.drop_connection(
370 connection.database_id,
371 connection.schema_id,
372 connection.id,
373 ),
374 Operation::Update => catalog_guard.update_connection(connection),
375 _ => panic!("receive an unsupported notify {:?}", resp),
376 },
377 ObjectInfo::Secret(secret) => {
378 let mut secret = secret.clone();
379 secret.value =
381 "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
382 match resp.operation() {
383 Operation::Add => catalog_guard.create_secret(&secret),
384 Operation::Delete => catalog_guard.drop_secret(
385 secret.database_id,
386 secret.schema_id,
387 secret.id,
388 ),
389 Operation::Update => catalog_guard.update_secret(&secret),
390 _ => panic!("receive an unsupported notify {:?}", resp),
391 }
392 }
393 }
394 }
395 }
396 Info::Function(function) => match resp.operation() {
397 Operation::Add => catalog_guard.create_function(function),
398 Operation::Delete => catalog_guard.drop_function(
399 function.database_id,
400 function.schema_id,
401 function.id,
402 ),
403 Operation::Update => catalog_guard.update_function(function),
404 _ => panic!("receive an unsupported notify {:?}", resp),
405 },
406 Info::Connection(connection) => match resp.operation() {
407 Operation::Add => catalog_guard.create_connection(connection),
408 Operation::Delete => catalog_guard.drop_connection(
409 connection.database_id,
410 connection.schema_id,
411 connection.id,
412 ),
413 Operation::Update => catalog_guard.update_connection(connection),
414 _ => panic!("receive an unsupported notify {:?}", resp),
415 },
416 Info::Secret(secret) => {
417 let mut secret = secret.clone();
418 secret.value = "SECRET VALUE SHOULD NOT BE REVEALED".as_bytes().to_vec();
420 match resp.operation() {
421 Operation::Add => catalog_guard.create_secret(&secret),
422 Operation::Delete => {
423 catalog_guard.drop_secret(secret.database_id, secret.schema_id, secret.id)
424 }
425 Operation::Update => catalog_guard.update_secret(&secret),
426 _ => panic!("receive an unsupported notify {:?}", resp),
427 }
428 }
429 _ => unreachable!(),
430 }
431 assert!(
432 resp.version > self.version,
433 "resp version={:?}, current version={:?}",
434 resp.version,
435 self.version
436 );
437 self.version = resp.version;
438 self.catalog_updated_tx.send(resp.version).unwrap();
439 }
440
441 fn handle_user_notification(&mut self, resp: SubscribeResponse) {
442 let Some(info) = resp.info.as_ref() else {
443 return;
444 };
445
446 let mut user_guard = self.user_info_manager.write();
447 match info {
448 Info::User(user) => match resp.operation() {
449 Operation::Add => user_guard.create_user(user.clone()),
450 Operation::Delete => user_guard.drop_user(user.id),
451 Operation::Update => user_guard.update_user(user.clone()),
452 _ => panic!("receive an unsupported notify {:?}", resp),
453 },
454 _ => unreachable!(),
455 }
456 assert!(
457 resp.version > self.version,
458 "resp version={:?}, current version={:?}",
459 resp.version,
460 self.version
461 );
462 self.version = resp.version;
463 self.catalog_updated_tx.send(resp.version).unwrap();
464 }
465
466 fn handle_fragment_mapping_notification(&mut self, resp: SubscribeResponse) {
467 let Some(info) = resp.info.as_ref() else {
468 return;
469 };
470 match info {
471 Info::StreamingWorkerSlotMapping(streaming_worker_slot_mapping) => {
472 let fragment_id = streaming_worker_slot_mapping.fragment_id;
473 let mapping = || {
474 WorkerSlotMapping::from_protobuf(
475 streaming_worker_slot_mapping.mapping.as_ref().unwrap(),
476 )
477 };
478
479 match resp.operation() {
480 Operation::Add => {
481 self.worker_node_manager
482 .insert_streaming_fragment_mapping(fragment_id, mapping());
483 }
484 Operation::Delete => {
485 self.worker_node_manager
486 .remove_streaming_fragment_mapping(&fragment_id);
487 }
488 Operation::Update => {
489 self.worker_node_manager
490 .update_streaming_fragment_mapping(fragment_id, mapping());
491 }
492 _ => panic!("receive an unsupported notify {:?}", resp),
493 }
494 }
495 _ => unreachable!(),
496 }
497 }
498
499 fn handle_fragment_serving_mapping_notification(
500 &mut self,
501 mappings: Vec<FragmentWorkerSlotMapping>,
502 op: Operation,
503 ) {
504 match op {
505 Operation::Add | Operation::Update => {
506 self.worker_node_manager
507 .upsert_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
508 }
509 Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping(
510 mappings
511 .into_iter()
512 .map(|m| m.fragment_id)
513 .collect_vec()
514 .as_slice(),
515 ),
516 Operation::Snapshot => {
517 self.worker_node_manager
518 .set_serving_fragment_mapping(convert_worker_slot_mapping(&mappings));
519 }
520 _ => panic!("receive an unsupported notify {:?}", op),
521 }
522 }
523
524 fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) {
526 self.hummock_snapshot_manager.update(deltas);
527 }
528
529 fn handle_secret_notification(&mut self, resp: SubscribeResponse) {
530 let resp_op = resp.operation();
531 let Some(Info::Secret(secret)) = resp.info else {
532 unreachable!();
533 };
534 match resp_op {
535 Operation::Add => {
536 LocalSecretManager::global().add_secret(secret.id, secret.value);
537 }
538 Operation::Delete => {
539 LocalSecretManager::global().remove_secret(secret.id);
540 }
541 Operation::Update => {
542 LocalSecretManager::global().update_secret(secret.id, secret.value);
543 }
544 _ => {
545 panic!("invalid notification operation: {resp_op:?}");
546 }
547 }
548 }
549
550 fn update_worker_node_manager(&self, operation: Operation, node: WorkerNode) {
553 tracing::debug!(
554 "Update worker nodes, operation: {:?}, node: {:?}",
555 operation,
556 node
557 );
558
559 match operation {
560 Operation::Add => self.worker_node_manager.add_worker_node(node),
561 Operation::Delete => self.worker_node_manager.remove_worker_node(node),
562 _ => (),
563 }
564 }
565}
566
567fn convert_worker_slot_mapping(
568 worker_slot_mappings: &[FragmentWorkerSlotMapping],
569) -> HashMap<FragmentId, WorkerSlotMapping> {
570 worker_slot_mappings
571 .iter()
572 .map(
573 |FragmentWorkerSlotMapping {
574 fragment_id,
575 mapping,
576 }| {
577 let mapping = WorkerSlotMapping::from_protobuf(mapping.as_ref().unwrap());
578 (*fragment_id, mapping)
579 },
580 )
581 .collect()
582}