Use rusoto lib for S3 relish_storage impl

Kirill Bulatov
2021-10-29 23:21:40 +03:00
committed by Kirill Bulatov
parent 8e2a6661e9
commit db63fa64ae
5 changed files with 135 additions and 3532 deletions

Cargo.lock (generated): 3394 lines changed; diff suppressed because it is too large.


@@ -18,6 +18,7 @@ log = "0.4.14"
clap = "3.0"
daemonize = "0.4.1"
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
tokio-util = { version = "0.7", features = ["io"] }
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="2949d98df52587d562986aad155dd4e889e408b7" }
@@ -34,7 +35,6 @@ serde_with = "1.12.0"
toml_edit = { version = "0.13", features = ["easy"] }
scopeguard = "1.1.0"
async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
tracing-futures = "0.2"
@@ -45,7 +45,9 @@ once_cell = "1.8.0"
crossbeam-utils = "0.8.5"
fail = "0.5.0"
rust-s3 = { version = "0.28", default-features = false, features = ["no-verify-ssl", "tokio-rustls-tls"] }
rusoto_core = "0.47"
rusoto_s3 = "0.47"
async-trait = "0.1"
async-compression = {version = "0.3", features = ["zstd", "tokio"]}
postgres_ffi = { path = "../postgres_ffi" }


@@ -5,7 +5,7 @@
//! There are a few components the storage machinery consists of:
//! * [`RemoteStorage`] trait, a CRUD-like generic abstraction used to adapt external storages, with a few implementations:
//! * [`local_fs`] allows using the local file system as an external storage
//! * [`rust_s3`] uses AWS S3 bucket as an external storage
//! * [`s3_bucket`] uses AWS S3 bucket as an external storage
//!
//! * synchronization logic in the [`storage_sync`] module that keeps the pageserver state (both the runtime state and the workdir files) and the storage state in sync.
//! Synchronization internals are split into submodules
@@ -82,7 +82,7 @@
//! The sync queue processing also happens in batches, so the sync tasks can wait in the queue for some time.
mod local_fs;
mod rust_s3;
mod s3_bucket;
mod storage_sync;
use std::{
@@ -98,7 +98,7 @@ use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
pub use self::storage_sync::index::{RemoteIndex, TimelineIndexEntry};
pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download};
use self::{local_fs::LocalFs, rust_s3::S3};
use self::{local_fs::LocalFs, s3_bucket::S3Bucket};
use crate::layered_repository::ephemeral_file::is_ephemeral_file;
use crate::{
config::{PageServerConf, RemoteStorageKind},
@@ -151,7 +151,7 @@ pub fn start_local_timeline_sync(
storage_sync::spawn_storage_sync_thread(
config,
local_timeline_files,
S3::new(s3_config, &config.workdir)?,
S3Bucket::new(s3_config, &config.workdir)?,
storage_config.max_concurrent_sync,
storage_config.max_sync_errors,
)
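
For orientation, here is a minimal sketch of the `RemoteStorage` abstraction that `LocalFs` and `S3Bucket` implement, reconstructed from the method signatures visible further down in this diff; the actual trait in the repository also carries at least a `download_range` method and may differ in bounds and helper methods.

```rust
// Sketch only: reconstructed from the signatures in this diff, not the exact
// trait definition in the repository.
use std::path::Path;

use tokio::io;

#[async_trait::async_trait]
pub trait RemoteStorage: Send + Sync {
    /// Storage-specific address of an object, e.g. an S3 key or a local path.
    type StoragePath;

    /// Maps a file under the pageserver workdir to its remote address.
    fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath>;

    /// Lists all objects the storage currently holds.
    async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>>;

    /// Streams `from` into the remote object at `to`.
    async fn upload(
        &self,
        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
        to: &Self::StoragePath,
    ) -> anyhow::Result<()>;

    /// Streams the remote object at `from` into the local writer `to`.
    async fn download(
        &self,
        from: &Self::StoragePath,
        to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
    ) -> anyhow::Result<()>;

    /// Removes the remote object.
    async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>;
}
```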


@@ -46,18 +46,6 @@ This could be avoided by a background thread/future storing the serialized index
No file checksum assertion is done currently, but it should be: AWS S3 returns file checksums (ETags) during the `list` operation; a possible shape is sketched below.
* sad rust-s3 api
rust-s3 is not very pleasant to use:
1. it returns `anyhow::Result`, so it's hard to distinguish "missing file" cases from "no connection" ones, for instance
2. at least one function in its API that we need (`get_object_stream`) has the `async` keyword yet blocks (!), see details [here](https://github.com/zenithdb/zenith/pull/752#discussion_r728373091)
3. it's a prerelease library with unclear maintenance status
4. noisy on debug level
But it's already used in the project, so for now it's reused to avoid bloating the dependency tree.
Based on a previous evaluation, even `rusoto-s3` could be a better choice than this library, but that needs further benchmarking.
* gc is ignored
So far, we don't adjust the remote storage based on the GC thread loop results; only the checkpointer loop affects the remote storage.
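
As noted above, S3 reports an ETag per object in `ListObjectsV2` responses, which could back that checksum assertion. A hedged sketch follows; the `verify_etag` helper and its expected-checksum source are hypothetical and not part of this commit, and multipart-uploaded objects have ETags that are not a plain MD5, so this only illustrates the idea.

```rust
use rusoto_s3::{ListObjectsV2Request, S3Client, S3};

// Hypothetical helper: compares the ETag reported by S3 against an expected
// checksum computed locally. Only meaningful for single-part uploads, where
// the ETag is the MD5 of the object body.
async fn verify_etag(
    client: &S3Client,
    bucket: &str,
    key: &str,
    expected_etag: &str,
) -> anyhow::Result<()> {
    let response = client
        .list_objects_v2(ListObjectsV2Request {
            bucket: bucket.to_owned(),
            prefix: Some(key.to_owned()),
            ..ListObjectsV2Request::default()
        })
        .await?;

    // Find the listed object matching the key and pull out its ETag.
    let remote_etag = response
        .contents
        .unwrap_or_default()
        .into_iter()
        .find(|object| object.key.as_deref() == Some(key))
        .and_then(|object| object.e_tag)
        .ok_or_else(|| anyhow::anyhow!("object {} not found in bucket {}", key, bucket))?;

    anyhow::ensure!(
        remote_etag.trim_matches('"') == expected_etag,
        "checksum mismatch for {}: remote {}, expected {}",
        key,
        remote_etag,
        expected_etag
    );
    Ok(())
}
```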


@@ -1,4 +1,4 @@
//! AWS S3 storage wrapper around the `rust_s3` library.
//! AWS S3 storage wrapper around the `rusoto` library.
//!
//! Respects `prefix_in_bucket` property from [`S3Config`],
//! allowing multiple pageservers to independently work with the same S3 bucket, if
@@ -7,9 +7,17 @@
use std::path::{Path, PathBuf};
use anyhow::Context;
use s3::{bucket::Bucket, creds::Credentials, region::Region};
use tokio::io::{self, AsyncWriteExt};
use tracing::debug;
use rusoto_core::{
credential::{InstanceMetadataProvider, StaticProvider},
HttpClient, Region,
};
use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
StreamingBody, S3,
};
use tokio::io;
use tokio_util::io::ReaderStream;
use tracing::{debug, trace};
use crate::{
config::S3Config,
@@ -50,38 +58,50 @@ impl S3ObjectKey {
}
/// AWS S3 storage.
pub struct S3 {
pub struct S3Bucket {
pageserver_workdir: &'static Path,
bucket: Bucket,
client: S3Client,
bucket_name: String,
prefix_in_bucket: Option<String>,
}
impl S3 {
/// Creates the storage, erroring if an incorrect AWS S3 configuration is provided.
impl S3Bucket {
/// Creates the S3 storage, erroring if an incorrect AWS S3 configuration is provided.
pub fn new(aws_config: &S3Config, pageserver_workdir: &'static Path) -> anyhow::Result<Self> {
// TODO kb check this
// Keeping a single client may cause issues due to timeouts.
// https://github.com/rusoto/rusoto/issues/1686
debug!(
"Creating s3 remote storage around bucket {}",
"Creating s3 remote storage for S3 bucket {}",
aws_config.bucket_name
);
let region = match aws_config.endpoint.clone() {
Some(endpoint) => Region::Custom {
endpoint,
region: aws_config.bucket_region.clone(),
Some(custom_endpoint) => Region::Custom {
name: aws_config.bucket_region.clone(),
endpoint: custom_endpoint,
},
None => aws_config
.bucket_region
.parse::<Region>()
.context("Failed to parse the s3 region from config")?,
};
let credentials = Credentials::new(
aws_config.access_key_id.as_deref(),
aws_config.secret_access_key.as_deref(),
None,
None,
None,
)
.context("Failed to create the s3 credentials")?;
let request_dispatcher = HttpClient::new().context("Failed to create S3 http client")?;
let client = if aws_config.access_key_id.is_none() && aws_config.secret_access_key.is_none()
{
trace!("Using IAM-based AWS access");
S3Client::new_with(request_dispatcher, InstanceMetadataProvider::new(), region)
} else {
trace!("Using credentials-based AWS access");
S3Client::new_with(
request_dispatcher,
StaticProvider::new_minimal(
aws_config.access_key_id.clone().unwrap_or_default(),
aws_config.secret_access_key.clone().unwrap_or_default(),
),
region,
)
};
let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
let mut prefix = prefix;
@@ -97,20 +117,16 @@ impl S3 {
});
Ok(Self {
bucket: Bucket::new_with_path_style(
aws_config.bucket_name.as_str(),
region,
credentials,
)
.context("Failed to create the s3 bucket")?,
client,
pageserver_workdir,
bucket_name: aws_config.bucket_name.clone(),
prefix_in_bucket,
})
}
}
#[async_trait::async_trait]
impl RemoteStorage for S3 {
impl RemoteStorage for S3Bucket {
type StoragePath = S3ObjectKey;
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath> {
@@ -129,48 +145,50 @@ impl RemoteStorage for S3 {
}
async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
let list_response = self
.bucket
.list(self.prefix_in_bucket.clone().unwrap_or_default(), None)
.await
.context("Failed to list s3 objects")?;
let mut document_keys = Vec::new();
Ok(list_response
.into_iter()
.flat_map(|response| response.contents)
.map(|s3_object| S3ObjectKey(s3_object.key))
.collect())
let mut continuation_token = None;
loop {
let fetch_response = self
.client
.list_objects_v2(ListObjectsV2Request {
bucket: self.bucket_name.clone(),
prefix: self.prefix_in_bucket.clone(),
continuation_token,
..ListObjectsV2Request::default()
})
.await?;
document_keys.extend(
fetch_response
.contents
.unwrap_or_default()
.into_iter()
.filter_map(|o| Some(S3ObjectKey(o.key?))),
);
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(document_keys)
}
async fn upload(
&self,
mut from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
to: &Self::StoragePath,
) -> anyhow::Result<()> {
let mut upload_contents = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(&mut from, &mut upload_contents)
.await
.context("Failed to read the upload contents")?;
upload_contents
.flush()
.await
.context("Failed to read the upload contents")?;
let upload_contents = upload_contents.into_inner().into_inner();
let (_, code) = self
.bucket
.put_object(to.key(), &upload_contents)
.await
.with_context(|| format!("Failed to create s3 object with key {}", to.key()))?;
if code != 200 {
Err(anyhow::format_err!(
"Received non-200 exit code during creating object with key '{}', code: {}",
to.key(),
code
))
} else {
Ok(())
}
self.client
.put_object(PutObjectRequest {
body: Some(StreamingBody::new(ReaderStream::new(from))),
bucket: self.bucket_name.clone(),
key: to.key().to_owned(),
..PutObjectRequest::default()
})
.await?;
Ok(())
}
async fn download(
@@ -178,25 +196,21 @@ impl RemoteStorage for S3 {
from: &Self::StoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
let (data, code) = self
.bucket
.get_object(from.key())
.await
.with_context(|| format!("Failed to download s3 object with key {}", from.key()))?;
if code != 200 {
Err(anyhow::format_err!(
"Received non-200 exit code during downloading object, code: {}",
code
))
} else {
// we don't have to write the vector into the destination this way, `to.write_all` would be enough,
// but we want to prepare for the migration to `rusoto`, which has a streaming HTTP body here,
// with which it makes more sense to use `io::copy`.
io::copy(&mut data.as_slice(), to)
.await
.context("Failed to write downloaded data into the destination buffer")?;
Ok(())
let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}
Ok(())
}
async fn download_range(
@@ -209,40 +223,37 @@ impl RemoteStorage for S3 {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be inclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
let (data, code) = self
.bucket
.get_object_range(from.key(), start_inclusive, end_inclusive)
.await
.with_context(|| format!("Failed to download s3 object with key {}", from.key()))?;
if code != 206 {
Err(anyhow::format_err!(
"Received non-206 exit code during downloading object range, code: {}",
code
))
} else {
// see `download` function above for the comment on why `Vec<u8>` buffer is copied this way
io::copy(&mut data.as_slice(), to)
.await
.context("Failed to write downloaded range into the destination buffer")?;
Ok(())
let range = Some(match end_inclusive {
Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
None => format!("bytes={}-", start_inclusive),
});
let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}
Ok(())
}
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
let (_, code) = self
.bucket
.delete_object(path.key())
.await
.with_context(|| format!("Failed to delete s3 object with key {}", path.key()))?;
if code != 204 {
Err(anyhow::format_err!(
"Received non-204 exit code during deleting object with key '{}', code: {}",
path.key(),
code
))
} else {
Ok(())
}
self.client
.delete_object(DeleteObjectRequest {
bucket: self.bucket_name.clone(),
key: path.key().to_owned(),
..DeleteObjectRequest::default()
})
.await?;
Ok(())
}
}
@@ -314,7 +325,7 @@ mod tests {
#[test]
fn storage_path_negatives() -> anyhow::Result<()> {
#[track_caller]
fn storage_path_error(storage: &S3, mismatching_path: &Path) -> String {
fn storage_path_error(storage: &S3Bucket, mismatching_path: &Path) -> String {
match storage.storage_path(mismatching_path) {
Ok(wrong_key) => panic!(
"Expected path '{}' to error, but got S3 key: {:?}",
@@ -412,15 +423,11 @@ mod tests {
Ok(())
}
fn dummy_storage(pageserver_workdir: &'static Path) -> S3 {
S3 {
fn dummy_storage(pageserver_workdir: &'static Path) -> S3Bucket {
S3Bucket {
pageserver_workdir,
bucket: Bucket::new(
"dummy-bucket",
"us-east-1".parse().unwrap(),
Credentials::anonymous().unwrap(),
)
.unwrap(),
client: S3Client::new("us-east-1".parse().unwrap()),
bucket_name: "dummy-bucket".to_string(),
prefix_in_bucket: Some("dummy_prefix/".to_string()),
}
}
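
Finally, a hypothetical end-to-end use of the new backend, assuming an already constructed `S3Bucket` and the `RemoteStorage` trait in scope; the file paths below are made up for illustration, and the real call sites are `start_local_timeline_sync` and the `storage_sync` loop rather than code like this.

```rust
use std::path::Path;

use tokio::fs;

// Hypothetical round trip through the rusoto-backed storage; paths are made up.
async fn roundtrip(storage: &S3Bucket, workdir: &Path) -> anyhow::Result<()> {
    let local_file = workdir.join("tenants/some-tenant/timelines/some-timeline/layer_file");

    // Derive the S3 key from the workdir-relative path and stream the file up.
    let key = storage.storage_path(&local_file)?;
    storage
        .upload(fs::File::open(&local_file).await?, &key)
        .await?;

    // Stream the object back into a local file via the same key.
    let mut destination = fs::File::create(local_file.with_extension("downloaded")).await?;
    storage.download(&key, &mut destination).await?;
    Ok(())
}
```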