risingwave_ctl/cmd_impl/hummock/
migrate_legacy_object.rs1use std::time::Instant;
16
17use anyhow::anyhow;
18use futures::StreamExt;
19use futures::future::try_join_all;
20use risingwave_common::config::ObjectStoreConfig;
21use risingwave_hummock_sdk::{
22 VALID_OBJECT_ID_SUFFIXES, get_object_data_path, get_object_id_from_path,
23};
24use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
25use risingwave_object_store::object::prefix::opendal_engine::get_object_prefix;
26use risingwave_object_store::object::{
27 ObjectStoreImpl, OpendalObjectStore, build_remote_object_store,
28};
29
30pub async fn migrate_legacy_object(
31 url: String,
32 source_dir: String,
33 target_dir: String,
34 concurrency: u32,
35) -> anyhow::Result<()> {
36 let source_dir = source_dir.trim_end_matches('/');
37 let target_dir = target_dir.trim_end_matches('/');
38 println!("Normalized source_dir: {source_dir}.");
39 println!("Normalized target_dir: {target_dir}.");
40 if source_dir.is_empty() || target_dir.is_empty() {
41 return Err(anyhow!("the source_dir and target_dir must not be empty"));
42 }
43 if target_dir.starts_with(source_dir) {
44 return Err(anyhow!("the target_dir must not include source_dir"));
45 }
46 let mut config = ObjectStoreConfig::default();
47 config.s3.developer.use_opendal = true;
48 let store = build_remote_object_store(
49 &url,
50 ObjectStoreMetrics::unused().into(),
51 "migrate_legacy_object",
52 config.into(),
53 )
54 .await;
55 let ObjectStoreImpl::Opendal(opendal) = store else {
56 return Err(anyhow!("OpenDAL is required"));
57 };
58 let mut iter = opendal.list(source_dir, None, None).await?;
59 let mut count = 0;
60 println!("Migration is started: from {source_dir} to {target_dir}.");
61 let mut from_to = Vec::with_capacity(concurrency as usize);
62 let timer = Instant::now();
63 while let Some(object) = iter.next().await {
64 let object = object?;
65 if !VALID_OBJECT_ID_SUFFIXES
66 .iter()
67 .any(|suffix| object.key.ends_with(suffix))
68 {
69 let legacy_path = object.key;
70 assert_eq!(
71 &legacy_path[..source_dir.len()],
72 source_dir,
73 "{legacy_path} versus {source_dir}"
74 );
75 if legacy_path.ends_with('/') {
76 tracing::warn!(legacy_path, "skip directory");
77 continue;
78 }
79 let new_path = format!("{}{}", target_dir, &legacy_path[source_dir.len()..]);
80 from_to.push((legacy_path, new_path));
81 } else {
82 let object_id = get_object_id_from_path(&object.key);
83 let legacy_prefix = get_object_prefix(object_id.as_raw().inner(), false);
84 let legacy_path = get_object_data_path(&legacy_prefix, source_dir, object_id);
85 if object.key != legacy_path {
86 return Err(anyhow!(format!(
87 "the source object store does not appear to be legacy: {} versus {}",
88 object.key, legacy_path
89 )));
90 }
91 let new_path = get_object_data_path(
92 &get_object_prefix(object_id.as_raw().inner(), true),
93 target_dir,
94 object_id,
95 );
96 from_to.push((legacy_path, new_path));
97 }
98 count += 1;
99 if from_to.len() >= concurrency as usize {
100 copy(std::mem::take(&mut from_to).into_iter(), opendal.inner()).await?;
101 }
102 }
103 if !from_to.is_empty() {
104 copy(from_to.into_iter(), opendal.inner()).await?;
105 }
106 let cost = timer.elapsed();
107 println!(
108 "Migration is finished in {} seconds. {count} objects have been migrated from {source_dir} to {target_dir}.",
109 cost.as_secs()
110 );
111 Ok(())
112}
113
114async fn copy(
115 from_to: impl Iterator<Item = (String, String)>,
116 opendal: &OpendalObjectStore,
117) -> anyhow::Result<()> {
118 let futures = from_to.map(|(from_path, to_path)| async move {
119 println!("From {from_path} to {to_path}");
120 opendal.copy(&from_path, &to_path).await
121 });
122 try_join_all(futures).await?;
123 Ok(())
124}