mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 21:42:56 +00:00
test: cleanup remote_timeline_client tests (#5013)
I will have to change these as I change remote_timeline_client api in #4938. So a bit of cleanup, handle my comments which were just resolved during initial review. Cleanup: - use unwrap in tests instead of mixed `?` and `unwrap` - use `Handle` instead of `&'static Reactor` to make the RemoteTimelineClient more natural - use arrays in tests - use plain `#[tokio::test]`
This commit is contained in:
@@ -222,7 +222,6 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
||||
use std::ops::DerefMut;
|
||||
use tokio::runtime::Runtime;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::lsn::Lsn;
|
||||
@@ -311,7 +310,7 @@ pub enum PersistIndexPartWithDeletedFlagError {
|
||||
pub struct RemoteTimelineClient {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
runtime: &'static Runtime,
|
||||
runtime: tokio::runtime::Handle,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -338,7 +337,7 @@ impl RemoteTimelineClient {
|
||||
) -> RemoteTimelineClient {
|
||||
RemoteTimelineClient {
|
||||
conf,
|
||||
runtime: &BACKGROUND_RUNTIME,
|
||||
runtime: BACKGROUND_RUNTIME.handle().to_owned(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
storage_impl: remote_storage,
|
||||
@@ -994,7 +993,7 @@ impl RemoteTimelineClient {
|
||||
let tenant_id = self.tenant_id;
|
||||
let timeline_id = self.timeline_id;
|
||||
task_mgr::spawn(
|
||||
self.runtime.handle(),
|
||||
&self.runtime,
|
||||
TaskKind::RemoteUploadTask,
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
@@ -1347,7 +1346,7 @@ mod tests {
|
||||
context::RequestContext,
|
||||
tenant::{
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
Tenant,
|
||||
Tenant, Timeline,
|
||||
},
|
||||
DEFAULT_PG_VERSION,
|
||||
};
|
||||
@@ -1356,7 +1355,6 @@ mod tests {
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tokio::runtime::EnterGuard;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
|
||||
@@ -1406,35 +1404,25 @@ mod tests {
|
||||
}
|
||||
|
||||
struct TestSetup {
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
entered_runtime: EnterGuard<'static>,
|
||||
harness: TenantHarness,
|
||||
tenant: Arc<Tenant>,
|
||||
timeline: Arc<Timeline>,
|
||||
tenant_ctx: RequestContext,
|
||||
remote_fs_dir: PathBuf,
|
||||
client: Arc<RemoteTimelineClient>,
|
||||
}
|
||||
|
||||
impl TestSetup {
|
||||
fn new(test_name: &str) -> anyhow::Result<Self> {
|
||||
async fn new(test_name: &str) -> anyhow::Result<Self> {
|
||||
// Use a current-thread runtime in the test
|
||||
let runtime = Box::leak(Box::new(
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?,
|
||||
));
|
||||
let entered_runtime = runtime.enter();
|
||||
|
||||
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
// create an empty timeline directory
|
||||
let _ = runtime.block_on(tenant.create_test_timeline(
|
||||
TIMELINE_ID,
|
||||
Lsn(8),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
))?;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
@@ -1456,7 +1444,7 @@ mod tests {
|
||||
|
||||
let client = Arc::new(RemoteTimelineClient {
|
||||
conf: harness.conf,
|
||||
runtime,
|
||||
runtime: tokio::runtime::Handle::current(),
|
||||
tenant_id: harness.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
storage_impl: storage,
|
||||
@@ -1468,10 +1456,9 @@ mod tests {
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
runtime,
|
||||
entered_runtime,
|
||||
harness,
|
||||
tenant,
|
||||
timeline,
|
||||
tenant_ctx: ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
@@ -1480,8 +1467,8 @@ mod tests {
|
||||
}
|
||||
|
||||
// Test scheduling
|
||||
#[test]
|
||||
fn upload_scheduling() -> anyhow::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn upload_scheduling() {
|
||||
// Test outline:
|
||||
//
|
||||
// Schedule upload of a bunch of layers. Check that they are started immediately, not queued
|
||||
@@ -1497,25 +1484,26 @@ mod tests {
|
||||
// Schedule index upload. Check that it's queued
|
||||
|
||||
let TestSetup {
|
||||
runtime,
|
||||
entered_runtime: _entered_runtime,
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
timeline: _timeline,
|
||||
tenant_ctx: _tenant_ctx,
|
||||
remote_fs_dir,
|
||||
client,
|
||||
} = TestSetup::new("upload_scheduling").unwrap();
|
||||
} = TestSetup::new("upload_scheduling").await.unwrap();
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
println!("workdir: {}", harness.conf.workdir.display());
|
||||
|
||||
let remote_timeline_dir =
|
||||
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?);
|
||||
remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
|
||||
println!("remote_timeline_dir: {}", remote_timeline_dir.display());
|
||||
|
||||
let metadata = dummy_metadata(Lsn(0x10));
|
||||
client.init_upload_queue_for_empty_remote(&metadata)?;
|
||||
client
|
||||
.init_upload_queue_for_empty_remote(&metadata)
|
||||
.unwrap();
|
||||
|
||||
// Create a couple of dummy files, schedule upload for them
|
||||
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
|
||||
@@ -1524,26 +1512,32 @@ mod tests {
|
||||
let content_1 = dummy_contents("foo");
|
||||
let content_2 = dummy_contents("bar");
|
||||
let content_3 = dummy_contents("baz");
|
||||
std::fs::write(
|
||||
timeline_path.join(layer_file_name_1.file_name()),
|
||||
&content_1,
|
||||
)?;
|
||||
std::fs::write(
|
||||
timeline_path.join(layer_file_name_2.file_name()),
|
||||
&content_2,
|
||||
)?;
|
||||
std::fs::write(timeline_path.join(layer_file_name_3.file_name()), content_3)?;
|
||||
|
||||
client.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)?;
|
||||
client.schedule_layer_file_upload(
|
||||
&layer_file_name_2,
|
||||
&LayerFileMetadata::new(content_2.len() as u64),
|
||||
)?;
|
||||
for (filename, content) in [
|
||||
(&layer_file_name_1, &content_1),
|
||||
(&layer_file_name_2, &content_2),
|
||||
(&layer_file_name_3, &content_3),
|
||||
] {
|
||||
std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
|
||||
}
|
||||
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)
|
||||
.unwrap();
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_2,
|
||||
&LayerFileMetadata::new(content_2.len() as u64),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Check that they are started immediately, not queued
|
||||
//
|
||||
// this works because we running within block_on, so any futures are now queued up until
|
||||
// our next await point.
|
||||
{
|
||||
let mut guard = client.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut().unwrap();
|
||||
@@ -1557,7 +1551,9 @@ mod tests {
|
||||
|
||||
// Schedule upload of index. Check that it is queued
|
||||
let metadata = dummy_metadata(Lsn(0x20));
|
||||
client.schedule_index_upload_for_metadata_update(&metadata)?;
|
||||
client
|
||||
.schedule_index_upload_for_metadata_update(&metadata)
|
||||
.unwrap();
|
||||
{
|
||||
let mut guard = client.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut().unwrap();
|
||||
@@ -1566,7 +1562,7 @@ mod tests {
|
||||
}
|
||||
|
||||
// Wait for the uploads to finish
|
||||
runtime.block_on(client.wait_completion())?;
|
||||
client.wait_completion().await.unwrap();
|
||||
{
|
||||
let mut guard = client.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut().unwrap();
|
||||
@@ -1576,7 +1572,7 @@ mod tests {
|
||||
}
|
||||
|
||||
// Download back the index.json, and check that the list of files is correct
|
||||
let index_part = match runtime.block_on(client.download_index_file())? {
|
||||
let index_part = match client.download_index_file().await.unwrap() {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
|
||||
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
|
||||
};
|
||||
@@ -1588,17 +1584,19 @@ mod tests {
|
||||
&layer_file_name_2.file_name(),
|
||||
],
|
||||
);
|
||||
let downloaded_metadata = index_part.parse_metadata()?;
|
||||
let downloaded_metadata = index_part.parse_metadata().unwrap();
|
||||
assert_eq!(downloaded_metadata, metadata);
|
||||
|
||||
// Schedule upload and then a deletion. Check that the deletion is queued
|
||||
let content_baz = dummy_contents("baz");
|
||||
std::fs::write(timeline_path.join("baz"), &content_baz)?;
|
||||
client.schedule_layer_file_upload(
|
||||
&layer_file_name_3,
|
||||
&LayerFileMetadata::new(content_baz.len() as u64),
|
||||
)?;
|
||||
client.schedule_layer_file_deletion(&[layer_file_name_1.clone()])?;
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_3,
|
||||
&LayerFileMetadata::new(content_3.len() as u64),
|
||||
)
|
||||
.unwrap();
|
||||
client
|
||||
.schedule_layer_file_deletion(&[layer_file_name_1.clone()])
|
||||
.unwrap();
|
||||
{
|
||||
let mut guard = client.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut().unwrap();
|
||||
@@ -1620,7 +1618,7 @@ mod tests {
|
||||
);
|
||||
|
||||
// Finish them
|
||||
runtime.block_on(client.wait_completion())?;
|
||||
client.wait_completion().await.unwrap();
|
||||
|
||||
assert_remote_files(
|
||||
&[
|
||||
@@ -1630,23 +1628,24 @@ mod tests {
|
||||
],
|
||||
&remote_timeline_dir,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn bytes_unfinished_gauge_for_layer_file_uploads() {
|
||||
// Setup
|
||||
|
||||
let TestSetup {
|
||||
runtime,
|
||||
harness,
|
||||
tenant: _tenant,
|
||||
timeline: _timeline,
|
||||
client,
|
||||
..
|
||||
} = TestSetup::new("metrics")?;
|
||||
} = TestSetup::new("metrics").await.unwrap();
|
||||
|
||||
let metadata = dummy_metadata(Lsn(0x10));
|
||||
client.init_upload_queue_for_empty_remote(&metadata)?;
|
||||
client
|
||||
.init_upload_queue_for_empty_remote(&metadata)
|
||||
.unwrap();
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
@@ -1655,7 +1654,8 @@ mod tests {
|
||||
std::fs::write(
|
||||
timeline_path.join(layer_file_name_1.file_name()),
|
||||
&content_1,
|
||||
)?;
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct BytesStartedFinished {
|
||||
@@ -1681,14 +1681,16 @@ mod tests {
|
||||
|
||||
let init = get_bytes_started_stopped();
|
||||
|
||||
client.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)?;
|
||||
client
|
||||
.schedule_layer_file_upload(
|
||||
&layer_file_name_1,
|
||||
&LayerFileMetadata::new(content_1.len() as u64),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let pre = get_bytes_started_stopped();
|
||||
|
||||
runtime.block_on(client.wait_completion())?;
|
||||
client.wait_completion().await.unwrap();
|
||||
|
||||
let post = get_bytes_started_stopped();
|
||||
|
||||
@@ -1716,7 +1718,5 @@ mod tests {
|
||||
finished: Some(content_1.len())
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user