Introduce aws-sdk-rust as rusoto S3 replacement (#2802)

- `aws-smithy-http`: Needed because of `SdkBody`; see
https://github.com/awslabs/smithy-rs/issues/1759
- `aws-types`: Needed because of `SharedCredentialsProvider`. The setup
recommended by AWS is something like
`aws_config::from_env().region("us-east-1").load().await`, but that is
problematic for two reasons:

- It turns the creation of the S3 client from sync to async, and I do
not want to change the signature of any method in this class.
- We do not need the four default credential-resolution steps in
https://github.com/awslabs/aws-sdk-rust/blob/main/sdk/aws-config/src/default_provider/credentials.rs#L235
(the first sketch below shows the two-step chain used instead).

- `hyper`: Used to stream the request body, similar to what Rusoto
currently does in
https://github.com/rusoto/rusoto/blob/master/rusoto/signature/src/signature.rs#L59;
see also https://github.com/awslabs/aws-sdk-rust/discussions/361 and
the second sketch below.
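
The resulting construction is synchronous end to end. A minimal sketch
of the approach, mirroring the `S3Bucket::new` change in the diff below
(the `make_client` helper and its `region` parameter are illustrative,
not part of the change):

```rust
use aws_config::{
    environment::credentials::EnvironmentVariableCredentialsProvider,
    imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
};
use aws_sdk_s3::{config::Config, Client, Region};
use aws_types::credentials::SharedCredentialsProvider;

// Build an S3 client without any `.await`, so callers can stay sync.
fn make_client(region: String) -> Client {
    // Try environment variables first, then fall back to IMDS (IAM role)
    // credentials; the profile and web-identity steps of the default
    // chain are deliberately skipped.
    let provider = CredentialsProviderChain::first_try(
        "Environment",
        EnvironmentVariableCredentialsProvider::new(),
    )
    .or_else("IAM", ImdsCredentialsProvider::builder().build());

    let config = Config::builder()
        .region(Region::new(region))
        .credentials_provider(SharedCredentialsProvider::new(provider))
        .build();
    Client::from_conf(config)
}
```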
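
For uploads, the body is streamed through hyper and bridged into the
SDK. A sketch of that plumbing, matching the `put_object` change in the
diff below (the `streaming_body` helper is illustrative):

```rust
use aws_sdk_s3::types::ByteStream;
use aws_smithy_http::body::SdkBody;
use hyper::Body;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;

// Turn any async reader into the SDK's `ByteStream` without buffering
// the whole object in memory.
fn streaming_body(from: impl AsyncRead + Send + 'static) -> ByteStream {
    // `ReaderStream` yields `Result<Bytes, io::Error>` chunks, which
    // `Body::wrap_stream` (hyper's "stream" feature) accepts; `SdkBody`
    // implements `From<hyper::Body>`, bridging hyper into smithy.
    let body = Body::wrap_stream(ReaderStream::new(from));
    ByteStream::new(SdkBody::from(body))
}
```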

Co-authored-by: andres <andres.rodriguez@outlook.es>


@@ -9,8 +9,11 @@ async-trait = "0.1"
 metrics = { version = "0.1", path = "../metrics" }
 utils = { version = "0.1", path = "../utils" }
 once_cell = "1.13.0"
-rusoto_core = "0.48"
-rusoto_s3 = "0.48"
+aws-smithy-http = "0.51.0"
+aws-types = "0.51.0"
+aws-config = { version = "0.51.0", default-features = false }
+aws-sdk-s3 = "0.21.0"
+hyper = { version = "0.14", features = ["stream"] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1"
 tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] }


@@ -5,27 +5,32 @@
 //! their bucket prefixes are both specified and different.
 
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 
 use anyhow::Context;
-use rusoto_core::{
-    credential::{InstanceMetadataProvider, StaticProvider},
-    HttpClient, Region, RusotoError,
-};
+use aws_config::{
+    environment::credentials::EnvironmentVariableCredentialsProvider,
+    imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
+};
-use rusoto_s3::{
-    DeleteObjectRequest, GetObjectError, GetObjectRequest, ListObjectsV2Request, PutObjectRequest,
-    S3Client, StreamingBody, S3,
-};
+use aws_sdk_s3::{
+    config::Config,
+    error::{GetObjectError, GetObjectErrorKind},
+    types::{ByteStream, SdkError},
+    Client, Endpoint, Region,
+};
+use aws_smithy_http::body::SdkBody;
+use aws_types::credentials::SharedCredentialsProvider;
+use hyper::Body;
 use tokio::{io, sync::Semaphore};
 use tokio_util::io::ReaderStream;
 use tracing::debug;
 
+use super::StorageMetadata;
 use crate::{
     strip_path_prefix, Download, DownloadError, RemoteObjectId, RemoteStorage, S3Config,
     REMOTE_STORAGE_PREFIX_SEPARATOR,
 };
-use super::StorageMetadata;
 
 pub(super) mod metrics {
     use metrics::{register_int_counter_vec, IntCounterVec};
     use once_cell::sync::Lazy;
@@ -116,7 +121,7 @@ fn download_destination(
 /// AWS S3 storage.
 pub struct S3Bucket {
     workdir: PathBuf,
-    client: S3Client,
+    client: Client,
     bucket_name: String,
     prefix_in_bucket: Option<String>,
     // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
@@ -125,6 +130,13 @@ pub struct S3Bucket {
     concurrency_limiter: Semaphore,
 }
 
+#[derive(Default)]
+struct GetObjectRequest {
+    bucket: String,
+    key: String,
+    range: Option<String>,
+}
+
 impl S3Bucket {
     /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
     pub fn new(aws_config: &S3Config, workdir: PathBuf) -> anyhow::Result<Self> {
@@ -132,43 +144,25 @@ impl S3Bucket {
"Creating s3 remote storage for S3 bucket {}",
aws_config.bucket_name
);
let region = match aws_config.endpoint.clone() {
Some(custom_endpoint) => Region::Custom {
name: aws_config.bucket_region.clone(),
endpoint: custom_endpoint,
},
None => aws_config
.bucket_region
.parse::<Region>()
.context("Failed to parse the s3 region from config")?,
};
let request_dispatcher = HttpClient::new().context("Failed to create S3 http client")?;
let provider = CredentialsProviderChain::first_try(
"Environment",
EnvironmentVariableCredentialsProvider::new(),
)
.or_else("IAM", ImdsCredentialsProvider::builder().build());
let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
// session token is used when authorizing through sso
// which is typically the case when testing locally on developer machine
let session_token = std::env::var("AWS_SESSION_TOKEN").ok();
let mut config_builder = Config::builder()
.region(Region::new(aws_config.bucket_region.clone()))
.credentials_provider(SharedCredentialsProvider::new(provider));
let client = if access_key_id.is_none() && secret_access_key.is_none() {
debug!("Using IAM-based AWS access");
S3Client::new_with(request_dispatcher, InstanceMetadataProvider::new(), region)
} else {
debug!(
"Using credentials-based AWS access. Session token is set: {}",
session_token.is_some()
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
let endpoint = Endpoint::immutable(
custom_endpoint
.parse()
.expect("Failed to parse S3 custom endpoint"),
);
S3Client::new_with(
request_dispatcher,
StaticProvider::new(
access_key_id.unwrap_or_default(),
secret_access_key.unwrap_or_default(),
session_token,
None,
),
region,
)
};
config_builder.set_endpoint_resolver(Some(Arc::new(endpoint)));
}
let client = Client::from_conf(config_builder.build());
let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
let mut prefix = prefix;
@@ -202,20 +196,33 @@ impl S3Bucket {
         metrics::inc_get_object();
 
-        match self.client.get_object(request).await {
-            Ok(object_output) => match object_output.body {
-                None => {
-                    metrics::inc_get_object_fail();
-                    Err(DownloadError::Other(anyhow::anyhow!(
-                        "Got no body for the S3 object given"
-                    )))
-                }
-                Some(body) => Ok(Download {
-                    metadata: object_output.metadata.map(StorageMetadata),
-                    download_stream: Box::pin(io::BufReader::new(body.into_async_read())),
-                }),
-            },
-            Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Err(DownloadError::NotFound),
+        let get_object = self
+            .client
+            .get_object()
+            .bucket(request.bucket)
+            .key(request.key)
+            .set_range(request.range)
+            .send()
+            .await;
+
+        match get_object {
+            Ok(object_output) => {
+                let metadata = object_output.metadata().cloned().map(StorageMetadata);
+                Ok(Download {
+                    metadata,
+                    download_stream: Box::pin(io::BufReader::new(
+                        object_output.body.into_async_read(),
+                    )),
+                })
+            }
+            Err(SdkError::ServiceError {
+                err:
+                    GetObjectError {
+                        kind: GetObjectErrorKind::NoSuchKey(..),
+                        ..
+                    },
+                ..
+            }) => Err(DownloadError::NotFound),
             Err(e) => {
                 metrics::inc_get_object_fail();
                 Err(DownloadError::Other(anyhow::anyhow!(
@@ -261,12 +268,11 @@ impl RemoteStorage for S3Bucket {
             let fetch_response = self
                 .client
-                .list_objects_v2(ListObjectsV2Request {
-                    bucket: self.bucket_name.clone(),
-                    prefix: self.prefix_in_bucket.clone(),
-                    continuation_token,
-                    ..ListObjectsV2Request::default()
-                })
+                .list_objects_v2()
+                .bucket(self.bucket_name.clone())
+                .set_prefix(self.prefix_in_bucket.clone())
+                .set_continuation_token(continuation_token)
+                .send()
                 .await
                 .map_err(|e| {
                     metrics::inc_list_objects_fail();
@@ -322,13 +328,12 @@ impl RemoteStorage for S3Bucket {
             let fetch_response = self
                 .client
-                .list_objects_v2(ListObjectsV2Request {
-                    bucket: self.bucket_name.clone(),
-                    prefix: list_prefix.clone(),
-                    continuation_token,
-                    delimiter: Some(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()),
-                    ..ListObjectsV2Request::default()
-                })
+                .list_objects_v2()
+                .bucket(self.bucket_name.clone())
+                .set_prefix(list_prefix.clone())
+                .set_continuation_token(continuation_token)
+                .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
+                .send()
                 .await
                 .map_err(|e| {
                     metrics::inc_list_objects_fail();
@@ -366,17 +371,18 @@ impl RemoteStorage for S3Bucket {
.context("Concurrency limiter semaphore got closed during S3 upload")?;
metrics::inc_put_object();
let body = Body::wrap_stream(ReaderStream::new(from));
let bytes_stream = ByteStream::new(SdkBody::from(body));
self.client
.put_object(PutObjectRequest {
body: Some(StreamingBody::new_with_size(
ReaderStream::new(from),
from_size_bytes,
)),
bucket: self.bucket_name.clone(),
key: to.0.to_owned(),
metadata: metadata.map(|m| m.0),
..PutObjectRequest::default()
})
.put_object()
.bucket(self.bucket_name.clone())
.key(to.0.to_owned())
.set_metadata(metadata.map(|m| m.0))
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream)
.send()
.await
.map_err(|e| {
metrics::inc_put_object_fail();
@@ -412,7 +418,6 @@ impl RemoteStorage for S3Bucket {
             bucket: self.bucket_name.clone(),
             key: from.0.to_owned(),
             range,
-            ..GetObjectRequest::default()
         })
         .await
     }
@@ -427,11 +432,10 @@ impl RemoteStorage for S3Bucket {
         metrics::inc_delete_object();
         self.client
-            .delete_object(DeleteObjectRequest {
-                bucket: self.bucket_name.clone(),
-                key: remote_object_id.0.to_owned(),
-                ..DeleteObjectRequest::default()
-            })
+            .delete_object()
+            .bucket(self.bucket_name.clone())
+            .key(remote_object_id.0.to_owned())
+            .send()
             .await
             .map_err(|e| {
                 metrics::inc_delete_object_fail();
@@ -600,7 +604,7 @@ mod tests {
     fn dummy_storage(workdir: PathBuf) -> S3Bucket {
         S3Bucket {
             workdir,
-            client: S3Client::new("us-east-1".parse().unwrap()),
+            client: Client::new(&aws_config::SdkConfig::builder().build()),
             bucket_name: "dummy-bucket".to_string(),
             prefix_in_bucket: Some("dummy_prefix/".to_string()),
             concurrency_limiter: Semaphore::new(1),