risingwave_ctl/cmd_impl/hummock/migrate_legacy_object.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::time::Instant;
16
17use anyhow::anyhow;
18use futures::StreamExt;
19use futures::future::try_join_all;
20use risingwave_common::config::ObjectStoreConfig;
21use risingwave_hummock_sdk::{
22    VALID_OBJECT_ID_SUFFIXES, get_object_data_path, get_object_id_from_path,
23};
24use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
25use risingwave_object_store::object::prefix::opendal_engine::get_object_prefix;
26use risingwave_object_store::object::{
27    ObjectStoreImpl, OpendalObjectStore, build_remote_object_store,
28};
29
30pub async fn migrate_legacy_object(
31    url: String,
32    source_dir: String,
33    target_dir: String,
34    concurrency: u32,
35) -> anyhow::Result<()> {
36    let source_dir = source_dir.trim_end_matches('/');
37    let target_dir = target_dir.trim_end_matches('/');
38    println!("Normalized source_dir: {source_dir}.");
39    println!("Normalized target_dir: {target_dir}.");
40    if source_dir.is_empty() || target_dir.is_empty() {
41        return Err(anyhow!("the source_dir and target_dir must not be empty"));
42    }
43    if target_dir.starts_with(source_dir) {
44        return Err(anyhow!("the target_dir must not include source_dir"));
45    }
46    let mut config = ObjectStoreConfig::default();
47    config.s3.developer.use_opendal = true;
48    let store = build_remote_object_store(
49        &url,
50        ObjectStoreMetrics::unused().into(),
51        "migrate_legacy_object",
52        config.into(),
53    )
54    .await;
55    let ObjectStoreImpl::Opendal(opendal) = store else {
56        return Err(anyhow!("OpenDAL is required"));
57    };
58    let mut iter = opendal.list(source_dir, None, None).await?;
59    let mut count = 0;
60    println!("Migration is started: from {source_dir} to {target_dir}.");
61    let mut from_to = Vec::with_capacity(concurrency as usize);
62    let timer = Instant::now();
63    while let Some(object) = iter.next().await {
64        let object = object?;
65        if !VALID_OBJECT_ID_SUFFIXES
66            .iter()
67            .any(|suffix| object.key.ends_with(suffix))
68        {
69            let legacy_path = object.key;
70            assert_eq!(
71                &legacy_path[..source_dir.len()],
72                source_dir,
73                "{legacy_path} versus {source_dir}"
74            );
75            if legacy_path.ends_with('/') {
76                tracing::warn!(legacy_path, "skip directory");
77                continue;
78            }
79            let new_path = format!("{}{}", target_dir, &legacy_path[source_dir.len()..]);
80            from_to.push((legacy_path, new_path));
81        } else {
82            let object_id = get_object_id_from_path(&object.key);
83            let legacy_prefix = get_object_prefix(object_id.as_raw().inner(), false);
84            let legacy_path = get_object_data_path(&legacy_prefix, source_dir, object_id);
85            if object.key != legacy_path {
86                return Err(anyhow!(format!(
87                    "the source object store does not appear to be legacy: {} versus {}",
88                    object.key, legacy_path
89                )));
90            }
91            let new_path = get_object_data_path(
92                &get_object_prefix(object_id.as_raw().inner(), true),
93                target_dir,
94                object_id,
95            );
96            from_to.push((legacy_path, new_path));
97        }
98        count += 1;
99        if from_to.len() >= concurrency as usize {
100            copy(std::mem::take(&mut from_to).into_iter(), opendal.inner()).await?;
101        }
102    }
103    if !from_to.is_empty() {
104        copy(from_to.into_iter(), opendal.inner()).await?;
105    }
106    let cost = timer.elapsed();
107    println!(
108        "Migration is finished in {} seconds. {count} objects have been migrated from {source_dir} to {target_dir}.",
109        cost.as_secs()
110    );
111    Ok(())
112}
113
114async fn copy(
115    from_to: impl Iterator<Item = (String, String)>,
116    opendal: &OpendalObjectStore,
117) -> anyhow::Result<()> {
118    let futures = from_to.map(|(from_path, to_path)| async move {
119        println!("From {from_path} to {to_path}");
120        opendal.copy(&from_path, &to_path).await
121    });
122    try_join_all(futures).await?;
123    Ok(())
124}