diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 99f7e53528..c386d4cca4 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -63,12 +63,15 @@ pub(crate) enum LayerCmd { #[clap(long)] new_timeline_id: Option, }, - Compress { + CompressOne { dest_path: Utf8PathBuf, - layer_file_path: Option, - tenant_remote_prefix: Option, - tenant_remote_config: Option, - layers_dir: Option, + layer_file_path: Utf8PathBuf, + }, + CompressMany { + dest_path: Utf8PathBuf, + tenant_remote_prefix: String, + tenant_remote_config: String, + layers_dir: Utf8PathBuf, parallelism: Option, }, } @@ -256,9 +259,25 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { anyhow::bail!("not an image or delta layer: {layer_file_path}"); } - LayerCmd::Compress { + LayerCmd::CompressOne { dest_path, layer_file_path, + } => { + pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); + pageserver::page_cache::init(100); + + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + + let stats = + ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?; + println!( + "Statistics: {stats:#?}\n{}", + serde_json::to_string(&stats).unwrap() + ); + Ok(()) + } + LayerCmd::CompressMany { + dest_path, tenant_remote_prefix, tenant_remote_config, layers_dir, @@ -267,103 +286,83 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { pageserver::virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs); pageserver::page_cache::init(100); - let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + let toml_document = toml_edit::Document::from_str(tenant_remote_config)?; + let toml_item = toml_document + .get("remote_storage") + .expect("need remote_storage"); + let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config"); + let storage = remote_storage::GenericRemoteStorage::from_config(&config)?; + let storage = Arc::new(storage); - if let Some(layer_file_path) = layer_file_path { - let stats = - ImageLayer::compression_statistics(dest_path, layer_file_path, &ctx).await?; - println!( - "Statistics: {stats:#?}\n{}", - serde_json::to_string(&stats).unwrap() - ); - } else if let ( - Some(tenant_remote_prefix), - Some(tenant_remote_config), - Some(layers_dir), - ) = (tenant_remote_prefix, tenant_remote_config, layers_dir) - { - let toml_document = toml_edit::Document::from_str(tenant_remote_config)?; - let toml_item = toml_document - .get("remote_storage") - .expect("need remote_storage"); - let config = RemoteStorageConfig::from_toml(toml_item)?.expect("incomplete config"); - let storage = remote_storage::GenericRemoteStorage::from_config(&config)?; - let storage = Arc::new(storage); + let cancel = CancellationToken::new(); + let path = RemotePath::from_string(tenant_remote_prefix)?; + let max_files = NonZeroU32::new(16000); + let files_list = storage + .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel) + .await?; - let cancel = CancellationToken::new(); - let path = RemotePath::from_string(tenant_remote_prefix)?; - let max_files = NonZeroU32::new(16000); - let files_list = storage - .list(Some(&path), ListingMode::NoDelimiter, max_files, &cancel) - .await?; + let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize)); - let semaphore = Arc::new(Semaphore::new(parallelism.unwrap_or(1) as usize)); - - let mut tasks = JoinSet::new(); - for file_key in files_list.keys.iter() { - let Some(file_name) = file_key.object_name() else { - continue; - }; - let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name) - else { - // Skipping because it's either not a layer or a delta layer - continue; - }; - let json_file_path = layers_dir.join(format!("{file_name}.json")); - if tokio::fs::try_exists(&json_file_path).await? { - // If we have already created a report for the layer, skip it. - continue; - } - let local_layer_path = layers_dir.join(file_name); - async fn stats( - semaphore: Arc, - local_layer_path: Utf8PathBuf, - json_file_path: Utf8PathBuf, - dest_path: Utf8PathBuf, - storage: Arc, - file_key: RemotePath, - ) -> Result, u64)>, anyhow::Error> - { - let _permit = semaphore.acquire().await?; - let cancel = CancellationToken::new(); - let download = storage.download(&file_key, &cancel).await?; - let mut dest_layer_file = - tokio::fs::File::create(&local_layer_path).await?; - let mut body = tokio_util::io::StreamReader::new(download.download_stream); - let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; - let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); - let stats = - ImageLayer::compression_statistics(&dest_path, &local_layer_path, &ctx) - .await?; - - let stats_str = serde_json::to_string(&stats).unwrap(); - tokio::fs::write(json_file_path, stats_str).await?; - - tokio::fs::remove_file(&local_layer_path).await?; - Ok(stats) - } - let semaphore = semaphore.clone(); - let file_key = file_key.to_owned(); - let storage = storage.clone(); - let dest_path = dest_path.to_owned(); - let file_name = file_name.to_owned(); - tasks.spawn(async move { - let stats = stats( - semaphore, - local_layer_path.to_owned(), - json_file_path.to_owned(), - dest_path, - storage, - file_key, - ) - .await; - println!("Statistics for {file_name}: {stats:#?}\n"); - }); + let mut tasks = JoinSet::new(); + for file_key in files_list.keys.iter() { + let Some(file_name) = file_key.object_name() else { + continue; + }; + let Ok(LayerName::Image(_layer_file_name)) = LayerName::from_str(file_name) else { + // Skipping because it's either not a layer or a delta layer + continue; + }; + let json_file_path = layers_dir.join(format!("{file_name}.json")); + if tokio::fs::try_exists(&json_file_path).await? { + // If we have already created a report for the layer, skip it. + continue; } - while let Some(_res) = tasks.join_next().await {} - } else { - anyhow::bail!("No tenant dir or remote config or layers dir specified"); + let local_layer_path = layers_dir.join(file_name); + async fn stats( + semaphore: Arc, + local_layer_path: Utf8PathBuf, + json_file_path: Utf8PathBuf, + dest_path: Utf8PathBuf, + storage: Arc, + file_key: RemotePath, + ) -> Result, u64)>, anyhow::Error> + { + let _permit = semaphore.acquire().await?; + let cancel = CancellationToken::new(); + let download = storage.download(&file_key, &cancel).await?; + let mut dest_layer_file = tokio::fs::File::create(&local_layer_path).await?; + let mut body = tokio_util::io::StreamReader::new(download.download_stream); + let _size = tokio::io::copy_buf(&mut body, &mut dest_layer_file).await?; + let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); + let stats = + ImageLayer::compression_statistics(&dest_path, &local_layer_path, &ctx) + .await?; + + let stats_str = serde_json::to_string(&stats).unwrap(); + tokio::fs::write(json_file_path, stats_str).await?; + + tokio::fs::remove_file(&local_layer_path).await?; + Ok(stats) + } + let semaphore = semaphore.clone(); + let file_key = file_key.to_owned(); + let storage = storage.clone(); + let dest_path = dest_path.to_owned(); + let file_name = file_name.to_owned(); + tasks.spawn(async move { + let stats = stats( + semaphore, + local_layer_path.to_owned(), + json_file_path.to_owned(), + dest_path, + storage, + file_key, + ) + .await; + println!("Statistics for {file_name}: {stats:#?}\n"); + }); } + while let Some(_res) = tasks.join_next().await {} Ok(()) }