risedev/task/
lakekeeper_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::env;
16use std::path::{Path, PathBuf};
17use std::process::Command;
18
19use anyhow::{Context, Result, anyhow};
20use serde_json::json;
21use sqlx::ConnectOptions;
22use thiserror_ext::AsReport;
23
24use super::{ExecuteContext, Task};
25use crate::LakekeeperConfig;
26use crate::util::stylized_risedev_subcmd;
27
/// Risedev task that prepares and launches a lakekeeper instance described by a
/// [`LakekeeperConfig`].
pub struct LakekeeperService {
    /// Settings for this instance, stored unchanged by [`LakekeeperService::new`].
    config: LakekeeperConfig,
}
31
32impl LakekeeperService {
33    pub fn new(config: LakekeeperConfig) -> Result<Self> {
34        Ok(Self { config })
35    }
36
37    fn lakekeeper_path(&self) -> Result<PathBuf> {
38        let prefix_bin = env::var("PREFIX_BIN")?;
39        Ok(Path::new(&prefix_bin).join("lakekeeper"))
40    }
41
42    fn lakekeeper(&self) -> Result<Command> {
43        Ok(Command::new(self.lakekeeper_path()?))
44    }
45
46    fn base_uri(&self) -> String {
47        format!("http://{}:{}", &self.config.address, self.config.port)
48    }
49
50    /// Apply command args according to config
51    pub fn apply_command_args(cmd: &mut Command, config: &LakekeeperConfig) -> Result<()> {
52        // Set basic environment variables
53        let base_uri = format!("http://{}:{}", &config.address, config.port);
54        cmd.env("LAKEKEEPER__BASE_URI", &base_uri)
55            .env("LAKEKEEPER__LISTEN_PORT", config.port.to_string())
56            .env("LAKEKEEPER__PG_ENCRYPTION_KEY", &config.encryption_key);
57
58        // Configure database backend
59        if let Some(postgres_configs) = &config.provide_postgres_backend
60            && let Some(pg_config) = postgres_configs.first()
61        {
62            let database_url = format!(
63                "postgres://{}:{}@{}:{}/{}",
64                pg_config.user, pg_config.password, pg_config.address, pg_config.port, "lakekeeper"
65            );
66            cmd.env("LAKEKEEPER__PG_DATABASE_URL_READ", &database_url)
67                .env("LAKEKEEPER__PG_DATABASE_URL_WRITE", &database_url);
68        }
69
70        Ok(())
71    }
72
73    fn initialize_lakekeeper_database(
74        &self,
75        ctx: &mut ExecuteContext<impl std::io::Write>,
76    ) -> Result<()> {
77        if let Some(postgres_configs) = &self.config.provide_postgres_backend
78            && let Some(pg_config) = postgres_configs.first()
79        {
80            // Wait for PostgreSQL to be ready first
81            ctx.pb.set_message("waiting for PostgreSQL to be ready...");
82            let mut tcp_check = crate::TcpReadyCheckTask::new(
83                pg_config.address.clone(),
84                pg_config.port,
85                pg_config.user_managed,
86            )?;
87            tcp_check.execute(ctx)?;
88
89            // Give PostgreSQL a bit more time to fully initialize after TCP is ready
90            std::thread::sleep(std::time::Duration::from_secs(2));
91
92            let rt = tokio::runtime::Builder::new_current_thread()
93                .enable_all()
94                .build()?;
95
96            let db_name = "lakekeeper";
97            let host = pg_config.address.clone();
98            let port = pg_config.port;
99            let username = pg_config.user.clone();
100            let password = pg_config.password.clone();
101
102            rt.block_on(async move {
103                use sqlx::postgres::*;
104                use tokio::time::{sleep, Duration};
105                let options = PgConnectOptions::new()
106                    .host(&host)
107                    .port(port)
108                    .username(&username)
109                    .password(&password)
110                    .database("template1")
111                    .ssl_mode(sqlx::postgres::PgSslMode::Prefer);
112
113                // Retry connection with exponential backoff
114                let mut attempts = 0;
115                let max_attempts = 5;
116                let mut conn = loop {
117                    match options.connect().await {
118                        Ok(conn) => break conn,
119                        Err(_) if attempts < max_attempts => {
120                            attempts += 1;
121                            let delay = Duration::from_millis(500 * (1 << attempts)); // 1s, 2s, 4s, 8s, 16s
122                            sleep(delay).await;
123                        }
124                        Err(e) => {
125                            return Err(e).context("failed to connect to template database for lakekeeper after retries")?;
126                        }
127                    }
128                };
129
130                // Check if database exists before creating it
131                let db_exists_query = format!(
132                    "SELECT 1 FROM pg_database WHERE datname = '{}';",
133                    db_name
134                );
135                let db_exists = match sqlx::raw_sql(&db_exists_query)
136                    .fetch_one(&mut conn)
137                    .await
138                {
139                    Ok(_) => true,
140                    Err(sqlx::Error::RowNotFound) => false,
141                    Err(e) => return Err(e.into()),
142                };
143
144                if !db_exists {
145                    // Create database only if it doesn't exist
146                    // Intentionally not executing in a transaction because Postgres does not allow it.
147                    sqlx::raw_sql(&format!("CREATE DATABASE {};", db_name))
148                        .execute(&mut conn)
149                        .await?;
150                } else {
151                    // Database already exists, skipping creation
152                }
153
154                Ok::<_, anyhow::Error>(())
155            })
156            .context("failed to initialize lakekeeper database")?;
157        }
158
159        Ok(())
160    }
161
162    fn run_migrate(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> Result<()> {
163        ctx.pb.set_message("running database migration...");
164        let mut cmd = self.lakekeeper()?;
165        Self::apply_command_args(&mut cmd, &self.config)?;
166        cmd.arg("migrate");
167        ctx.run_command(cmd)?;
168        Ok(())
169    }
170
171    fn bootstrap_lakekeeper(&self, ctx: &mut ExecuteContext<impl std::io::Write>) -> Result<()> {
172        ctx.pb.set_message("bootstrapping lakekeeper...");
173
174        // Wait for lakekeeper service to be ready
175        std::thread::sleep(std::time::Duration::from_secs(3));
176
177        let bootstrap_config = json!({
178            "accept-terms-of-use": true,
179            "is-operator": true
180        });
181
182        let rt = tokio::runtime::Builder::new_current_thread()
183            .enable_all()
184            .build()?;
185
186        let bootstrap_json = bootstrap_config.to_string();
187        let lakekeeper_url = format!("{}/management/v1/bootstrap", self.base_uri());
188
189        rt.block_on(async move {
190            use tokio::time::{sleep, Duration};
191
192            // Retry bootstrap with exponential backoff
193            let mut attempts = 0;
194            let max_attempts = 5;
195
196            loop {
197                let client = reqwest::Client::new();
198                match client
199                    .post(&lakekeeper_url)
200                    .header("Content-Type", "application/json")
201                    .body(bootstrap_json.clone())
202                    .send()
203                    .await
204                {
205                    Ok(response) => {
206                        if response.status().is_success() {
207                            let _response_text = response.text().await.unwrap_or_default();
208                            // Successfully bootstrapped lakekeeper
209                            break;
210                        } else if response.status() == reqwest::StatusCode::CONFLICT || response.status() == reqwest::StatusCode::BAD_REQUEST {
211                            let error_text = response.text().await.unwrap_or_default();
212                            if error_text.contains("already bootstrapped") || error_text.contains("Server already initialized") {
213                                // Lakekeeper already bootstrapped, skipping
214                                break;
215                            } else {
216                                eprintln!("Bootstrap conflict: {}", error_text);
217                                break;
218                            }
219                        } else {
220                            let status = response.status();
221                            let error_text = response.text().await.unwrap_or_default();
222                            eprintln!("Failed to bootstrap lakekeeper ({}): {}", status, error_text);
223
224                            if attempts >= max_attempts {
225                                break;
226                            }
227                        }
228                    }
229                    Err(e) => {
230                        if attempts >= max_attempts {
231                            eprintln!("Failed to bootstrap lakekeeper after {} attempts: {}", max_attempts + 1, e.as_report());
232                            break;
233                        }
234
235                        attempts += 1;
236                        let delay = Duration::from_millis(1000 * (1 << attempts)); // 2s, 4s, 8s, 16s, 32s
237                        eprintln!("Failed to connect to lakekeeper for bootstrap, retrying in {}s... (attempt {}/{})",
238                               delay.as_secs(), attempts, max_attempts + 1);
239                        sleep(delay).await;
240                    }
241                }
242            }
243
244            Ok::<_, anyhow::Error>(())
245        }).context("failed to bootstrap lakekeeper")?;
246
247        Ok(())
248    }
249
250    fn create_default_warehouse(
251        &self,
252        ctx: &mut ExecuteContext<impl std::io::Write>,
253    ) -> Result<()> {
254        ctx.pb.set_message("creating default warehouse...");
255
256        // Brief wait after bootstrap
257        std::thread::sleep(std::time::Duration::from_secs(1));
258
259        let warehouse_config = if let Some(minio_configs) = &self.config.provide_minio
260            && let Some(minio_config) = minio_configs.first()
261        {
262            json!({
263                "warehouse-name": "risingwave-warehouse",
264                "storage-profile": {
265                    "type": "s3",
266                    "bucket": "hummock001",
267                    "key-prefix": "risingwave-lakekeeper",
268                    "region": "us-east-1",
269                    "sts-enabled": false,
270                    "flavor": "s3-compat",
271                    "endpoint": format!("http://{}:{}/", minio_config.address, minio_config.port),
272                    "path-style-access": true
273                },
274                "storage-credential": {
275                    "type": "s3",
276                    "credential-type": "access-key",
277                    "aws-access-key-id": minio_config.root_user,
278                    "aws-secret-access-key": minio_config.root_password
279                }
280            })
281        } else {
282            // Default S3 configuration if no MinIO is configured
283            json!({
284                "warehouse-name": "risingwave-warehouse",
285                "storage-profile": {
286                    "type": "s3",
287                    "bucket": "hummock001",
288                    "key-prefix": "risingwave-lakekeeper",
289                    "region": "us-east-1",
290                    "sts-enabled": false,
291                    "flavor": "s3-compat",
292                    "endpoint": "http://127.0.0.1:9301/",
293                    "path-style-access": true
294                },
295                "storage-credential": {
296                    "type": "s3",
297                    "credential-type": "access-key",
298                    "aws-access-key-id": "hummockadmin",
299                    "aws-secret-access-key": "hummockadmin"
300                }
301            })
302        };
303
304        let rt = tokio::runtime::Builder::new_current_thread()
305            .enable_all()
306            .build()?;
307
308        let warehouse_json = warehouse_config.to_string();
309        let lakekeeper_url = format!("{}/management/v1/warehouse", self.base_uri());
310
311        rt.block_on(async move {
312            use tokio::time::{Duration, sleep};
313
314            // Retry warehouse creation with exponential backoff
315            let mut attempts = 0;
316            let max_attempts = 5;
317
318            loop {
319                let client = reqwest::Client::new();
320                match client
321                    .post(&lakekeeper_url)
322                    .header("Content-Type", "application/json")
323                    .body(warehouse_json.clone())
324                    .send()
325                    .await
326                {
327                    Ok(response) => {
328                        if response.status().is_success() {
329                            let _response_text = response.text().await.unwrap_or_default();
330                            // Successfully created warehouse
331                            break;
332                        } else if response.status() == reqwest::StatusCode::BAD_REQUEST {
333                            let error_text = response.text().await.unwrap_or_default();
334                            if error_text.contains("Storage profile overlaps") {
335                                // Warehouse already exists, skipping creation
336                                break;
337                            } else {
338                                eprintln!("Failed to create warehouse: {}", error_text);
339                                break;
340                            }
341                        } else {
342                            let status = response.status();
343                            let error_text = response.text().await.unwrap_or_default();
344                            eprintln!("Failed to create warehouse ({}): {}", status, error_text);
345
346                            if attempts >= max_attempts {
347                                break;
348                            }
349                        }
350                    }
351                    Err(e) => {
352                        if attempts >= max_attempts {
353                            eprintln!(
354                                "Failed to create warehouse after {} attempts: {}",
355                                max_attempts + 1,
356                                e.as_report()
357                            );
358                            break;
359                        }
360
361                        attempts += 1;
362                        let delay = Duration::from_millis(1000 * (1 << attempts)); // 2s, 4s, 8s, 16s, 32s
363                        eprintln!(
364                            "Failed to connect to lakekeeper, retrying in {}s... (attempt {}/{})",
365                            delay.as_secs(),
366                            attempts,
367                            max_attempts + 1
368                        );
369                        sleep(delay).await;
370                    }
371                }
372            }
373
374            Ok::<_, anyhow::Error>(())
375        })
376        .context("failed to create default warehouse")?;
377
378        Ok(())
379    }
380}
381
382impl Task for LakekeeperService {
383    fn execute(&mut self, ctx: &mut ExecuteContext<impl std::io::Write>) -> anyhow::Result<()> {
384        ctx.service(self);
385        ctx.pb.set_message("starting...");
386
387        let path = self.lakekeeper_path()?;
388        if !path.exists() {
389            return Err(anyhow!(
390                "lakekeeper binary not found in {:?}\nDid you enable lakekeeper feature in `{}`?",
391                path,
392                stylized_risedev_subcmd("configure")
393            ));
394        }
395
396        // Initialize and migrate database if using postgres backend
397        if self.config.provide_postgres_backend.is_some() {
398            ctx.pb.set_message("initializing lakekeeper database...");
399            self.initialize_lakekeeper_database(ctx)?;
400            self.run_migrate(ctx)?;
401        }
402
403        let mut cmd = self.lakekeeper()?;
404        Self::apply_command_args(&mut cmd, &self.config)?;
405        cmd.arg("serve");
406
407        let prefix_config = env::var("PREFIX_CONFIG")?;
408        let data_path = Path::new(&env::var("PREFIX_DATA")?).join(self.id());
409        fs_err::create_dir_all(&data_path)?;
410
411        // Create config directory
412        let config_dir = Path::new(&prefix_config).join("lakekeeper");
413        fs_err::create_dir_all(&config_dir)?;
414
415        ctx.run_command(ctx.tmux_run(cmd)?)?;
416
417        ctx.pb.set_message("started");
418
419        // Bootstrap lakekeeper after service starts
420        if let Err(e) = self.bootstrap_lakekeeper(ctx) {
421            eprintln!("Warning: Failed to bootstrap lakekeeper: {}", e.as_report());
422            eprintln!("You can bootstrap it manually later using the lakekeeper API");
423        } else {
424            // Create default warehouse after successful bootstrap
425            if let Err(e) = self.create_default_warehouse(ctx) {
426                eprintln!(
427                    "Warning: Failed to create default warehouse: {}",
428                    e.as_report()
429                );
430                eprintln!("You can create it manually later using the lakekeeper API");
431            }
432        }
433
434        Ok(())
435    }
436
437    fn id(&self) -> String {
438        self.config.id.clone()
439    }
440}