mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
Support prefixes when working with s3 buckets
This commit is contained in:
committed by
Kirill Bulatov
parent
8ab4c8a050
commit
4b3b19f444
@@ -147,6 +147,10 @@ bucket_name = 'some-sample-bucket'
|
||||
# Name of the region where the bucket is located at
|
||||
bucket_region = 'eu-north-1'
|
||||
|
||||
# A "subfolder" in the bucket, to use the same bucket separately by multiple pageservers at once.
|
||||
# Optional, pageserver uses entire bucket if the prefix is not specified.
|
||||
prefix_in_bucket = '/some/prefix/'
|
||||
|
||||
# Access key to connect to the bucket ("login" part of the credentials)
|
||||
access_key_id = 'SOMEKEYAAAAASADSAH*#'
|
||||
|
||||
|
||||
@@ -129,13 +129,13 @@ There are the following implementations present:
|
||||
* local filesystem — to use in tests mainly
|
||||
* AWS S3 - to use in production
|
||||
|
||||
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs.
|
||||
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs, parameters documentation can be found at [settings docs](../docs/settings.md).
|
||||
|
||||
The backup service is disabled by default and can be enabled to interact with a single remote storage.
|
||||
|
||||
CLI examples:
|
||||
* Local FS: `${PAGESERVER_BIN} -c "remote_storage={local_path='/some/local/path/'}"`
|
||||
* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"`
|
||||
* AWS S3 : `${PAGESERVER_BIN} -c "remote_storage={bucket_name='some-sample-bucket',bucket_region='eu-north-1', prefix_in_bucket='/test_prefix/',access_key_id='SOMEKEYAAAAASADSAH*#',secret_access_key='SOMEsEcReTsd292v'}"`
|
||||
|
||||
For Amazon AWS S3, a key id and secret access key could be located in `~/.aws/credentials` if awscli was ever configured to work with the desired bucket, on the AWS Settings page for a certain user. Also note, that the bucket names does not contain any protocols when used on AWS.
|
||||
For local S3 installations, refer to the their documentation for name format and credentials.
|
||||
@@ -154,6 +154,7 @@ or
|
||||
[remote_storage]
|
||||
bucket_name = 'some-sample-bucket'
|
||||
bucket_region = 'eu-north-1'
|
||||
prefix_in_bucket = '/test_prefix/'
|
||||
access_key_id = 'SOMEKEYAAAAASADSAH*#'
|
||||
secret_access_key = 'SOMEsEcReTsd292v'
|
||||
```
|
||||
|
||||
@@ -135,6 +135,8 @@ pub struct S3Config {
|
||||
pub bucket_name: String,
|
||||
/// The region where the bucket is located at.
|
||||
pub bucket_region: String,
|
||||
/// A "subfolder" in the bucket, to use the same bucket separately by multiple pageservers at once.
|
||||
pub prefix_in_bucket: Option<String>,
|
||||
/// "Login" to use when connecting to bucket.
|
||||
/// Can be empty for cases like AWS k8s IAM
|
||||
/// where we can allow certain pods to connect
|
||||
@@ -149,6 +151,7 @@ impl std::fmt::Debug for S3Config {
|
||||
f.debug_struct("S3Config")
|
||||
.field("bucket_name", &self.bucket_name)
|
||||
.field("bucket_region", &self.bucket_region)
|
||||
.field("prefix_in_bucket", &self.prefix_in_bucket)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@@ -332,18 +335,26 @@ impl PageServerConf {
|
||||
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
|
||||
}
|
||||
(None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
|
||||
bucket_name: bucket_name.as_str().unwrap().to_string(),
|
||||
bucket_region: bucket_region.as_str().unwrap().to_string(),
|
||||
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
|
||||
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
|
||||
access_key_id: toml
|
||||
.get("access_key_id")
|
||||
.map(|x| x.as_str().unwrap().to_string()),
|
||||
.map(|access_key_id| parse_toml_string("access_key_id", access_key_id))
|
||||
.transpose()?,
|
||||
secret_access_key: toml
|
||||
.get("secret_access_key")
|
||||
.map(|x| x.as_str().unwrap().to_string()),
|
||||
.map(|secret_access_key| {
|
||||
parse_toml_string("secret_access_key", secret_access_key)
|
||||
})
|
||||
.transpose()?,
|
||||
prefix_in_bucket: toml
|
||||
.get("prefix_in_bucket")
|
||||
.map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
|
||||
.transpose()?,
|
||||
}),
|
||||
(Some(local_path), None, None) => {
|
||||
RemoteStorageKind::LocalFs(PathBuf::from(local_path.as_str().unwrap()))
|
||||
}
|
||||
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
|
||||
parse_toml_string("local_path", local_path)?,
|
||||
)),
|
||||
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
|
||||
};
|
||||
|
||||
@@ -585,6 +596,7 @@ pg_distrib_dir='{}'
|
||||
|
||||
let bucket_name = "some-sample-bucket".to_string();
|
||||
let bucket_region = "eu-north-1".to_string();
|
||||
let prefix_in_bucket = "test_prefix".to_string();
|
||||
let access_key_id = "SOMEKEYAAAAASADSAH*#".to_string();
|
||||
let secret_access_key = "SOMEsEcReTsd292v".to_string();
|
||||
let max_concurrent_sync = NonZeroUsize::new(111).unwrap();
|
||||
@@ -597,13 +609,14 @@ max_concurrent_sync = {}
|
||||
max_sync_errors = {}
|
||||
bucket_name = '{}'
|
||||
bucket_region = '{}'
|
||||
prefix_in_bucket = '{}'
|
||||
access_key_id = '{}'
|
||||
secret_access_key = '{}'"#,
|
||||
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, access_key_id, secret_access_key
|
||||
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key
|
||||
),
|
||||
format!(
|
||||
"remote_storage={{max_concurrent_sync = {}, max_sync_errors = {}, bucket_name='{}', bucket_region='{}', access_key_id='{}', secret_access_key='{}'}}",
|
||||
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, access_key_id, secret_access_key
|
||||
"remote_storage={{max_concurrent_sync={}, max_sync_errors={}, bucket_name='{}', bucket_region='{}', prefix_in_bucket='{}', access_key_id='{}', secret_access_key='{}'}}",
|
||||
max_concurrent_sync, max_sync_errors, bucket_name, bucket_region, prefix_in_bucket, access_key_id, secret_access_key
|
||||
),
|
||||
];
|
||||
|
||||
@@ -637,6 +650,7 @@ pg_distrib_dir='{}'
|
||||
bucket_region: bucket_region.clone(),
|
||||
access_key_id: Some(access_key_id.clone()),
|
||||
secret_access_key: Some(secret_access_key.clone()),
|
||||
prefix_in_bucket: Some(prefix_in_bucket.clone())
|
||||
}),
|
||||
},
|
||||
"Remote storage config should correctly parse the S3 config"
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
//! There are a few components the storage machinery consists of:
|
||||
//! * [`RemoteStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations:
|
||||
//! * [`local_fs`] allows to use local file system as an external storage
|
||||
//! * [`rust_s3`] uses AWS S3 bucket entirely as an external storage
|
||||
//! * [`rust_s3`] uses AWS S3 bucket as an external storage
|
||||
//!
|
||||
//! * synchronization logic at [`storage_sync`] module that keeps pageserver state (both runtime one and the workdir files) and storage state in sync.
|
||||
//! Synchronization internals are split into submodules
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
//! AWS S3 storage wrapper around `rust_s3` library.
|
||||
//! Currently does not allow multiple pageservers to use the same bucket concurrently: objects are
|
||||
//! placed in the root of the bucket.
|
||||
//!
|
||||
//! Respects `prefix_in_bucket` property from [`S3Config`],
|
||||
//! allowing multiple pageservers to independently work with the same S3 bucket, if
|
||||
//! their bucket prefixes are both specified and different.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
@@ -23,8 +25,26 @@ impl S3ObjectKey {
|
||||
&self.0
|
||||
}
|
||||
|
||||
fn download_destination(&self, pageserver_workdir: &Path) -> PathBuf {
|
||||
pageserver_workdir.join(self.0.split(S3_FILE_SEPARATOR).collect::<PathBuf>())
|
||||
fn download_destination(
|
||||
&self,
|
||||
pageserver_workdir: &Path,
|
||||
prefix_to_strip: Option<&str>,
|
||||
) -> PathBuf {
|
||||
let path_without_prefix = match prefix_to_strip {
|
||||
Some(prefix) => self.0.strip_prefix(prefix).unwrap_or_else(|| {
|
||||
panic!(
|
||||
"Could not strip prefix '{}' from S3 object key '{}'",
|
||||
prefix, self.0
|
||||
)
|
||||
}),
|
||||
None => &self.0,
|
||||
};
|
||||
|
||||
pageserver_workdir.join(
|
||||
path_without_prefix
|
||||
.split(S3_FILE_SEPARATOR)
|
||||
.collect::<PathBuf>(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +52,7 @@ impl S3ObjectKey {
|
||||
pub struct S3 {
|
||||
pageserver_workdir: &'static Path,
|
||||
bucket: Bucket,
|
||||
prefix_in_bucket: Option<String>,
|
||||
}
|
||||
|
||||
impl S3 {
|
||||
@@ -49,6 +70,20 @@ impl S3 {
|
||||
None,
|
||||
)
|
||||
.context("Failed to create the s3 credentials")?;
|
||||
|
||||
let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
|
||||
let mut prefix = prefix;
|
||||
while prefix.starts_with(S3_FILE_SEPARATOR) {
|
||||
prefix = &prefix[1..]
|
||||
}
|
||||
|
||||
let mut prefix = prefix.to_string();
|
||||
while prefix.ends_with(S3_FILE_SEPARATOR) {
|
||||
prefix.pop();
|
||||
}
|
||||
prefix
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
bucket: Bucket::new_with_path_style(
|
||||
aws_config.bucket_name.as_str(),
|
||||
@@ -57,6 +92,7 @@ impl S3 {
|
||||
)
|
||||
.context("Failed to create the s3 bucket")?,
|
||||
pageserver_workdir,
|
||||
prefix_in_bucket,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -67,7 +103,7 @@ impl RemoteStorage for S3 {
|
||||
|
||||
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath> {
|
||||
let relative_path = strip_path_prefix(self.pageserver_workdir, local_path)?;
|
||||
let mut key = String::new();
|
||||
let mut key = self.prefix_in_bucket.clone().unwrap_or_default();
|
||||
for segment in relative_path {
|
||||
key.push(S3_FILE_SEPARATOR);
|
||||
key.push_str(&segment.to_string_lossy());
|
||||
@@ -76,13 +112,14 @@ impl RemoteStorage for S3 {
|
||||
}
|
||||
|
||||
fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf> {
|
||||
Ok(storage_path.download_destination(self.pageserver_workdir))
|
||||
Ok(storage_path
|
||||
.download_destination(self.pageserver_workdir, self.prefix_in_bucket.as_deref()))
|
||||
}
|
||||
|
||||
async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
|
||||
let list_response = self
|
||||
.bucket
|
||||
.list(String::new(), None)
|
||||
.list(self.prefix_in_bucket.clone().unwrap_or_default(), None)
|
||||
.await
|
||||
.context("Failed to list s3 objects")?;
|
||||
|
||||
@@ -225,7 +262,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
local_path,
|
||||
key.download_destination(&repo_harness.conf.workdir),
|
||||
key.download_destination(&repo_harness.conf.workdir, None),
|
||||
"Download destination should consist of s3 path joined with the pageserver workdir prefix"
|
||||
);
|
||||
|
||||
@@ -239,14 +276,18 @@ mod tests {
|
||||
let segment_1 = "matching";
|
||||
let segment_2 = "file";
|
||||
let local_path = &repo_harness.conf.workdir.join(segment_1).join(segment_2);
|
||||
|
||||
let storage = dummy_storage(&repo_harness.conf.workdir);
|
||||
|
||||
let expected_key = S3ObjectKey(format!(
|
||||
"{SEPARATOR}{}{SEPARATOR}{}",
|
||||
"{}{SEPARATOR}{}{SEPARATOR}{}",
|
||||
storage.prefix_in_bucket.as_deref().unwrap_or_default(),
|
||||
segment_1,
|
||||
segment_2,
|
||||
SEPARATOR = S3_FILE_SEPARATOR,
|
||||
));
|
||||
|
||||
let actual_key = dummy_storage(&repo_harness.conf.workdir)
|
||||
let actual_key = storage
|
||||
.storage_path(local_path)
|
||||
.expect("Matching path should map to S3 path normally");
|
||||
assert_eq!(
|
||||
@@ -308,18 +349,30 @@ mod tests {
|
||||
let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID);
|
||||
let relative_timeline_path = timeline_dir.strip_prefix(&repo_harness.conf.workdir)?;
|
||||
|
||||
let s3_key = create_s3_key(&relative_timeline_path.join("not a metadata"));
|
||||
let s3_key = create_s3_key(
|
||||
&relative_timeline_path.join("not a metadata"),
|
||||
storage.prefix_in_bucket.as_deref(),
|
||||
);
|
||||
assert_eq!(
|
||||
s3_key.download_destination(&repo_harness.conf.workdir),
|
||||
s3_key.download_destination(
|
||||
&repo_harness.conf.workdir,
|
||||
storage.prefix_in_bucket.as_deref()
|
||||
),
|
||||
storage
|
||||
.local_path(&s3_key)
|
||||
.expect("For a valid input, valid S3 info should be parsed"),
|
||||
"Should be able to parse metadata out of the correctly named remote delta file"
|
||||
);
|
||||
|
||||
let s3_key = create_s3_key(&relative_timeline_path.join(METADATA_FILE_NAME));
|
||||
let s3_key = create_s3_key(
|
||||
&relative_timeline_path.join(METADATA_FILE_NAME),
|
||||
storage.prefix_in_bucket.as_deref(),
|
||||
);
|
||||
assert_eq!(
|
||||
s3_key.download_destination(&repo_harness.conf.workdir),
|
||||
s3_key.download_destination(
|
||||
&repo_harness.conf.workdir,
|
||||
storage.prefix_in_bucket.as_deref()
|
||||
),
|
||||
storage
|
||||
.local_path(&s3_key)
|
||||
.expect("For a valid input, valid S3 info should be parsed"),
|
||||
@@ -356,18 +409,18 @@ mod tests {
|
||||
Credentials::anonymous().unwrap(),
|
||||
)
|
||||
.unwrap(),
|
||||
prefix_in_bucket: Some("dummy_prefix/".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_s3_key(relative_file_path: &Path) -> S3ObjectKey {
|
||||
S3ObjectKey(
|
||||
relative_file_path
|
||||
.iter()
|
||||
.fold(String::new(), |mut path_string, segment| {
|
||||
path_string.push(S3_FILE_SEPARATOR);
|
||||
path_string.push_str(segment.to_str().unwrap());
|
||||
path_string
|
||||
}),
|
||||
)
|
||||
fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> S3ObjectKey {
|
||||
S3ObjectKey(relative_file_path.iter().fold(
|
||||
prefix.unwrap_or_default().to_string(),
|
||||
|mut path_string, segment| {
|
||||
path_string.push(S3_FILE_SEPARATOR);
|
||||
path_string.push_str(segment.to_str().unwrap());
|
||||
path_string
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user