risingwave_meta/stream/
refresh_manager.rs1use anyhow::anyhow;
16use risingwave_common::catalog::{DatabaseId, TableId};
17use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
18use risingwave_pb::meta::{RefreshRequest, RefreshResponse};
19use thiserror_ext::AsReport;
20
21use crate::barrier::{BarrierScheduler, Command};
22use crate::manager::MetadataManager;
23use crate::{MetaError, MetaResult};
24
25pub struct RefreshManager {
27 metadata_manager: MetadataManager,
28 barrier_scheduler: BarrierScheduler,
29}
30
31impl RefreshManager {
32 pub fn new(metadata_manager: MetadataManager, barrier_scheduler: BarrierScheduler) -> Self {
34 Self {
35 metadata_manager,
36 barrier_scheduler,
37 }
38 }
39
40 pub async fn refresh_table(&self, request: RefreshRequest) -> MetaResult<RefreshResponse> {
47 let table_id = TableId::new(request.table_id);
48 let associated_source_id = TableId::new(request.associated_source_id);
49
50 tracing::info!("Starting refresh operation for table {}", table_id);
51
52 self.validate_refreshable_table(table_id, associated_source_id)
54 .await?;
55
56 let database_id = DatabaseId::new(
58 self.metadata_manager
59 .catalog_controller
60 .get_object_database_id(table_id.table_id() as _)
61 .await? as _,
62 );
63
64 let refresh_command = Command::Refresh {
66 table_id,
67 associated_source_id,
68 };
69
70 match self
72 .barrier_scheduler
73 .run_command(database_id, refresh_command)
74 .await
75 {
76 Ok(_) => {
77 tracing::info!(
78 table_id = %table_id,
79 "Refresh command completed successfully"
80 );
81
82 Ok(RefreshResponse {
83 status: None, })
85 }
86 Err(e) => {
87 tracing::error!(
88 error = %e.as_report(),
89 table_id = %table_id,
90 "Failed to execute refresh command"
91 );
92
93 Err(anyhow!(e)
94 .context(format!("Failed to refresh table {}", table_id))
95 .into())
96 }
97 }
98 }
99
100 async fn validate_refreshable_table(
102 &self,
103 table_id: TableId,
104 associated_source_id: TableId,
105 ) -> MetaResult<()> {
106 let table = self
108 .metadata_manager
109 .catalog_controller
110 .get_table_by_id(table_id.table_id as _)
111 .await?;
112
113 if !table.refreshable {
115 return Err(MetaError::invalid_parameter(format!(
116 "Table '{}' is not refreshable. Only tables created with REFRESHABLE flag support manual refresh.",
117 table.name
118 )));
119 }
120
121 if table.optional_associated_source_id
122 != Some(OptionalAssociatedSourceId::AssociatedSourceId(
123 associated_source_id.table_id(),
124 ))
125 {
126 return Err(MetaError::invalid_parameter(format!(
127 "Table '{}' is not associated with source '{}'. table.optional_associated_source_id: {:?}",
128 table.name, associated_source_id, table.optional_associated_source_id
129 )));
130 }
131
132 tracing::debug!(
133 table_id = %table_id,
134 table_name = %table.name,
135 "Table validation passed for refresh operation"
136 );
137
138 Ok(())
139 }
140}