From 811506aaa2b4f35de3415b6ba98c90200a0b1741 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Mon, 17 Feb 2025 22:07:31 +0200
Subject: [PATCH] fast_import: Use rust s3 client for uploading (#10777)

This replaces the use of the awscli utility. The awscli binary is
massive: it added about 200 MB to the docker image size, while the s3
client was already a dependency, so using it is essentially free as far
as binary size is concerned.

I implemented a simple upload function that tries to keep 10 uploads
going in parallel. I believe that's the default behavior of the
"aws s3 sync" command too.
---
 Cargo.lock                                    |   2 +
 compute/compute-node.Dockerfile               |  26 ----
 compute_tools/Cargo.toml                      |   2 +
 compute_tools/src/bin/fast_import.rs          |  30 +++--
 .../src/bin/fast_import/aws_s3_sync.rs        | 116 +++++++++++++++---
 5 files changed, 122 insertions(+), 54 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4f75fa5733..12c12bc771 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1303,6 +1303,7 @@ dependencies = [
  "aws-config",
  "aws-sdk-kms",
  "aws-sdk-s3",
+ "aws-smithy-types",
  "axum",
  "base64 0.13.1",
  "bytes",
@@ -1351,6 +1352,7 @@ dependencies = [
  "utils",
  "uuid",
  "vm_monitor",
+ "walkdir",
  "workspace_hack",
  "zstd",
 ]
diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile
index 1236372d27..082dea6f1b 100644
--- a/compute/compute-node.Dockerfile
+++ b/compute/compute-node.Dockerfile
@@ -1695,29 +1695,6 @@ RUN if [ "$TARGETARCH" = "amd64" ]; then\
     && echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
     && echo "${sql_exporter_sha256} sql_exporter" | sha256sum -c -
 
-#########################################################################################
-#
-# Layer "awscli"
-#
-#########################################################################################
-FROM build-deps AS awscli
-ARG TARGETARCH
-RUN set -ex; \
-    if [ "${TARGETARCH}" = "amd64" ]; then \
-        TARGETARCH_ALT="x86_64"; \
-        CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
-    elif [ "${TARGETARCH}" = "arm64" ]; then \
-        TARGETARCH_ALT="aarch64"; \
-        CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
-    else \
-        echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
-    fi; \
-    curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
-    echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
-    unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
-    /tmp/awscliv2/aws/install; \
-    rm -rf /tmp/awscliv2.zip /tmp/awscliv2
-
 #########################################################################################
 #
 # Clean up postgres folder before inclusion
@@ -1887,9 +1864,6 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
     mkdir /usr/local/download_extensions && \
     chown -R postgres:postgres /usr/local/download_extensions
 
-# aws cli is used by fast_import
-COPY --from=awscli /usr/local/aws-cli /usr/local/aws-cli
-
 # pgbouncer and its config
 COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
 COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index b8828fa49f..81dcf99560 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -14,6 +14,7 @@ base64.workspace = true
 aws-config.workspace = true
 aws-sdk-s3.workspace = true
 aws-sdk-kms.workspace = true
+aws-smithy-types.workspace = true
 anyhow.workspace = true
 axum = { workspace = true, features = [] }
 camino.workspace = true
@@ -54,6 +55,7 @@ thiserror.workspace = true
 url.workspace = true
 uuid.workspace = true
 prometheus.workspace = true
+walkdir.workspace = true
 postgres_initdb.workspace = true
 compute_api.workspace = true
 
diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs
index 4c8d031532..614a93f48b 100644
--- a/compute_tools/src/bin/fast_import.rs
+++ b/compute_tools/src/bin/fast_import.rs
@@ -421,6 +421,7 @@ async fn run_dump_restore(
 
 #[allow(clippy::too_many_arguments)]
 async fn cmd_pgdata(
+    s3_client: Option<aws_sdk_s3::Client>,
     kms_client: Option<aws_sdk_kms::Client>,
     maybe_s3_prefix: Option<S3Uri>,
     maybe_spec: Option<Spec>,
@@ -488,9 +489,13 @@ async fn cmd_pgdata(
     // Only sync if s3_prefix was specified
     if let Some(s3_prefix) = maybe_s3_prefix {
         info!("upload pgdata");
-        aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
-            .await
-            .context("sync dump directory to destination")?;
+        aws_s3_sync::upload_dir_recursive(
+            s3_client.as_ref().unwrap(),
+            Utf8Path::new(&pgdata_dir),
+            &s3_prefix.append("/pgdata/"),
+        )
+        .await
+        .context("sync dump directory to destination")?;
 
         info!("write status");
         {
@@ -499,9 +504,13 @@ async fn cmd_pgdata(
             let status_file = status_dir.join("pgdata");
             std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
                 .context("write status file")?;
-            aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
-                .await
-                .context("sync status directory to destination")?;
+            aws_s3_sync::upload_dir_recursive(
+                s3_client.as_ref().unwrap(),
+                &status_dir,
+                &s3_prefix.append("/status/"),
+            )
+            .await
+            .context("sync status directory to destination")?;
         }
     }
 
@@ -573,18 +582,20 @@ pub(crate) async fn main() -> anyhow::Result<()> {
     let args = Args::parse();
 
     // Initialize AWS clients only if s3_prefix is specified
-    let (aws_config, kms_client) = if args.s3_prefix.is_some() {
+    let (s3_client, kms_client) = if args.s3_prefix.is_some() {
         let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
+        let s3_client = aws_sdk_s3::Client::new(&config);
         let kms = aws_sdk_kms::Client::new(&config);
-        (Some(config), Some(kms))
+        (Some(s3_client), Some(kms))
     } else {
         (None, None)
     };
 
     let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
         let spec_key = s3_prefix.append("/spec.json");
-        let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap());
         let object = s3_client
+            .as_ref()
+            .unwrap()
             .get_object()
             .bucket(&spec_key.bucket)
             .key(spec_key.key)
@@ -624,6 +635,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
             memory_mb,
         } => {
             cmd_pgdata(
+                s3_client,
                 kms_client,
                 args.s3_prefix,
                 spec,
diff --git a/compute_tools/src/bin/fast_import/aws_s3_sync.rs b/compute_tools/src/bin/fast_import/aws_s3_sync.rs
index 5fa58c8f87..1be10b36d6 100644
--- a/compute_tools/src/bin/fast_import/aws_s3_sync.rs
+++ b/compute_tools/src/bin/fast_import/aws_s3_sync.rs
@@ -1,24 +1,102 @@
-use anyhow::Context;
-use camino::Utf8Path;
+use camino::{Utf8Path, Utf8PathBuf};
+use tokio::task::JoinSet;
+use walkdir::WalkDir;
 
 use super::s3_uri::S3Uri;
 
-pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
-    let mut builder = tokio::process::Command::new("aws");
-    builder
-        .arg("s3")
-        .arg("sync")
-        .arg(local.as_str())
-        .arg(remote.to_string());
-    let st = builder
-        .spawn()
-        .context("spawn aws s3 sync")?
-        .wait()
-        .await
-        .context("wait for aws s3 sync")?;
-    if st.success() {
-        Ok(())
-    } else {
-        Err(anyhow::anyhow!("aws s3 sync failed"))
+use tracing::{info, warn};
+
+const MAX_PARALLEL_UPLOADS: usize = 10;
+
+/// Upload all files from 'local' to 'remote'
+pub(crate) async fn upload_dir_recursive(
+    s3_client: &aws_sdk_s3::Client,
+    local: &Utf8Path,
+    remote: &S3Uri,
+) -> anyhow::Result<()> {
+    // Recursively scan directory
+    let mut dirwalker = WalkDir::new(local)
+        .into_iter()
+        .map(|entry| {
+            let entry = entry?;
+            let file_type = entry.file_type();
+            let path = <&Utf8Path>::try_from(entry.path())?.to_path_buf();
+            Ok((file_type, path))
+        })
+        .filter_map(|e: anyhow::Result<(std::fs::FileType, Utf8PathBuf)>| {
+            match e {
+                Ok((file_type, path)) if file_type.is_file() => Some(Ok(path)),
+                Ok((file_type, _path)) if file_type.is_dir() => {
+                    // The WalkDir iterator will recurse into directories, but we don't want
+                    // to do anything with directories as such. There's no concept of uploading
+                    // an empty directory to S3.
+                    None
+                }
+                Ok((file_type, path)) if file_type.is_symlink() => {
+                    // huh, didn't expect a symlink. Can't upload that to S3. Warn and skip.
+                    warn!("cannot upload symlink ({})", path);
+                    None
+                }
+                Ok((_file_type, path)) => {
+                    // should not happen
+                    warn!("directory entry has unexpected type ({})", path);
+                    None
+                }
+                Err(e) => Some(Err(e)),
+            }
+        });
+
+    // Spawn upload tasks for each file, keeping MAX_PARALLEL_UPLOADS active in
+    // parallel.
+    let mut joinset = JoinSet::new();
+    loop {
+        // Could we upload more?
+        while joinset.len() < MAX_PARALLEL_UPLOADS {
+            if let Some(full_local_path) = dirwalker.next() {
+                let full_local_path = full_local_path?;
+                let relative_local_path = full_local_path
+                    .strip_prefix(local)
+                    .expect("all paths start from the walkdir root");
+                let remote_path = remote.append(relative_local_path.as_str());
+                info!(
+                    "starting upload of {} to {}",
+                    &full_local_path, &remote_path
+                );
+                let upload_task = upload_file(s3_client.clone(), full_local_path, remote_path);
+                joinset.spawn(upload_task);
+            } else {
+                info!("draining upload tasks");
+                break;
+            }
+        }
+
+        // Wait for an upload to complete
+        if let Some(res) = joinset.join_next().await {
+            let _ = res?;
+        } else {
+            // all done!
+            break;
+        }
     }
+    Ok(())
+}
+
+pub(crate) async fn upload_file(
+    s3_client: aws_sdk_s3::Client,
+    local_path: Utf8PathBuf,
+    remote: S3Uri,
+) -> anyhow::Result<()> {
+    use aws_smithy_types::byte_stream::ByteStream;
+    let stream = ByteStream::from_path(&local_path).await?;
+
+    let _result = s3_client
+        .put_object()
+        .bucket(remote.bucket)
+        .key(&remote.key)
+        .body(stream)
+        .send()
+        .await?;
+    info!("upload of {} to {} finished", &local_path, &remote.key);
+
+    Ok(())
 }
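
For reference, the scheduling pattern the commit message describes (keep up to a fixed number of
uploads in flight on a tokio JoinSet, topping the set up whenever a task finishes) can be sketched
standalone as follows. This is only an illustrative sketch, not code from the patch: the
`process_one` job, the `MAX_PARALLEL` constant, and the `tokio` (macros, rt, time features) and
`anyhow` dependencies are assumptions standing in for the real S3 upload.

    use std::time::Duration;

    use tokio::task::JoinSet;

    const MAX_PARALLEL: usize = 10;

    // Placeholder for the per-item work; in the patch this is the S3 PutObject call.
    async fn process_one(item: u32) -> anyhow::Result<u32> {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok(item)
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let mut items = 0..100u32;
        let mut joinset = JoinSet::new();
        loop {
            // Top up the set until MAX_PARALLEL tasks are in flight or the input runs out.
            while joinset.len() < MAX_PARALLEL {
                match items.next() {
                    Some(item) => {
                        joinset.spawn(process_one(item));
                    }
                    None => break,
                }
            }
            // Wait for one task to finish before spawning more; the outer `?` surfaces
            // panics/cancellation (JoinError), the inner `?` surfaces the task's own error.
            match joinset.join_next().await {
                Some(res) => {
                    res??;
                }
                None => break, // the set is empty and the input is exhausted: all done
            }
        }
        Ok(())
    }

The patch's upload_dir_recursive follows the same shape, with the directory walker as the input
iterator and upload_file as the spawned task.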