Use DefaultCredentialsChain AWS authentication in remote_storage (#8440)

PR #8299 has switched the storage scrubber to use
`DefaultCredentialsChain`. Now we do this for `remote_storage`, as it
allows us to use `remote_storage` from inside kubernetes. Most of the
diff is due to `GenericRemoteStorage::from_config` becoming `async fn`.
This commit is contained in:
Arpad Müller
2024-07-19 21:19:30 +02:00
committed by GitHub
parent 3d582b212a
commit 4e547e6274
23 changed files with 219 additions and 156 deletions

View File

@@ -443,7 +443,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
impl GenericRemoteStorage {
pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
let timeout = storage_config.timeout;
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs { local_path: path } => {
@@ -458,7 +458,7 @@ impl GenericRemoteStorage {
std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout)?))
Self::AwsS3(Arc::new(S3Bucket::new(s3_config, timeout).await?))
}
RemoteStorageKind::AzureContainer(azure_config) => {
let storage_account = azure_config

View File

@@ -16,16 +16,10 @@ use std::{
use anyhow::{anyhow, Context as _};
use aws_config::{
environment::credentials::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider,
meta::credentials::CredentialsProviderChain,
profile::ProfileFileCredentialsProvider,
provider_config::ProviderConfig,
default_provider::credentials::DefaultCredentialsChain,
retry::{RetryConfigBuilder, RetryMode},
web_identity_token::WebIdentityTokenCredentialsProvider,
BehaviorVersion,
};
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_s3::{
config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep},
error::SdkError,
@@ -76,40 +70,27 @@ struct GetObjectRequest {
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
pub async fn new(remote_storage_config: &S3Config, timeout: Duration) -> anyhow::Result<Self> {
tracing::debug!(
"Creating s3 remote storage for S3 bucket {}",
remote_storage_config.bucket_name
);
let region = Some(Region::new(remote_storage_config.bucket_region.clone()));
let region = Region::new(remote_storage_config.bucket_region.clone());
let region_opt = Some(region.clone());
let provider_conf = ProviderConfig::without_region().with_region(region.clone());
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
)
// uses "AWS_PROFILE" / `aws sso login --profile <profile>`
.or_else(
"profile-sso",
ProfileFileCredentialsProvider::builder()
.configure(&provider_conf)
.build(),
)
// uses "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
// needed to access remote extensions bucket
.or_else(
"token",
WebIdentityTokenCredentialsProvider::builder()
.configure(&provider_conf)
.build(),
)
// uses imds v2
.or_else("imds", ImdsCredentialsProvider::builder().build())
};
// https://docs.aws.amazon.com/sdkref/latest/guide/standardized-credentials.html
// https://docs.rs/aws-config/latest/aws_config/default_provider/credentials/struct.DefaultCredentialsChain.html
// Incomplete list of auth methods used by this:
// * "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
// * "AWS_PROFILE" / `aws sso login --profile <profile>`
// * "AWS_WEB_IDENTITY_TOKEN_FILE", "AWS_ROLE_ARN", "AWS_ROLE_SESSION_NAME"
// * http (ECS/EKS) container credentials
// * imds v2
let credentials_provider = DefaultCredentialsChain::builder()
.region(region)
.build()
.await;
// AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
@@ -118,9 +99,9 @@ impl S3Bucket {
#[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
BehaviorVersion::v2023_11_09(),
)
.region(region)
.region(region_opt)
.identity_cache(IdentityCache::lazy().build())
.credentials_provider(SharedCredentialsProvider::new(credentials_provider))
.credentials_provider(credentials_provider)
.sleep_impl(SharedAsyncSleep::from(sleep_impl));
let sdk_config: aws_config::SdkConfig = std::thread::scope(|s| {
@@ -1041,8 +1022,8 @@ mod tests {
use crate::{RemotePath, S3Bucket, S3Config};
#[test]
fn relative_path() {
#[tokio::test]
async fn relative_path() {
let all_paths = ["", "some/path", "some/path/"];
let all_paths: Vec<RemotePath> = all_paths
.iter()
@@ -1085,8 +1066,9 @@ mod tests {
max_keys_per_list_response: Some(5),
upload_storage_class: None,
};
let storage =
S3Bucket::new(&config, std::time::Duration::ZERO).expect("remote storage init");
let storage = S3Bucket::new(&config, std::time::Duration::ZERO)
.await
.expect("remote storage init");
for (test_path_idx, test_path) in all_paths.iter().enumerate() {
let result = storage.relative_path_to_s3_object(test_path);
let expected = expected_outputs[prefix_idx][test_path_idx];

View File

@@ -31,6 +31,7 @@ struct EnabledAzure {
impl EnabledAzure {
async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
let client = create_azure_client(max_keys_in_list_response)
.await
.context("Azure client creation")
.expect("Azure client creation failed");
@@ -187,7 +188,7 @@ impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
}
}
fn create_azure_client(
async fn create_azure_client(
max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
use rand::Rng;
@@ -221,6 +222,8 @@ fn create_azure_client(
timeout: Duration::from_secs(120),
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?,
))
}

View File

@@ -197,6 +197,7 @@ struct EnabledS3 {
impl EnabledS3 {
async fn setup(max_keys_in_list_response: Option<i32>) -> Self {
let client = create_s3_client(max_keys_in_list_response)
.await
.context("S3 client creation")
.expect("S3 client creation failed");
@@ -352,7 +353,7 @@ impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs {
}
}
fn create_s3_client(
async fn create_s3_client(
max_keys_per_list_response: Option<i32>,
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
use rand::Rng;
@@ -385,7 +386,9 @@ fn create_s3_client(
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?,
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?,
))
}

View File

@@ -179,7 +179,7 @@ async fn main() -> anyhow::Result<()> {
.get("remote_storage")
.expect("need remote_storage");
let config = RemoteStorageConfig::from_toml(toml_item)?;
let storage = remote_storage::GenericRemoteStorage::from_config(&config);
let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
let cancel = CancellationToken::new();
storage
.unwrap()

View File

@@ -385,7 +385,7 @@ fn start_pageserver(
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
let remote_storage = BACKGROUND_RUNTIME.block_on(create_remote_storage_client(conf))?;
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
@@ -701,7 +701,7 @@ fn start_pageserver(
}
}
fn create_remote_storage_client(
async fn create_remote_storage_client(
conf: &'static PageServerConf,
) -> anyhow::Result<GenericRemoteStorage> {
let config = if let Some(config) = &conf.remote_storage_config {
@@ -711,7 +711,7 @@ fn create_remote_storage_client(
};
// Create the client
let mut remote_storage = GenericRemoteStorage::from_config(config)?;
let mut remote_storage = GenericRemoteStorage::from_config(config).await?;
// If `test_remote_failures` is non-zero, wrap the client with a
// wrapper that simulates failures.

View File

@@ -96,7 +96,7 @@ pub async fn collect_metrics(
.expect("Failed to create http client with timeout");
let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
match GenericRemoteStorage::from_config(bucket_config) {
match GenericRemoteStorage::from_config(bucket_config).await {
Ok(client) => Some(client),
Err(e) => {
// Non-fatal error: if we were given an invalid config, we will proceed

View File

@@ -828,9 +828,9 @@ mod test {
}
}
fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
let harness = TenantHarness::create(test_name)?;
let harness = TenantHarness::create(test_name).await?;
// We do not load() the harness: we only need its config and remote_storage
@@ -844,7 +844,9 @@ mod test {
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let storage = GenericRemoteStorage::from_config(&storage_config)
.await
.unwrap();
let mock_control_plane = MockControlPlane::new();
@@ -922,7 +924,9 @@ mod test {
#[tokio::test]
async fn deletion_queue_smoke() -> anyhow::Result<()> {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let ctx = setup("deletion_queue_smoke")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
@@ -992,7 +996,9 @@ mod test {
#[tokio::test]
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let ctx = setup("deletion_queue_validation")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
@@ -1051,7 +1057,9 @@ mod test {
#[tokio::test]
async fn deletion_queue_recovery() -> anyhow::Result<()> {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let mut ctx = setup("deletion_queue_recovery")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;

View File

@@ -2031,7 +2031,7 @@ mod tests {
#[tokio::test]
async fn aux_files_round_trip() -> anyhow::Result<()> {
let name = "aux_files_round_trip";
let harness = TenantHarness::create(name)?;
let harness = TenantHarness::create(name).await?;
pub const TIMELINE_ID: TimelineId =
TimelineId::from_array(hex!("11223344556677881122334455667788"));

View File

@@ -3797,7 +3797,7 @@ pub(crate) mod harness {
}
impl TenantHarness {
pub fn create_custom(
pub async fn create_custom(
test_name: &'static str,
tenant_conf: TenantConf,
tenant_id: TenantId,
@@ -3833,7 +3833,7 @@ pub(crate) mod harness {
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
let remote_storage = GenericRemoteStorage::from_config(&config).await.unwrap();
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
Ok(Self {
@@ -3848,7 +3848,7 @@ pub(crate) mod harness {
})
}
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
pub async fn create(test_name: &'static str) -> anyhow::Result<Self> {
// Disable automatic GC and compaction to make the unit tests more deterministic.
// The tests perform them manually if needed.
let tenant_conf = TenantConf {
@@ -3865,6 +3865,7 @@ pub(crate) mod harness {
shard,
Generation::new(0xdeadbeef),
)
.await
}
pub fn span(&self) -> tracing::Span {
@@ -4001,7 +4002,7 @@ mod tests {
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4048,7 +4049,8 @@ mod tests {
#[tokio::test]
async fn no_duplicate_timelines() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")
.await?
.load()
.await;
let _ = tenant
@@ -4080,7 +4082,7 @@ mod tests {
async fn test_branch() -> anyhow::Result<()> {
use std::str::from_utf8;
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_branch").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4202,7 +4204,8 @@ mod tests {
#[tokio::test]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
.await?
.load()
.await;
let tline = tenant
@@ -4249,7 +4252,8 @@ mod tests {
#[tokio::test]
async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")
.await?
.load()
.await;
@@ -4304,7 +4308,8 @@ mod tests {
#[tokio::test]
async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")
.await?
.load()
.await;
let tline = tenant
@@ -4361,7 +4366,8 @@ mod tests {
#[tokio::test]
async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")
.await?
.load()
.await;
let tline = tenant
@@ -4391,10 +4397,10 @@ mod tests {
}
#[tokio::test]
async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
.load()
.await;
let (tenant, ctx) = TenantHarness::create("test_parent_keeps_data_forever_after_branching")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4432,7 +4438,7 @@ mod tests {
#[tokio::test]
async fn timeline_load() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME)?;
let harness = TenantHarness::create(TEST_NAME).await?;
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -4459,7 +4465,7 @@ mod tests {
#[tokio::test]
async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME)?;
let harness = TenantHarness::create(TEST_NAME).await?;
// create two timelines
{
let (tenant, ctx) = harness.load().await;
@@ -4507,7 +4513,10 @@ mod tests {
#[tokio::test]
async fn delta_layer_dumping() -> anyhow::Result<()> {
use storage_layer::AsLayerDesc;
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4534,7 +4543,7 @@ mod tests {
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_images").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4705,7 +4714,7 @@ mod tests {
//
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_bulk_insert")?;
let harness = TenantHarness::create("test_bulk_insert").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
@@ -4736,7 +4745,7 @@ mod tests {
// so the search can stop at the first delta layer and doesn't traverse any deeper.
#[tokio::test]
async fn test_get_vectored() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored")?;
let harness = TenantHarness::create("test_get_vectored").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
@@ -4814,7 +4823,7 @@ mod tests {
#[tokio::test]
async fn test_get_vectored_aux_files() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_aux_files")?;
let harness = TenantHarness::create("test_get_vectored_aux_files").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -4900,7 +4909,8 @@ mod tests {
TenantId::generate(),
ShardIdentity::unsharded(),
Generation::new(0xdeadbeef),
)?;
)
.await?;
let (tenant, ctx) = harness.load().await;
let mut current_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
@@ -5043,7 +5053,7 @@ mod tests {
// ```
#[tokio::test]
async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis")?;
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis").await?;
let (tenant, ctx) = harness.load().await;
let start_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
@@ -5192,7 +5202,7 @@ mod tests {
name: &'static str,
compaction_algorithm: CompactionAlgorithm,
) -> anyhow::Result<()> {
let mut harness = TenantHarness::create(name)?;
let mut harness = TenantHarness::create(name).await?;
harness.tenant_conf.compaction_algorithm = CompactionAlgorithmSettings {
kind: compaction_algorithm,
};
@@ -5276,7 +5286,8 @@ mod tests {
#[tokio::test]
async fn test_traverse_branches() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")
.await?
.load()
.await;
let mut tline = tenant
@@ -5366,7 +5377,8 @@ mod tests {
#[tokio::test]
async fn test_traverse_ancestors() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")
.await?
.load()
.await;
let mut tline = tenant
@@ -5432,7 +5444,8 @@ mod tests {
#[tokio::test]
async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")
.await?
.load()
.await;
@@ -5501,7 +5514,7 @@ mod tests {
#[tokio::test]
async fn test_create_guard_crash() -> anyhow::Result<()> {
let name = "test_create_guard_crash";
let harness = TenantHarness::create(name)?;
let harness = TenantHarness::create(name).await?;
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -5554,7 +5567,7 @@ mod tests {
name: &'static str,
compaction_algorithm: CompactionAlgorithm,
) -> anyhow::Result<()> {
let mut harness = TenantHarness::create(name)?;
let mut harness = TenantHarness::create(name).await?;
harness.tenant_conf.compaction_algorithm = CompactionAlgorithmSettings {
kind: compaction_algorithm,
};
@@ -5578,7 +5591,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_scan() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_scan")?;
let harness = TenantHarness::create("test_metadata_scan").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -5697,7 +5710,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_compaction_trigger() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_compaction_trigger")?;
let harness = TenantHarness::create("test_metadata_compaction_trigger").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -5756,7 +5769,9 @@ mod tests {
#[tokio::test]
async fn test_branch_copies_dirty_aux_file_flag() {
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag").unwrap();
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag")
.await
.unwrap();
// the default aux file policy to switch is v1 if not set by the admins
assert_eq!(
@@ -5858,7 +5873,9 @@ mod tests {
#[tokio::test]
async fn aux_file_policy_switch() {
let mut harness = TenantHarness::create("aux_file_policy_switch").unwrap();
let mut harness = TenantHarness::create("aux_file_policy_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
@@ -6032,7 +6049,9 @@ mod tests {
#[tokio::test]
async fn aux_file_policy_force_switch() {
let mut harness = TenantHarness::create("aux_file_policy_force_switch").unwrap();
let mut harness = TenantHarness::create("aux_file_policy_force_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
let (tenant, ctx) = harness.load().await;
@@ -6093,7 +6112,9 @@ mod tests {
#[tokio::test]
async fn aux_file_policy_auto_detect() {
let mut harness = TenantHarness::create("aux_file_policy_auto_detect").unwrap();
let mut harness = TenantHarness::create("aux_file_policy_auto_detect")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
@@ -6156,7 +6177,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation")?;
let harness = TenantHarness::create("test_metadata_image_creation").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -6255,7 +6276,7 @@ mod tests {
#[tokio::test]
async fn test_vectored_missing_data_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?;
let harness = TenantHarness::create("test_vectored_missing_data_key_reads").await?;
let (tenant, ctx) = harness.load().await;
let base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
@@ -6327,7 +6348,7 @@ mod tests {
#[tokio::test]
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?;
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads").await?;
let (tenant, ctx) = harness.load().await;
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
@@ -6419,7 +6440,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_tombstone_reads")?;
let harness = TenantHarness::create("test_metadata_tombstone_reads").await?;
let (tenant, ctx) = harness.load().await;
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
@@ -6499,7 +6520,9 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_image_creation() {
let harness = TenantHarness::create("test_metadata_tombstone_image_creation").unwrap();
let harness = TenantHarness::create("test_metadata_tombstone_image_creation")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
@@ -6571,8 +6594,9 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_empty_image_creation() {
let harness =
TenantHarness::create("test_metadata_tombstone_empty_image_creation").unwrap();
let harness = TenantHarness::create("test_metadata_tombstone_empty_image_creation")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
@@ -6635,7 +6659,7 @@ mod tests {
#[tokio::test]
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?;
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -6843,7 +6867,7 @@ mod tests {
#[tokio::test]
async fn test_neon_test_record() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_neon_test_record")?;
let harness = TenantHarness::create("test_neon_test_record").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -6924,7 +6948,7 @@ mod tests {
#[tokio::test]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_lsn_lease").await?.load().await;
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_lsn = Lsn(0x100);
@@ -7013,7 +7037,7 @@ mod tests {
#[tokio::test]
async fn test_simple_bottom_most_compaction_deltas() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas")?;
let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {

View File

@@ -2698,7 +2698,9 @@ mod tests {
// Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
// wait for it to complete before proceeding.
let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
let h = TenantHarness::create("shutdown_awaits_in_progress_tenant")
.await
.unwrap();
let (t, _ctx) = h.load().await;
// harness loads it to active, which is forced and nothing is running on the tenant

View File

@@ -2128,7 +2128,7 @@ mod tests {
impl TestSetup {
async fn new(test_name: &str) -> anyhow::Result<Self> {
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
let harness = TenantHarness::create(test_name)?;
let harness = TenantHarness::create(test_name).await?;
let (tenant, ctx) = harness.load().await;
let timeline = tenant

View File

@@ -1934,7 +1934,7 @@ pub(crate) mod test {
#[tokio::test]
async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
let (tenant, ctx) = harness.load().await;
let timeline_id = TimelineId::generate();
@@ -2034,7 +2034,9 @@ pub(crate) mod test {
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
let ctx = &ctx;
let timeline = tenant
@@ -2312,7 +2314,7 @@ pub(crate) mod test {
#[tokio::test]
async fn delta_layer_iterator() {
let harness = TenantHarness::create("delta_layer_iterator").unwrap();
let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant

View File

@@ -1111,6 +1111,7 @@ mod test {
ShardIdentity::unsharded(),
get_next_gen(),
)
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let timeline = tenant
@@ -1177,6 +1178,7 @@ mod test {
// But here, all we care about is that the gen number is unique.
get_next_gen(),
)
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let timeline = tenant
@@ -1308,7 +1310,7 @@ mod test {
#[tokio::test]
async fn image_layer_iterator() {
let harness = TenantHarness::create("image_layer_iterator").unwrap();
let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant

View File

@@ -22,7 +22,7 @@ const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_s
async fn smoke_test() {
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("smoke_test").unwrap();
let h = TenantHarness::create("smoke_test").await.unwrap();
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let (tenant, _) = h.load().await;
@@ -176,7 +176,9 @@ async fn evict_and_wait_on_wanted_deleted() {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("evict_and_wait_on_wanted_deleted").unwrap();
let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
.await
.unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let (tenant, ctx) = h.load().await;
@@ -258,7 +260,9 @@ fn read_wins_pending_eviction() {
rt.block_on(async move {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("read_wins_pending_eviction").unwrap();
let h = TenantHarness::create("read_wins_pending_eviction")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
@@ -390,7 +394,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
rt.block_on(async move {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create(name).unwrap();
let h = TenantHarness::create(name).await.unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
@@ -559,8 +563,9 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
#[tokio::test(start_paused = true)]
async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let handle = tokio::runtime::Handle::current();
let h =
TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction").unwrap();
let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
let timeline = tenant
@@ -636,7 +641,9 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
#[tokio::test(start_paused = true)]
async fn evict_and_wait_does_not_wait_for_download() {
// let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download").unwrap();
let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
@@ -733,7 +740,9 @@ async fn eviction_cancellation_on_drop() {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap();
let h = TenantHarness::create("eviction_cancellation_on_drop")
.await
.unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let (tenant, ctx) = h.load().await;

View File

@@ -293,7 +293,9 @@ mod tests {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("merge_iterator_merge_in_between").unwrap();
let harness = TenantHarness::create("merge_iterator_merge_in_between")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -356,7 +358,9 @@ mod tests {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("merge_iterator_delta_merge").unwrap();
let harness = TenantHarness::create("merge_iterator_delta_merge")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -430,7 +434,9 @@ mod tests {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge").unwrap();
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant

View File

@@ -6046,8 +6046,9 @@ mod tests {
#[tokio::test]
async fn two_layer_eviction_attempts_at_the_same_time() {
let harness =
TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let timeline = tenant

View File

@@ -1118,7 +1118,7 @@ mod tests {
#[tokio::test]
async fn no_connection_no_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("no_connection_no_candidate")?;
let harness = TenantHarness::create("no_connection_no_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1151,7 +1151,7 @@ mod tests {
#[tokio::test]
async fn connection_no_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("connection_no_candidate")?;
let harness = TenantHarness::create("connection_no_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1216,7 +1216,7 @@ mod tests {
#[tokio::test]
async fn no_connection_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("no_connection_candidate")?;
let harness = TenantHarness::create("no_connection_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1279,7 +1279,7 @@ mod tests {
#[tokio::test]
async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
let harness = TenantHarness::create("candidate_with_many_connection_failures").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1319,7 +1319,7 @@ mod tests {
#[tokio::test]
async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1385,7 +1385,8 @@ mod tests {
#[tokio::test]
async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
let harness =
TenantHarness::create("timeout_connection_threshold_current_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1448,7 +1449,7 @@ mod tests {
#[tokio::test]
async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let new_lsn = Lsn(100_100).align();
@@ -1550,7 +1551,7 @@ mod tests {
// and pageserver should prefer to connect to it.
let test_az = Some("test_az".to_owned());
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
let harness = TenantHarness::create("switch_to_same_availability_zone").await?;
let mut state = dummy_state(&harness).await;
state.conf.availability_zone.clone_from(&test_az);
let current_lsn = Lsn(100_000).align();

View File

@@ -1754,7 +1754,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -1975,7 +1975,10 @@ mod tests {
// and then created it again within the same layer.
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_drop_extend")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -2046,7 +2049,10 @@ mod tests {
// and then extended it again within the same layer.
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -2188,7 +2194,7 @@ mod tests {
/// split into multiple 1 GB segments in Postgres.
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -2296,7 +2302,7 @@ mod tests {
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
let (tenant, ctx) = harness.load().await;
let remote_initdb_path =

View File

@@ -181,8 +181,9 @@ pub async fn worker(
let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
let rx = rx.map(RequestData::from);
let storage =
GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
let storage = GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?;
let properties = WriterProperties::builder()
.set_data_page_size_limit(config.parquet_upload_page_size)
@@ -217,6 +218,7 @@ pub async fn worker(
let storage_disconnect =
GenericRemoteStorage::from_config(&disconnect_events_storage_config)
.await
.context("remote storage for disconnect events init")?;
let parquet_config_disconnect = parquet_config.clone();
tokio::try_join!(
@@ -545,7 +547,9 @@ mod tests {
},
timeout: std::time::Duration::from_secs(120),
};
let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
let storage = GenericRemoteStorage::from_config(&remote_storage_config)
.await
.unwrap();
worker_inner(storage, rx, config).await.unwrap();

View File

@@ -357,11 +357,15 @@ pub async fn task_backup(
info!("metrics backup has shut down");
}
// Even if the remote storage is not configured, we still want to clear the metrics.
let storage = backup_config
.remote_storage_config
.as_ref()
.map(|config| GenericRemoteStorage::from_config(config).context("remote storage init"))
.transpose()?;
let storage = if let Some(config) = backup_config.remote_storage_config.as_ref() {
Some(
GenericRemoteStorage::from_config(config)
.await
.context("remote storage init")?,
)
} else {
None
};
let mut ticker = tokio::time::interval(backup_config.interval);
let mut prev = Utc::now();
let hostname = hostname::get()?.as_os_str().to_string_lossy().into_owned();

View File

@@ -418,7 +418,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
let timeline_collector = safekeeper::metrics::TimelineCollector::new();
metrics::register_internal(Box::new(timeline_collector))?;
wal_backup::init_remote_storage(&conf);
wal_backup::init_remote_storage(&conf).await;
// Keep handles to main tasks to die if any of them disappears.
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =

View File

@@ -22,7 +22,7 @@ use tokio::fs::File;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::sync::{watch, OnceCell};
use tokio::time::sleep;
use tracing::*;
@@ -33,8 +33,6 @@ use crate::timeline::{PeerInfo, WalResidentTimeline};
use crate::timeline_manager::{Manager, StateSnapshot};
use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
use once_cell::sync::OnceCell;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
@@ -167,7 +165,7 @@ fn determine_offloader(
}
}
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::const_new();
// Storage must be configured and initialized when this is called.
fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
@@ -178,14 +176,22 @@ fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
.unwrap()
}
pub fn init_remote_storage(conf: &SafeKeeperConf) {
pub async fn init_remote_storage(conf: &SafeKeeperConf) {
// TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
// dependencies to all tasks instead.
REMOTE_STORAGE.get_or_init(|| {
conf.remote_storage
.as_ref()
.map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
});
REMOTE_STORAGE
.get_or_init(|| async {
if let Some(conf) = conf.remote_storage.as_ref() {
Some(
GenericRemoteStorage::from_config(conf)
.await
.expect("failed to create remote storage"),
)
} else {
None
}
})
.await;
}
struct WalBackupTask {