1use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18
19use anyhow::{Context, Result, anyhow};
20use serde_json::json;
21use sqlx::ConnectOptions;
22use thiserror_ext::AsReport;
23
24use super::{ExecuteContext, Task};
25use crate::LakekeeperConfig;
26use crate::util::stylized_risedev_subcmd;
27
28pub struct LakekeeperService {
29 config: LakekeeperConfig,
30}
31
32impl LakekeeperService {
33 pub fn new(config: LakekeeperConfig) -> Result<Self> {
34 Ok(Self { config })
35 }
36
37 fn lakekeeper_path(&self) -> Result<PathBuf> {
38 let prefix_bin = env::var("PREFIX_BIN")?;
39 Ok(Path::new(&prefix_bin).join("lakekeeper"))
40 }
41
42 fn lakekeeper(&self) -> Result<Command> {
43 Ok(Command::new(self.lakekeeper_path()?))
44 }
45
46 fn base_uri(&self) -> String {
47 format!("http://{}:{}", &self.config.address, self.config.port)
48 }
49
50 pub fn apply_command_args(cmd: &mut Command, config: &LakekeeperConfig) -> Result<()> {
52 let base_uri = format!("http://{}:{}", &config.address, config.port);
54 cmd.env("LAKEKEEPER__BASE_URI", &base_uri)
55 .env("LAKEKEEPER__LISTEN_PORT", config.port.to_string())
56 .env("LAKEKEEPER__PG_ENCRYPTION_KEY", &config.encryption_key);
57
58 if let Some(postgres_configs) = &config.provide_postgres_backend
60 && let Some(pg_config) = postgres_configs.first()
61 {
62 let database_url = format!(
63 "postgres://{}:{}@{}:{}/{}",
64 pg_config.user, pg_config.password, pg_config.address, pg_config.port, "lakekeeper"
65 );
66 cmd.env("LAKEKEEPER__PG_DATABASE_URL_READ", &database_url)
67 .env("LAKEKEEPER__PG_DATABASE_URL_WRITE", &database_url);
68 }
69
70 Ok(())
71 }
72
73 fn initialize_lakekeeper_database(
74 &self,
75 ctx: &mut ExecuteContext<impl std::io::Write>,
76 ) -> Result<()> {
77 if let Some(postgres_configs) = &self.config.provide_postgres_backend
78 && let Some(pg_config) = postgres_configs.first()
79 {
80 ctx.pb.set_message("waiting for PostgreSQL to be ready...");
82 let mut tcp_check = crate::TcpReadyCheckTask::new(
83 pg_config.address.clone(),
84 pg_config.port,
85 pg_config.user_managed,
86 )?;
87 tcp_check.execute(ctx)?;
88
89 std::thread::sleep(std::time::Duration::from_secs(2));
91
92 let rt = tokio::runtime::Builder::new_current_thread()
93 .enable_all()
94 .build()?;
95
96 let db_name = "lakekeeper";
97 let host = pg_config.address.clone();
98 let port = pg_config.port;
99 let username = pg_config.user.clone();
100 let password = pg_config.password.clone();
101
102 rt.block_on(async move {
103 use sqlx::postgres::*;
104 use tokio::time::{sleep, Duration};
105 let options = PgConnectOptions::new()
106 .host(&host)
107 .port(port)
108 .username(&username)
109 .password(&password)
110 .database("template1")
111 .ssl_mode(sqlx::postgres::PgSslMode::Prefer);
112
113 let mut attempts = 0;
115 let max_attempts = 5;
116 let mut conn = loop {
117 match options.connect().await {
118 Ok(conn) => break conn,
119 Err(_) if attempts < max_attempts => {
120 attempts += 1;
121 let delay = Duration::from_millis(500 * (1 << attempts)); sleep(delay).await;
123 }
124 Err(e) => {
125 return Err(e).context("failed to connect to template database for lakekeeper after retries")?;
126 }
127 }
128 };
129
130 let db_exists_query = format!(
132 "SELECT 1 FROM pg_database WHERE datname = '{}';",
133 db_name
134 );
135 let db_exists = match sqlx::raw_sql(&db_exists_query)
136 .fetch_one(&mut conn)
137 .await
138 {
139 Ok(_) => true,
140 Err(sqlx::Error::RowNotFound) => false,
141 Err(e) => return Err(e.into()),
142 };
143
144 if !db_exists {
145 sqlx::raw_sql(&format!("CREATE DATABASE {};", db_name))
148 .execute(&mut conn)
149 .await?;
150 } else {
151 }
153
154 Ok::<_, anyhow::Error>(())
155 })
156 .context("failed to initialize lakekeeper database")?;
157 }
158
159 Ok(())
160 }
161
162 fn run_migrate(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> Result<()> {
163 ctx.pb.set_message("running database migration...");
164 let mut cmd = self.lakekeeper()?;
165 Self::apply_command_args(&mut cmd, &self.config)?;
166 cmd.arg("migrate");
167 ctx.run_command(cmd)?;
168 Ok(())
169 }
170
171 fn bootstrap_lakekeeper(&self, ctx: &mut ExecuteContext<impl std::io::Write>) -> Result<()> {
172 ctx.pb.set_message("bootstrapping lakekeeper...");
173
174 std::thread::sleep(std::time::Duration::from_secs(3));
176
177 let bootstrap_config = json!({
178 "accept-terms-of-use": true,
179 "is-operator": true
180 });
181
182 let rt = tokio::runtime::Builder::new_current_thread()
183 .enable_all()
184 .build()?;
185
186 let bootstrap_json = bootstrap_config.to_string();
187 let lakekeeper_url = format!("{}/management/v1/bootstrap", self.base_uri());
188
189 rt.block_on(async move {
190 use tokio::time::{sleep, Duration};
191
192 let mut attempts = 0;
194 let max_attempts = 5;
195
196 loop {
197 let client = reqwest::Client::new();
198 match client
199 .post(&lakekeeper_url)
200 .header("Content-Type", "application/json")
201 .body(bootstrap_json.clone())
202 .send()
203 .await
204 {
205 Ok(response) => {
206 if response.status().is_success() {
207 let _response_text = response.text().await.unwrap_or_default();
208 break;
210 } else if response.status() == reqwest::StatusCode::CONFLICT || response.status() == reqwest::StatusCode::BAD_REQUEST {
211 let error_text = response.text().await.unwrap_or_default();
212 if error_text.contains("already bootstrapped") || error_text.contains("Server already initialized") {
213 break;
215 } else {
216 eprintln!("Bootstrap conflict: {}", error_text);
217 break;
218 }
219 } else {
220 let status = response.status();
221 let error_text = response.text().await.unwrap_or_default();
222 eprintln!("Failed to bootstrap lakekeeper ({}): {}", status, error_text);
223
224 if attempts >= max_attempts {
225 break;
226 }
227 }
228 }
229 Err(e) => {
230 if attempts >= max_attempts {
231 eprintln!("Failed to bootstrap lakekeeper after {} attempts: {}", max_attempts + 1, e.as_report());
232 break;
233 }
234
235 attempts += 1;
236 let delay = Duration::from_millis(1000 * (1 << attempts)); eprintln!("Failed to connect to lakekeeper for bootstrap, retrying in {}s... (attempt {}/{})",
238 delay.as_secs(), attempts, max_attempts + 1);
239 sleep(delay).await;
240 }
241 }
242 }
243
244 Ok::<_, anyhow::Error>(())
245 }).context("failed to bootstrap lakekeeper")?;
246
247 Ok(())
248 }
249
250 fn create_default_warehouse(
251 &self,
252 ctx: &mut ExecuteContext<impl std::io::Write>,
253 ) -> Result<()> {
254 ctx.pb.set_message("creating default warehouse...");
255
256 std::thread::sleep(std::time::Duration::from_secs(1));
258
259 let warehouse_config = if let Some(minio_configs) = &self.config.provide_minio
260 && let Some(minio_config) = minio_configs.first()
261 {
262 json!({
263 "warehouse-name": "risingwave-warehouse",
264 "storage-profile": {
265 "type": "s3",
266 "bucket": "hummock001",
267 "key-prefix": "risingwave-lakekeeper",
268 "region": "us-east-1",
269 "sts-enabled": false,
270 "flavor": "s3-compat",
271 "endpoint": format!("http://{}:{}/", minio_config.address, minio_config.port),
272 "path-style-access": true
273 },
274 "storage-credential": {
275 "type": "s3",
276 "credential-type": "access-key",
277 "aws-access-key-id": minio_config.root_user,
278 "aws-secret-access-key": minio_config.root_password
279 }
280 })
281 } else {
282 json!({
284 "warehouse-name": "risingwave-warehouse",
285 "storage-profile": {
286 "type": "s3",
287 "bucket": "hummock001",
288 "key-prefix": "risingwave-lakekeeper",
289 "region": "us-east-1",
290 "sts-enabled": false,
291 "flavor": "s3-compat",
292 "endpoint": "http://127.0.0.1:9301/",
293 "path-style-access": true
294 },
295 "storage-credential": {
296 "type": "s3",
297 "credential-type": "access-key",
298 "aws-access-key-id": "hummockadmin",
299 "aws-secret-access-key": "hummockadmin"
300 }
301 })
302 };
303
304 let rt = tokio::runtime::Builder::new_current_thread()
305 .enable_all()
306 .build()?;
307
308 let warehouse_json = warehouse_config.to_string();
309 let lakekeeper_url = format!("{}/management/v1/warehouse", self.base_uri());
310
311 rt.block_on(async move {
312 use tokio::time::{Duration, sleep};
313
314 let mut attempts = 0;
316 let max_attempts = 5;
317
318 loop {
319 let client = reqwest::Client::new();
320 match client
321 .post(&lakekeeper_url)
322 .header("Content-Type", "application/json")
323 .body(warehouse_json.clone())
324 .send()
325 .await
326 {
327 Ok(response) => {
328 if response.status().is_success() {
329 let _response_text = response.text().await.unwrap_or_default();
330 break;
332 } else if response.status() == reqwest::StatusCode::BAD_REQUEST {
333 let error_text = response.text().await.unwrap_or_default();
334 if error_text.contains("Storage profile overlaps") {
335 break;
337 } else {
338 eprintln!("Failed to create warehouse: {}", error_text);
339 break;
340 }
341 } else {
342 let status = response.status();
343 let error_text = response.text().await.unwrap_or_default();
344 eprintln!("Failed to create warehouse ({}): {}", status, error_text);
345
346 if attempts >= max_attempts {
347 break;
348 }
349 }
350 }
351 Err(e) => {
352 if attempts >= max_attempts {
353 eprintln!(
354 "Failed to create warehouse after {} attempts: {}",
355 max_attempts + 1,
356 e.as_report()
357 );
358 break;
359 }
360
361 attempts += 1;
362 let delay = Duration::from_millis(1000 * (1 << attempts)); eprintln!(
364 "Failed to connect to lakekeeper, retrying in {}s... (attempt {}/{})",
365 delay.as_secs(),
366 attempts,
367 max_attempts + 1
368 );
369 sleep(delay).await;
370 }
371 }
372 }
373
374 Ok::<_, anyhow::Error>(())
375 })
376 .context("failed to create default warehouse")?;
377
378 Ok(())
379 }
380}
381
382impl Task for LakekeeperService {
383 fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
384 ctx.service(self);
385 ctx.pb.set_message("starting...");
386
387 let path = self.lakekeeper_path()?;
388 if !path.exists() {
389 return Err(anyhow!(
390 "lakekeeper binary not found in {:?}\nDid you enable lakekeeper feature in `{}`?",
391 path,
392 stylized_risedev_subcmd("configure")
393 ));
394 }
395
396 if self.config.provide_postgres_backend.is_some() {
398 ctx.pb.set_message("initializing lakekeeper database...");
399 self.initialize_lakekeeper_database(ctx)?;
400 self.run_migrate(ctx)?;
401 }
402
403 let mut cmd = self.lakekeeper()?;
404 Self::apply_command_args(&mut cmd, &self.config)?;
405 cmd.arg("serve");
406
407 let prefix_config = env::var("PREFIX_CONFIG")?;
408 let data_path = Path::new(&env::var("PREFIX_DATA")?).join(self.id());
409 fs_err::create_dir_all(&data_path)?;
410
411 let config_dir = Path::new(&prefix_config).join("lakekeeper");
413 fs_err::create_dir_all(&config_dir)?;
414
415 ctx.run_command(ctx.tmux_run(cmd)?)?;
416
417 ctx.pb.set_message("started");
418
419 if let Err(e) = self.bootstrap_lakekeeper(ctx) {
421 eprintln!("Warning: Failed to bootstrap lakekeeper: {}", e.as_report());
422 eprintln!("You can bootstrap it manually later using the lakekeeper API");
423 } else {
424 if let Err(e) = self.create_default_warehouse(ctx) {
426 eprintln!(
427 "Warning: Failed to create default warehouse: {}",
428 e.as_report()
429 );
430 eprintln!("You can create it manually later using the lakekeeper API");
431 }
432 }
433
434 Ok(())
435 }
436
437 fn id(&self) -> String {
438 self.config.id.clone()
439 }
440}