mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Add the ability to configure GenericRemoteStorage for the scrubber (#9652)
Earlier work (#7547) made the scrubber internally generic, but it could still only be configured to use S3 storage. This is the final piece that allows most of the scrubber (snapshotting still requires S3) to be configured via GenericRemoteStorage. For example, you can now set an env var like:
```
REMOTE_STORAGE_CONFIG='remote_storage = { bucket_name = "neon-dev-safekeeper-us-east-2d", bucket_region = "us-east-2" }'
```
and the scrubber will read its storage configuration from it (the legacy `REGION`/`BUCKET` variables are still tried first).
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3578,7 +3578,6 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -26,6 +26,16 @@ pub struct RemoteStorageConfig {
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
impl RemoteStorageKind {
|
||||
pub fn bucket_name(&self) -> Option<&str> {
|
||||
match self {
|
||||
RemoteStorageKind::LocalFs { .. } => None,
|
||||
RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
|
||||
RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn default_timeout() -> Duration {
|
||||
RemoteStorageConfig::DEFAULT_TIMEOUT
|
||||
}
|
||||
@@ -178,6 +188,14 @@ impl RemoteStorageConfig {
|
||||
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
|
||||
Ok(utils::toml_edit_ext::deserialize_item(toml)?)
|
||||
}
|
||||
|
||||
pub fn from_toml_str(input: &str) -> anyhow::Result<RemoteStorageConfig> {
|
||||
let toml_document = toml_edit::DocumentMut::from_str(input)?;
|
||||
if let Some(item) = toml_document.get("remote_storage") {
|
||||
return Self::from_toml(item);
|
||||
}
|
||||
Self::from_toml(toml_document.as_item())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -185,8 +203,7 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
|
||||
let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
|
||||
RemoteStorageConfig::from_toml(toml.as_item())
|
||||
RemoteStorageConfig::from_toml_str(input)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -18,7 +18,6 @@ postgres_ffi.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit.workspace = true
|
||||
utils.workspace = true
|
||||
svg_fmt.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -174,11 +174,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
println!("specified prefix '{}' failed validation", cmd.prefix);
|
||||
return Ok(());
|
||||
};
|
||||
let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
|
||||
let toml_item = toml_document
|
||||
.get("remote_storage")
|
||||
.expect("need remote_storage");
|
||||
let config = RemoteStorageConfig::from_toml(toml_item)?;
|
||||
let config = RemoteStorageConfig::from_toml_str(&cmd.config_toml_str)?;
|
||||
let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
|
||||
let cancel = CancellationToken::new();
|
||||
storage
|
||||
|
||||
@@ -106,9 +106,9 @@ pub async fn find_large_objects(
|
||||
}
|
||||
}
|
||||
|
||||
let bucket_name = target.bucket_name();
|
||||
let desc_str = target.desc_str();
|
||||
tracing::info!(
|
||||
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
|
||||
"Scan of {desc_str} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
|
||||
objects.len()
|
||||
);
|
||||
Ok(LargeObjectListing { objects })
|
||||
|
||||
@@ -177,7 +177,7 @@ async fn find_garbage_inner(
|
||||
}));
|
||||
|
||||
// Enumerate Tenants in S3, and check if each one exists in Console
|
||||
tracing::info!("Finding all tenants in bucket {}...", bucket_config.bucket);
|
||||
tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
|
||||
let tenants = stream_tenants(&remote_client, &target);
|
||||
let tenants_checked = tenants.map_ok(|t| {
|
||||
let api_client = cloud_admin_api_client.clone();
|
||||
@@ -524,7 +524,7 @@ pub async fn purge_garbage(
|
||||
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
|
||||
|
||||
assert_eq!(
|
||||
&garbage_list.bucket_config.bucket,
|
||||
garbage_list.bucket_config.bucket_name().unwrap(),
|
||||
remote_client.bucket_name().unwrap()
|
||||
);
|
||||
|
||||
|
||||
@@ -29,8 +29,7 @@ use pageserver::tenant::TENANTS_SEGMENT_NAME;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{
|
||||
DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
|
||||
RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
|
||||
RemoteStorageKind, S3Config,
|
||||
};
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -48,7 +47,7 @@ const CLOUD_ADMIN_API_TOKEN_ENV_VAR: &str = "CLOUD_ADMIN_API_TOKEN";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct S3Target {
|
||||
pub bucket_name: String,
|
||||
pub desc_str: String,
|
||||
/// This `prefix_in_bucket` is only equal to the PS/SK config of the same
|
||||
/// name for the RootTarget: other instances of S3Target will have prefix_in_bucket
|
||||
/// with extra parts.
|
||||
@@ -172,7 +171,7 @@ impl RootTarget {
|
||||
};
|
||||
|
||||
S3Target {
|
||||
bucket_name: root.bucket_name.clone(),
|
||||
desc_str: root.desc_str.clone(),
|
||||
prefix_in_bucket: format!(
|
||||
"{}/{TENANTS_SEGMENT_NAME}/{tenant_id}",
|
||||
root.prefix_in_bucket
|
||||
@@ -209,10 +208,10 @@ impl RootTarget {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bucket_name(&self) -> &str {
|
||||
pub fn desc_str(&self) -> &str {
|
||||
match self {
|
||||
Self::Pageserver(root) => &root.bucket_name,
|
||||
Self::Safekeeper(root) => &root.bucket_name,
|
||||
Self::Pageserver(root) => &root.desc_str,
|
||||
Self::Safekeeper(root) => &root.desc_str,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,24 +229,61 @@ pub fn remote_timeline_path_id(id: &TenantShardTimelineId) -> RemotePath {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct BucketConfig {
|
||||
pub region: String,
|
||||
pub bucket: String,
|
||||
pub prefix_in_bucket: Option<String>,
|
||||
}
|
||||
pub struct BucketConfig(RemoteStorageConfig);
|
||||
|
||||
impl BucketConfig {
|
||||
pub fn from_env() -> anyhow::Result<Self> {
|
||||
let region = env::var("REGION").context("'REGION' param retrieval")?;
|
||||
let bucket = env::var("BUCKET").context("'BUCKET' param retrieval")?;
|
||||
let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
|
||||
|
||||
Ok(Self {
|
||||
region,
|
||||
bucket,
|
||||
prefix_in_bucket,
|
||||
})
|
||||
if let Ok(legacy) = Self::from_env_legacy() {
|
||||
return Ok(legacy);
|
||||
}
|
||||
let config_toml =
|
||||
env::var("REMOTE_STORAGE_CONFIG").context("'REMOTE_STORAGE_CONFIG' retrieval")?;
|
||||
let remote_config = RemoteStorageConfig::from_toml_str(&config_toml)?;
|
||||
Ok(BucketConfig(remote_config))
|
||||
}
|
||||
|
||||
fn from_env_legacy() -> anyhow::Result<Self> {
|
||||
let bucket_region = env::var("REGION").context("'REGION' param retrieval")?;
|
||||
let bucket_name = env::var("BUCKET").context("'BUCKET' param retrieval")?;
|
||||
let prefix_in_bucket = env::var("BUCKET_PREFIX").ok();
|
||||
let endpoint = env::var("AWS_ENDPOINT_URL").ok();
|
||||
// Create a json object which we then deserialize so that we don't
|
||||
// have to repeat all of the S3Config fields.
|
||||
let s3_config_json = serde_json::json!({
|
||||
"bucket_name": bucket_name,
|
||||
"bucket_region": bucket_region,
|
||||
"prefix_in_bucket": prefix_in_bucket,
|
||||
"endpoint": endpoint,
|
||||
});
|
||||
let config: RemoteStorageConfig = serde_json::from_value(s3_config_json)?;
|
||||
Ok(BucketConfig(config))
|
||||
}
|
||||
pub fn desc_str(&self) -> String {
|
||||
match &self.0.storage {
|
||||
RemoteStorageKind::LocalFs { local_path } => {
|
||||
format!("local path {local_path}")
|
||||
}
|
||||
RemoteStorageKind::AwsS3(config) => format!(
|
||||
"bucket {}, region {}",
|
||||
config.bucket_name, config.bucket_region
|
||||
),
|
||||
RemoteStorageKind::AzureContainer(config) => format!(
|
||||
"bucket {}, storage account {:?}, region {}",
|
||||
config.container_name, config.storage_account, config.container_region
|
||||
),
|
||||
}
|
||||
}
|
||||
pub fn bucket_name(&self) -> Option<&str> {
|
||||
self.0.storage.bucket_name()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct BucketConfigLegacy {
|
||||
pub region: String,
|
||||
pub bucket: String,
|
||||
pub prefix_in_bucket: Option<String>,
|
||||
}
|
||||
|
||||
pub struct ControllerClientConfig {
|
||||
@@ -337,13 +373,9 @@ fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
|
||||
}
|
||||
}
|
||||
|
||||
fn make_root_target(
|
||||
bucket_name: String,
|
||||
prefix_in_bucket: String,
|
||||
node_kind: NodeKind,
|
||||
) -> RootTarget {
|
||||
fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeKind) -> RootTarget {
|
||||
let s3_target = S3Target {
|
||||
bucket_name,
|
||||
desc_str,
|
||||
prefix_in_bucket,
|
||||
delimiter: "/".to_string(),
|
||||
};
|
||||
@@ -354,15 +386,15 @@ fn make_root_target(
|
||||
}
|
||||
|
||||
async fn init_remote_s3(
|
||||
bucket_config: BucketConfig,
|
||||
bucket_config: S3Config,
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
|
||||
let bucket_region = Region::new(bucket_config.region);
|
||||
let bucket_region = Region::new(bucket_config.bucket_region);
|
||||
let s3_client = Arc::new(init_s3_client(bucket_region).await);
|
||||
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
|
||||
|
||||
let s3_root = make_root_target(
|
||||
bucket_config.bucket,
|
||||
bucket_config.bucket_name,
|
||||
bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
|
||||
node_kind,
|
||||
);
|
||||
@@ -371,33 +403,28 @@ async fn init_remote_s3(
|
||||
}
|
||||
|
||||
async fn init_remote(
|
||||
bucket_config: BucketConfig,
|
||||
mut storage_config: BucketConfig,
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
|
||||
let endpoint = env::var("AWS_ENDPOINT_URL").ok();
|
||||
let desc_str = storage_config.desc_str();
|
||||
|
||||
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
|
||||
let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
|
||||
let storage = S3Config {
|
||||
bucket_name: bucket_config.bucket.clone(),
|
||||
bucket_region: bucket_config.region,
|
||||
prefix_in_bucket,
|
||||
endpoint,
|
||||
concurrency_limit: DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
upload_storage_class: None,
|
||||
};
|
||||
let storage_config = RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::AwsS3(storage),
|
||||
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
|
||||
};
|
||||
|
||||
match &mut storage_config.0.storage {
|
||||
RemoteStorageKind::AwsS3(ref mut config) => {
|
||||
config.prefix_in_bucket.get_or_insert(default_prefix);
|
||||
}
|
||||
RemoteStorageKind::AzureContainer(ref mut config) => {
|
||||
config.prefix_in_container.get_or_insert(default_prefix);
|
||||
}
|
||||
RemoteStorageKind::LocalFs { .. } => (),
|
||||
}
|
||||
|
||||
// We already pass the prefix to the remote client above
|
||||
let prefix_in_root_target = String::new();
|
||||
let root_target = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
|
||||
let root_target = make_root_target(desc_str, prefix_in_root_target, node_kind);
|
||||
|
||||
let client = GenericRemoteStorage::from_config(&storage_config).await?;
|
||||
let client = GenericRemoteStorage::from_config(&storage_config.0).await?;
|
||||
Ok((client, root_target))
|
||||
}
|
||||
|
||||
@@ -469,7 +496,7 @@ async fn list_objects_with_retries(
|
||||
}
|
||||
warn!(
|
||||
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
|
||||
s3_target.bucket_name,
|
||||
remote_client.bucket_name().unwrap_or_default(),
|
||||
s3_target.prefix_in_bucket,
|
||||
s3_target.delimiter,
|
||||
DisplayErrorContext(e),
|
||||
|
||||
@@ -140,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
"{}_{}_{}_{}.log",
|
||||
std::env::args().next().unwrap(),
|
||||
command_log_name,
|
||||
bucket_config.bucket,
|
||||
bucket_config.bucket_name().unwrap_or("nobucket"),
|
||||
chrono::Utc::now().format("%Y_%m_%d__%H_%M_%S")
|
||||
));
|
||||
|
||||
@@ -191,13 +191,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
|
||||
// scrubber they were likely expecting to scan something, and if we see no timelines
|
||||
// at all then it's likely due to some configuration issues like a bad prefix
|
||||
bail!(
|
||||
"No timelines found in bucket {} prefix {}",
|
||||
bucket_config.bucket,
|
||||
bucket_config
|
||||
.prefix_in_bucket
|
||||
.unwrap_or("<none>".to_string())
|
||||
);
|
||||
bail!("No timelines found in {}", bucket_config.desc_str());
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
@@ -396,13 +390,7 @@ pub async fn scan_pageserver_metadata_cmd(
|
||||
// Strictly speaking an empty bucket is a valid bucket, but if someone ran the
|
||||
// scrubber they were likely expecting to scan something, and if we see no timelines
|
||||
// at all then it's likely due to some configuration issues like a bad prefix
|
||||
tracing::error!(
|
||||
"No timelines found in bucket {} prefix {}",
|
||||
bucket_config.bucket,
|
||||
bucket_config
|
||||
.prefix_in_bucket
|
||||
.unwrap_or("<none>".to_string())
|
||||
);
|
||||
tracing::error!("No timelines found in {}", bucket_config.desc_str());
|
||||
if exit_code {
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
@@ -84,10 +84,7 @@ pub async fn scan_safekeeper_metadata(
|
||||
bucket_config: BucketConfig,
|
||||
db_or_list: DatabaseOrList,
|
||||
) -> anyhow::Result<MetadataSummary> {
|
||||
info!(
|
||||
"checking bucket {}, region {}",
|
||||
bucket_config.bucket, bucket_config.region
|
||||
);
|
||||
info!("checking {}", bucket_config.desc_str());
|
||||
|
||||
let (remote_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
|
||||
let console_config = ConsoleConfig::from_env()?;
|
||||
|
||||
@@ -16,7 +16,7 @@ use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::{GenericRemoteStorage, S3Config};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TenantId;
|
||||
|
||||
@@ -24,6 +24,7 @@ pub struct SnapshotDownloader {
|
||||
s3_client: Arc<Client>,
|
||||
s3_root: RootTarget,
|
||||
bucket_config: BucketConfig,
|
||||
bucket_config_s3: S3Config,
|
||||
tenant_id: TenantId,
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
@@ -36,12 +37,17 @@ impl SnapshotDownloader {
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<Self> {
|
||||
let bucket_config_s3 = match &bucket_config.0.storage {
|
||||
remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
|
||||
_ => panic!("only S3 configuration is supported for snapshot downloading"),
|
||||
};
|
||||
let (s3_client, s3_root) =
|
||||
init_remote_s3(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
|
||||
Ok(Self {
|
||||
s3_client,
|
||||
s3_root,
|
||||
bucket_config,
|
||||
bucket_config_s3,
|
||||
tenant_id,
|
||||
output_path,
|
||||
concurrency,
|
||||
@@ -87,7 +93,7 @@ impl SnapshotDownloader {
|
||||
let versions = self
|
||||
.s3_client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_config.bucket.clone())
|
||||
.bucket(self.bucket_config_s3.bucket_name.clone())
|
||||
.prefix(&remote_layer_path)
|
||||
.send()
|
||||
.await?;
|
||||
@@ -96,7 +102,7 @@ impl SnapshotDownloader {
|
||||
};
|
||||
download_object_to_file_s3(
|
||||
&self.s3_client,
|
||||
&self.bucket_config.bucket,
|
||||
&self.bucket_config_s3.bucket_name,
|
||||
&remote_layer_path,
|
||||
version.version_id.as_deref(),
|
||||
&local_path,
|
||||
|
||||
Reference in New Issue
Block a user