Use DefaultCredentialsChain AWS authentication in remote_storage (#8440)

PR #8299 has switched the storage scrubber to use
`DefaultCredentialsChain`. Now we do this for `remote_storage`, as it
allows us to use `remote_storage` from inside kubernetes. Most of the
diff is due to `GenericRemoteStorage::from_config` becoming `async fn`.
This commit is contained in:
Arpad Müller
2024-07-19 21:19:30 +02:00
committed by Christian Schwarz
parent 21b3a191bf
commit 31bfeaf934
23 changed files with 219 additions and 156 deletions

View File

@@ -385,7 +385,7 @@ fn start_pageserver(
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
let remote_storage = BACKGROUND_RUNTIME.block_on(create_remote_storage_client(conf))?;
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
@@ -701,7 +701,7 @@ fn start_pageserver(
}
}
fn create_remote_storage_client(
async fn create_remote_storage_client(
conf: &'static PageServerConf,
) -> anyhow::Result<GenericRemoteStorage> {
let config = if let Some(config) = &conf.remote_storage_config {
@@ -711,7 +711,7 @@ fn create_remote_storage_client(
};
// Create the client
let mut remote_storage = GenericRemoteStorage::from_config(config)?;
let mut remote_storage = GenericRemoteStorage::from_config(config).await?;
// If `test_remote_failures` is non-zero, wrap the client with a
// wrapper that simulates failures.

View File

@@ -96,7 +96,7 @@ pub async fn collect_metrics(
.expect("Failed to create http client with timeout");
let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
match GenericRemoteStorage::from_config(bucket_config) {
match GenericRemoteStorage::from_config(bucket_config).await {
Ok(client) => Some(client),
Err(e) => {
// Non-fatal error: if we were given an invalid config, we will proceed

View File

@@ -828,9 +828,9 @@ mod test {
}
}
fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
let harness = TenantHarness::create(test_name)?;
let harness = TenantHarness::create(test_name).await?;
// We do not load() the harness: we only need its config and remote_storage
@@ -844,7 +844,9 @@ mod test {
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let storage = GenericRemoteStorage::from_config(&storage_config)
.await
.unwrap();
let mock_control_plane = MockControlPlane::new();
@@ -922,7 +924,9 @@ mod test {
#[tokio::test]
async fn deletion_queue_smoke() -> anyhow::Result<()> {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let ctx = setup("deletion_queue_smoke")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
@@ -992,7 +996,9 @@ mod test {
#[tokio::test]
async fn deletion_queue_validation() -> anyhow::Result<()> {
let ctx = setup("deletion_queue_validation").expect("Failed test setup");
let ctx = setup("deletion_queue_validation")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
@@ -1051,7 +1057,9 @@ mod test {
#[tokio::test]
async fn deletion_queue_recovery() -> anyhow::Result<()> {
// Basic test that the deletion queue processes the deletions we pass into it
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let mut ctx = setup("deletion_queue_recovery")
.await
.expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;

View File

@@ -2031,7 +2031,7 @@ mod tests {
#[tokio::test]
async fn aux_files_round_trip() -> anyhow::Result<()> {
let name = "aux_files_round_trip";
let harness = TenantHarness::create(name)?;
let harness = TenantHarness::create(name).await?;
pub const TIMELINE_ID: TimelineId =
TimelineId::from_array(hex!("11223344556677881122334455667788"));

View File

@@ -3797,7 +3797,7 @@ pub(crate) mod harness {
}
impl TenantHarness {
pub fn create_custom(
pub async fn create_custom(
test_name: &'static str,
tenant_conf: TenantConf,
tenant_id: TenantId,
@@ -3833,7 +3833,7 @@ pub(crate) mod harness {
},
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
let remote_storage = GenericRemoteStorage::from_config(&config).await.unwrap();
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
Ok(Self {
@@ -3848,7 +3848,7 @@ pub(crate) mod harness {
})
}
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
pub async fn create(test_name: &'static str) -> anyhow::Result<Self> {
// Disable automatic GC and compaction to make the unit tests more deterministic.
// The tests perform them manually if needed.
let tenant_conf = TenantConf {
@@ -3865,6 +3865,7 @@ pub(crate) mod harness {
shard,
Generation::new(0xdeadbeef),
)
.await
}
pub fn span(&self) -> tracing::Span {
@@ -4001,7 +4002,7 @@ mod tests {
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4048,7 +4049,8 @@ mod tests {
#[tokio::test]
async fn no_duplicate_timelines() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")
.await?
.load()
.await;
let _ = tenant
@@ -4080,7 +4082,7 @@ mod tests {
async fn test_branch() -> anyhow::Result<()> {
use std::str::from_utf8;
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_branch").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4202,7 +4204,8 @@ mod tests {
#[tokio::test]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
.await?
.load()
.await;
let tline = tenant
@@ -4249,7 +4252,8 @@ mod tests {
#[tokio::test]
async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")
.await?
.load()
.await;
@@ -4304,7 +4308,8 @@ mod tests {
#[tokio::test]
async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")
.await?
.load()
.await;
let tline = tenant
@@ -4361,7 +4366,8 @@ mod tests {
#[tokio::test]
async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")
.await?
.load()
.await;
let tline = tenant
@@ -4391,10 +4397,10 @@ mod tests {
}
#[tokio::test]
async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
.load()
.await;
let (tenant, ctx) = TenantHarness::create("test_parent_keeps_data_forever_after_branching")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4432,7 +4438,7 @@ mod tests {
#[tokio::test]
async fn timeline_load() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME)?;
let harness = TenantHarness::create(TEST_NAME).await?;
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -4459,7 +4465,7 @@ mod tests {
#[tokio::test]
async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME)?;
let harness = TenantHarness::create(TEST_NAME).await?;
// create two timelines
{
let (tenant, ctx) = harness.load().await;
@@ -4507,7 +4513,10 @@ mod tests {
#[tokio::test]
async fn delta_layer_dumping() -> anyhow::Result<()> {
use storage_layer::AsLayerDesc;
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4534,7 +4543,7 @@ mod tests {
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_images").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -4705,7 +4714,7 @@ mod tests {
//
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_bulk_insert")?;
let harness = TenantHarness::create("test_bulk_insert").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
@@ -4736,7 +4745,7 @@ mod tests {
// so the search can stop at the first delta layer and doesn't traverse any deeper.
#[tokio::test]
async fn test_get_vectored() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored")?;
let harness = TenantHarness::create("test_get_vectored").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
@@ -4814,7 +4823,7 @@ mod tests {
#[tokio::test]
async fn test_get_vectored_aux_files() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_aux_files")?;
let harness = TenantHarness::create("test_get_vectored_aux_files").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -4900,7 +4909,8 @@ mod tests {
TenantId::generate(),
ShardIdentity::unsharded(),
Generation::new(0xdeadbeef),
)?;
)
.await?;
let (tenant, ctx) = harness.load().await;
let mut current_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
@@ -5043,7 +5053,7 @@ mod tests {
// ```
#[tokio::test]
async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis")?;
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis").await?;
let (tenant, ctx) = harness.load().await;
let start_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
@@ -5192,7 +5202,7 @@ mod tests {
name: &'static str,
compaction_algorithm: CompactionAlgorithm,
) -> anyhow::Result<()> {
let mut harness = TenantHarness::create(name)?;
let mut harness = TenantHarness::create(name).await?;
harness.tenant_conf.compaction_algorithm = CompactionAlgorithmSettings {
kind: compaction_algorithm,
};
@@ -5276,7 +5286,8 @@ mod tests {
#[tokio::test]
async fn test_traverse_branches() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")
.await?
.load()
.await;
let mut tline = tenant
@@ -5366,7 +5377,8 @@ mod tests {
#[tokio::test]
async fn test_traverse_ancestors() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")
.await?
.load()
.await;
let mut tline = tenant
@@ -5432,7 +5444,8 @@ mod tests {
#[tokio::test]
async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")
.await?
.load()
.await;
@@ -5501,7 +5514,7 @@ mod tests {
#[tokio::test]
async fn test_create_guard_crash() -> anyhow::Result<()> {
let name = "test_create_guard_crash";
let harness = TenantHarness::create(name)?;
let harness = TenantHarness::create(name).await?;
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -5554,7 +5567,7 @@ mod tests {
name: &'static str,
compaction_algorithm: CompactionAlgorithm,
) -> anyhow::Result<()> {
let mut harness = TenantHarness::create(name)?;
let mut harness = TenantHarness::create(name).await?;
harness.tenant_conf.compaction_algorithm = CompactionAlgorithmSettings {
kind: compaction_algorithm,
};
@@ -5578,7 +5591,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_scan() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_scan")?;
let harness = TenantHarness::create("test_metadata_scan").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -5697,7 +5710,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_compaction_trigger() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_compaction_trigger")?;
let harness = TenantHarness::create("test_metadata_compaction_trigger").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -5756,7 +5769,9 @@ mod tests {
#[tokio::test]
async fn test_branch_copies_dirty_aux_file_flag() {
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag").unwrap();
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag")
.await
.unwrap();
// the default aux file policy to switch is v1 if not set by the admins
assert_eq!(
@@ -5858,7 +5873,9 @@ mod tests {
#[tokio::test]
async fn aux_file_policy_switch() {
let mut harness = TenantHarness::create("aux_file_policy_switch").unwrap();
let mut harness = TenantHarness::create("aux_file_policy_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
@@ -6032,7 +6049,9 @@ mod tests {
#[tokio::test]
async fn aux_file_policy_force_switch() {
let mut harness = TenantHarness::create("aux_file_policy_force_switch").unwrap();
let mut harness = TenantHarness::create("aux_file_policy_force_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
let (tenant, ctx) = harness.load().await;
@@ -6093,7 +6112,9 @@ mod tests {
#[tokio::test]
async fn aux_file_policy_auto_detect() {
let mut harness = TenantHarness::create("aux_file_policy_auto_detect").unwrap();
let mut harness = TenantHarness::create("aux_file_policy_auto_detect")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
@@ -6156,7 +6177,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation")?;
let harness = TenantHarness::create("test_metadata_image_creation").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -6255,7 +6276,7 @@ mod tests {
#[tokio::test]
async fn test_vectored_missing_data_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?;
let harness = TenantHarness::create("test_vectored_missing_data_key_reads").await?;
let (tenant, ctx) = harness.load().await;
let base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
@@ -6327,7 +6348,7 @@ mod tests {
#[tokio::test]
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?;
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads").await?;
let (tenant, ctx) = harness.load().await;
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
@@ -6419,7 +6440,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_tombstone_reads")?;
let harness = TenantHarness::create("test_metadata_tombstone_reads").await?;
let (tenant, ctx) = harness.load().await;
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
@@ -6499,7 +6520,9 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_image_creation() {
let harness = TenantHarness::create("test_metadata_tombstone_image_creation").unwrap();
let harness = TenantHarness::create("test_metadata_tombstone_image_creation")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
@@ -6571,8 +6594,9 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_empty_image_creation() {
let harness =
TenantHarness::create("test_metadata_tombstone_empty_image_creation").unwrap();
let harness = TenantHarness::create("test_metadata_tombstone_empty_image_creation")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
@@ -6635,7 +6659,7 @@ mod tests {
#[tokio::test]
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?;
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -6843,7 +6867,7 @@ mod tests {
#[tokio::test]
async fn test_neon_test_record() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_neon_test_record")?;
let harness = TenantHarness::create("test_neon_test_record").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -6924,7 +6948,7 @@ mod tests {
#[tokio::test]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_lsn_lease").await?.load().await;
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_lsn = Lsn(0x100);
@@ -7013,7 +7037,7 @@ mod tests {
#[tokio::test]
async fn test_simple_bottom_most_compaction_deltas() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas")?;
let harness = TenantHarness::create("test_simple_bottom_most_compaction_deltas").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {

View File

@@ -2698,7 +2698,9 @@ mod tests {
// Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
// wait for it to complete before proceeding.
let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
let h = TenantHarness::create("shutdown_awaits_in_progress_tenant")
.await
.unwrap();
let (t, _ctx) = h.load().await;
// harness loads it to active, which is forced and nothing is running on the tenant

View File

@@ -2128,7 +2128,7 @@ mod tests {
impl TestSetup {
async fn new(test_name: &str) -> anyhow::Result<Self> {
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
let harness = TenantHarness::create(test_name)?;
let harness = TenantHarness::create(test_name).await?;
let (tenant, ctx) = harness.load().await;
let timeline = tenant

View File

@@ -1934,7 +1934,7 @@ pub(crate) mod test {
#[tokio::test]
async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
let (tenant, ctx) = harness.load().await;
let timeline_id = TimelineId::generate();
@@ -2034,7 +2034,9 @@ pub(crate) mod test {
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke").unwrap();
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
let ctx = &ctx;
let timeline = tenant
@@ -2312,7 +2314,7 @@ pub(crate) mod test {
#[tokio::test]
async fn delta_layer_iterator() {
let harness = TenantHarness::create("delta_layer_iterator").unwrap();
let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant

View File

@@ -1111,6 +1111,7 @@ mod test {
ShardIdentity::unsharded(),
get_next_gen(),
)
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let timeline = tenant
@@ -1177,6 +1178,7 @@ mod test {
// But here, all we care about is that the gen number is unique.
get_next_gen(),
)
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let timeline = tenant
@@ -1308,7 +1310,7 @@ mod test {
#[tokio::test]
async fn image_layer_iterator() {
let harness = TenantHarness::create("image_layer_iterator").unwrap();
let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant

View File

@@ -22,7 +22,7 @@ const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_s
async fn smoke_test() {
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("smoke_test").unwrap();
let h = TenantHarness::create("smoke_test").await.unwrap();
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let (tenant, _) = h.load().await;
@@ -176,7 +176,9 @@ async fn evict_and_wait_on_wanted_deleted() {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("evict_and_wait_on_wanted_deleted").unwrap();
let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
.await
.unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let (tenant, ctx) = h.load().await;
@@ -258,7 +260,9 @@ fn read_wins_pending_eviction() {
rt.block_on(async move {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("read_wins_pending_eviction").unwrap();
let h = TenantHarness::create("read_wins_pending_eviction")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
@@ -390,7 +394,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
rt.block_on(async move {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create(name).unwrap();
let h = TenantHarness::create(name).await.unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
@@ -559,8 +563,9 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
#[tokio::test(start_paused = true)]
async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let handle = tokio::runtime::Handle::current();
let h =
TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction").unwrap();
let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
let timeline = tenant
@@ -636,7 +641,9 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
#[tokio::test(start_paused = true)]
async fn evict_and_wait_does_not_wait_for_download() {
// let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download").unwrap();
let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
@@ -733,7 +740,9 @@ async fn eviction_cancellation_on_drop() {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap();
let h = TenantHarness::create("eviction_cancellation_on_drop")
.await
.unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
let (tenant, ctx) = h.load().await;

View File

@@ -293,7 +293,9 @@ mod tests {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("merge_iterator_merge_in_between").unwrap();
let harness = TenantHarness::create("merge_iterator_merge_in_between")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -356,7 +358,9 @@ mod tests {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("merge_iterator_delta_merge").unwrap();
let harness = TenantHarness::create("merge_iterator_delta_merge")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -430,7 +434,9 @@ mod tests {
use crate::repository::Value;
use bytes::Bytes;
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge").unwrap();
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant

View File

@@ -6046,8 +6046,9 @@ mod tests {
#[tokio::test]
async fn two_layer_eviction_attempts_at_the_same_time() {
let harness =
TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let timeline = tenant

View File

@@ -1118,7 +1118,7 @@ mod tests {
#[tokio::test]
async fn no_connection_no_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("no_connection_no_candidate")?;
let harness = TenantHarness::create("no_connection_no_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1151,7 +1151,7 @@ mod tests {
#[tokio::test]
async fn connection_no_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("connection_no_candidate")?;
let harness = TenantHarness::create("connection_no_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1216,7 +1216,7 @@ mod tests {
#[tokio::test]
async fn no_connection_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("no_connection_candidate")?;
let harness = TenantHarness::create("no_connection_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1279,7 +1279,7 @@ mod tests {
#[tokio::test]
async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
let harness = TenantHarness::create("candidate_with_many_connection_failures").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1319,7 +1319,7 @@ mod tests {
#[tokio::test]
async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1385,7 +1385,8 @@ mod tests {
#[tokio::test]
async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
let harness =
TenantHarness::create("timeout_connection_threshold_current_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1448,7 +1449,7 @@ mod tests {
#[tokio::test]
async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let new_lsn = Lsn(100_100).align();
@@ -1550,7 +1551,7 @@ mod tests {
// and pageserver should prefer to connect to it.
let test_az = Some("test_az".to_owned());
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
let harness = TenantHarness::create("switch_to_same_availability_zone").await?;
let mut state = dummy_state(&harness).await;
state.conf.availability_zone.clone_from(&test_az);
let current_lsn = Lsn(100_000).align();

View File

@@ -1754,7 +1754,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -1975,7 +1975,10 @@ mod tests {
// and then created it again within the same layer.
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_drop_extend")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -2046,7 +2049,10 @@ mod tests {
// and then extended it again within the same layer.
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -2188,7 +2194,7 @@ mod tests {
/// split into multiple 1 GB segments in Postgres.
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -2296,7 +2302,7 @@ mod tests {
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
let harness = TenantHarness::create("test_ingest_real_wal").unwrap();
let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
let (tenant, ctx) = harness.load().await;
let remote_initdb_path =