Compare commits

...

3 Commits

6 changed files with 100 additions and 41 deletions

View File

@@ -581,6 +581,31 @@ fn start_pageserver(
         );
     }
+    task_mgr::spawn(
+        BACKGROUND_RUNTIME.handle(),
+        TaskKind::BackgroundRuntimeTurnaroundMeasure,
+        None,
+        None,
+        "background runtime turnaround measure",
+        true,
+        async move {
+            let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
+            let server = server
+                .serve(hyper::service::make_service_fn(|_| async move {
+                    Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
+                        move |_: hyper::Request<hyper::Body>| async move {
+                            Ok::<_, std::convert::Infallible>(hyper::Response::new(
+                                hyper::Body::from(format!("alive")),
+                            ))
+                        },
+                    ))
+                }))
+                .with_graceful_shutdown(task_mgr::shutdown_watcher());
+            server.await?;
+            Ok(())
+        },
+    );
     let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
     // All started up! Now just sit and wait for shutdown signal.
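
Not part of the diff: the hunk above spawns a bare hyper server on the background runtime that answers any request on port 2342 with "alive", so the time it takes to get that answer reflects how quickly the runtime's workers can pick up new work. Below is a minimal probe sketch under those assumptions; it expects a locally running pageserver with the hard-coded 0.0.0.0:2342 bind from this change, and the script and its names are illustrative only, not something these commits add.

# turnaround_probe.py, illustrative only, not added by these commits.
# Repeatedly fetches the "alive" endpoint and prints how long each response
# takes; long gaps suggest the background runtime's workers are tied up.
import time
import urllib.request

URL = "http://127.0.0.1:2342/"  # matches the hard-coded bind in the hunk above

while True:
    started = time.monotonic()
    try:
        with urllib.request.urlopen(URL, timeout=10) as resp:
            body = resp.read().decode()
    except OSError as exc:
        body = f"error: {exc}"
    elapsed = time.monotonic() - started
    print(f"{elapsed * 1000:8.1f} ms  {body}")
    time.sleep(1)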

View File

@@ -292,6 +292,8 @@ pub enum TaskKind {
     DebugTool,
+    BackgroundRuntimeTurnaroundMeasure,
     #[cfg(test)]
     UnitTest,
 }

View File

@@ -857,11 +857,11 @@ impl DeltaLayerInner {
         expected_summary.index_start_blk = actual_summary.index_start_blk;
         expected_summary.index_root_blk = actual_summary.index_root_blk;
         if actual_summary != expected_summary {
-            bail!(
-                "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
-                actual_summary,
-                expected_summary
-            );
+            // bail!(
+            //     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
+            //     actual_summary,
+            //     expected_summary
+            // );
         }
     }

View File

@@ -450,11 +450,11 @@ impl ImageLayerInner {
         expected_summary.index_root_blk = actual_summary.index_root_blk;
         if actual_summary != expected_summary {
-            bail!(
-                "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
-                actual_summary,
-                expected_summary
-            );
+            // bail!(
+            //     "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
+            //     actual_summary,
+            //     expected_summary
+            // );
         }
     }

View File

@@ -631,38 +631,38 @@ impl Timeline {
     ) -> anyhow::Result<()> {
         const ROUNDS: usize = 2;
-        static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
-            once_cell::sync::Lazy::new(|| {
-                let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
-                let permits = usize::max(
-                    1,
-                    // while a lot of the work is done on spawn_blocking, we still do
-                    // repartitioning in the async context. this should leave us some workers
-                    // unblocked to pick up other work, hopefully easing any outside-visible
-                    // effects of restarts.
-                    //
-                    // 6/8 is a guess; previously we ran with unlimited 8 and more from
-                    // spawn_blocking.
-                    (total_threads * 3).checked_div(4).unwrap_or(0),
-                );
-                assert_ne!(permits, 0, "we will not be adding in permits later");
-                assert!(
-                    permits < total_threads,
-                    "need threads avail for shorter work"
-                );
-                tokio::sync::Semaphore::new(permits)
-            });
+        // static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
+        //     once_cell::sync::Lazy::new(|| {
+        //         let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
+        //         let permits = usize::max(
+        //             1,
+        //             // while a lot of the work is done on spawn_blocking, we still do
+        //             // repartitioning in the async context. this should leave us some workers
+        //             // unblocked to pick up other work, hopefully easing any outside-visible
+        //             // effects of restarts.
+        //             //
+        //             // 6/8 is a guess; previously we ran with unlimited 8 and more from
+        //             // spawn_blocking.
+        //             (total_threads * 3).checked_div(4).unwrap_or(0),
+        //         );
+        //         assert_ne!(permits, 0, "we will not be adding in permits later");
+        //         assert!(
+        //             permits < total_threads,
+        //             "need threads avail for shorter work"
+        //         );
+        //         tokio::sync::Semaphore::new(permits)
+        //     });
-        // this wait probably never needs any "long time spent" logging, because we already nag if
-        // the compaction task goes over its period (20s), which happens quite often in production.
-        let _permit = tokio::select! {
-            permit = CONCURRENT_COMPACTIONS.acquire() => {
-                permit
-            },
-            _ = cancel.cancelled() => {
-                return Ok(());
-            }
-        };
+        // // this wait probably never needs any "long time spent" logging, because we already nag if
+        // // the compaction task goes over its period (20s), which happens quite often in production.
+        // let _permit = tokio::select! {
+        //     permit = CONCURRENT_COMPACTIONS.acquire() => {
+        //         permit
+        //     },
+        //     _ = cancel.cancelled() => {
+        //         return Ok(());
+        //     }
+        // };
         let last_record_lsn = self.get_last_record_lsn();
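
Not part of the diff: the block commented out above capped concurrent compactions at roughly three quarters of the background runtime's worker threads. With the 8 workers its own comment mentions ("previously we ran with unlimited 8"), that comes to 6 permits, leaving 2 workers free for shorter tasks; a quick sketch of the arithmetic, in Python for brevity:

# Worked example of the permit formula from the block commented out above.
total_threads = 8  # the "unlimited 8" figure from the code's own comment
permits = max(1, (total_threads * 3) // 4)
print(permits, "concurrent compactions,", total_threads - permits, "workers left free")

With the semaphore gone, compactions are no longer throttled, which presumably makes any resulting executor stalls easier to see through the port-2342 endpoint added in the first file.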

View File

@@ -0,0 +1,32 @@
+import queue
+import shutil
+import threading
+
+from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
+from fixtures.types import TenantId
+
+
+def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
+    env = neon_env_builder.init_start()
+    # below doesn't work because summaries contain tenant and timeline ids and we check for them
+    tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
+
+    pshttp = env.pageserver.http_client()
+
+    ep = env.endpoints.create_start("main")
+    ep.safe_psql("create table foo(b text)")
+    for i in range(0, 8):
+        ep.safe_psql("insert into foo(b) values ('some text')")
+    # pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
+
+    wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
+    pshttp.timeline_checkpoint(tenant_id, timeline_id)
+
+    ep.stop_and_destroy()
+    env.pageserver.stop()
+    for sk in env.safekeepers:
+        sk.stop()
+
+    tenant_dir = env.repo_dir / "tenants" / str(env.initial_tenant)
+    for i in range(0, 20_000):
+        shutil.copytree(tenant_dir, tenant_dir.parent / str(TenantId.generate()))
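
Not part of the diff: as shown, the test ends after duplicating the tenant directory 20,000 times, so the actual startup measurement appears to be done by hand. Below is a hedged sketch of how the test body could continue, reusing the test's env and the port-2342 endpoint from the first file; env.pageserver.start() and the port being reachable from the test are assumptions, not something the diff shows.

    # Illustrative continuation (not in the diff): restart the pageserver and
    # time how long the "alive" endpoint takes to answer with 20k tenants on disk.
    # The imports would normally sit at the top of the test file.
    import time
    import urllib.request

    t0 = time.monotonic()
    env.pageserver.start()
    while True:
        try:
            urllib.request.urlopen("http://127.0.0.1:2342/", timeout=1).read()
            break
        except OSError:
            time.sleep(0.1)
    print(f"background runtime answered after {time.monotonic() - t0:.1f}s")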