mirror of
https://github.com/neondatabase/neon.git
synced 2026-04-21 00:10:36 +00:00
Compare commits
2 Commits
problame/l
...
test-ps-ca
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f164085515 | ||
|
|
cc3acd4201 |
@@ -581,31 +581,6 @@ fn start_pageserver(
|
||||
);
|
||||
}
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::BackgroundRuntimeTurnaroundMeasure,
|
||||
None,
|
||||
None,
|
||||
"background runtime turnaround measure",
|
||||
true,
|
||||
async move {
|
||||
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
|
||||
let server = server
|
||||
.serve(hyper::service::make_service_fn(|_| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
|
||||
move |_: hyper::Request<hyper::Body>| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::Response::new(
|
||||
hyper::Body::from(format!("alive")),
|
||||
))
|
||||
},
|
||||
))
|
||||
}))
|
||||
.with_graceful_shutdown(task_mgr::shutdown_watcher());
|
||||
server.await?;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
|
||||
@@ -292,8 +292,6 @@ pub enum TaskKind {
|
||||
|
||||
DebugTool,
|
||||
|
||||
BackgroundRuntimeTurnaroundMeasure,
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
}
|
||||
|
||||
@@ -857,11 +857,11 @@ impl DeltaLayerInner {
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
if actual_summary != expected_summary {
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -450,11 +450,11 @@ impl ImageLayerInner {
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
|
||||
if actual_summary != expected_summary {
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -631,38 +631,38 @@ impl Timeline {
|
||||
) -> anyhow::Result<()> {
|
||||
const ROUNDS: usize = 2;
|
||||
|
||||
// static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
// once_cell::sync::Lazy::new(|| {
|
||||
// let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
// let permits = usize::max(
|
||||
// 1,
|
||||
// // while a lot of the work is done on spawn_blocking, we still do
|
||||
// // repartitioning in the async context. this should give leave us some workers
|
||||
// // unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// // effects of restarts.
|
||||
// //
|
||||
// // 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// // spawn_blocking.
|
||||
// (total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
// );
|
||||
// assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
// assert!(
|
||||
// permits < total_threads,
|
||||
// "need threads avail for shorter work"
|
||||
// );
|
||||
// tokio::sync::Semaphore::new(permits)
|
||||
// });
|
||||
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
|
||||
once_cell::sync::Lazy::new(|| {
|
||||
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
|
||||
let permits = usize::max(
|
||||
1,
|
||||
// while a lot of the work is done on spawn_blocking, we still do
|
||||
// repartitioning in the async context. this should give leave us some workers
|
||||
// unblocked to be blocked on other work, hopefully easing any outside visible
|
||||
// effects of restarts.
|
||||
//
|
||||
// 6/8 is a guess; previously we ran with unlimited 8 and more from
|
||||
// spawn_blocking.
|
||||
(total_threads * 3).checked_div(4).unwrap_or(0),
|
||||
);
|
||||
assert_ne!(permits, 0, "we will not be adding in permits later");
|
||||
assert!(
|
||||
permits < total_threads,
|
||||
"need threads avail for shorter work"
|
||||
);
|
||||
tokio::sync::Semaphore::new(permits)
|
||||
});
|
||||
|
||||
// // this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// // compaction task goes over it's period (20s) which is quite often in production.
|
||||
// let _permit = tokio::select! {
|
||||
// permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
// permit
|
||||
// },
|
||||
// _ = cancel.cancelled() => {
|
||||
// return Ok(());
|
||||
// }
|
||||
// };
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = tokio::select! {
|
||||
permit = CONCURRENT_COMPACTIONS.acquire() => {
|
||||
permit
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
|
||||
|
||||
63
test_runner/fixtures/sk_utils.py
Normal file
63
test_runner/fixtures/sk_utils.py
Normal file
@@ -0,0 +1,63 @@
|
||||
# safekeeper utils
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Safekeeper, get_dir_size
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
# my grammar guard startles at the name, but let's consider it is a shortening from "is it true that ...."
|
||||
def is_segs_not_exist(segs, http_cli, tenant_id, timeline_id):
|
||||
segs_existense = [f"{f}: {os.path.exists(f)}" for f in segs]
|
||||
log.info(
|
||||
f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}, segments_existence: {segs_existense}"
|
||||
)
|
||||
return all(not os.path.exists(p) for p in segs)
|
||||
|
||||
|
||||
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
|
||||
http_cli = sk.http_client()
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"sk status is {tli_status}")
|
||||
return tli_status.flush_lsn >= lsn
|
||||
|
||||
|
||||
def is_commit_lsn_equals_flush_lsn(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
http_cli = sk.http_client()
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"sk status is {tli_status}")
|
||||
return tli_status.flush_lsn == tli_status.commit_lsn
|
||||
|
||||
|
||||
def is_segment_offloaded(
|
||||
sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
|
||||
):
|
||||
http_cli = sk.http_client()
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"sk status is {tli_status}")
|
||||
return tli_status.backup_lsn >= seg_end
|
||||
|
||||
|
||||
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
|
||||
http_cli = sk.http_client()
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
|
||||
sk_wal_size_mb = sk_wal_size / 1024 / 1024
|
||||
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
|
||||
return sk_wal_size_mb <= target_size_mb
|
||||
|
||||
|
||||
# Wait for something, defined as f() returning True, raising error if this
|
||||
# doesn't happen without timeout seconds.
|
||||
# TODO: unite with wait_until preserving good logs
|
||||
def wait(f, desc, timeout=30):
|
||||
started_at = time.time()
|
||||
while True:
|
||||
if f():
|
||||
break
|
||||
elapsed = time.time() - started_at
|
||||
if elapsed > timeout:
|
||||
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
|
||||
time.sleep(0.5)
|
||||
@@ -1,32 +0,0 @@
|
||||
import queue
|
||||
import threading
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
|
||||
from fixtures.types import TenantId
|
||||
|
||||
|
||||
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# below doesn't work because summaries contain tenant and timeline ids and we check for them
|
||||
|
||||
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
|
||||
pshttp = env.pageserver.http_client()
|
||||
ep = env.endpoints.create_start("main")
|
||||
ep.safe_psql("create table foo(b text)")
|
||||
for i in range(0, 8):
|
||||
ep.safe_psql("insert into foo(b) values ('some text')")
|
||||
# pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])
|
||||
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
|
||||
pshttp.timeline_checkpoint(tenant_id, timeline_id)
|
||||
ep.stop_and_destroy()
|
||||
|
||||
env.pageserver.stop()
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
tenant_dir = env.repo_dir / "tenants" / str(env.initial_tenant)
|
||||
|
||||
for i in range(0, 20_000):
|
||||
import shutil
|
||||
|
||||
shutil.copytree(tenant_dir, tenant_dir.parent / str(TenantId.generate()))
|
||||
@@ -1,4 +1,8 @@
|
||||
import time
|
||||
from functools import partial
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.sk_utils import is_commit_lsn_equals_flush_lsn, wait
|
||||
|
||||
|
||||
# Test safekeeper sync and pageserver catch up
|
||||
@@ -9,7 +13,9 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
timeline_id = env.neon_cli.create_branch("test_pageserver_catchup_while_compute_down")
|
||||
# Make shared_buffers large to ensure we won't query pageserver while it is down.
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_pageserver_catchup_while_compute_down", config_lines=["shared_buffers=512MB"]
|
||||
@@ -45,12 +51,26 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
|
||||
FROM generate_series(1, 10000) g
|
||||
"""
|
||||
)
|
||||
endpoint.stop()
|
||||
|
||||
# wait until safekeepers catch up. This forces/tests fast path which avoids
|
||||
# sync-safekeepers on next compute start.
|
||||
for sk in env.safekeepers:
|
||||
wait(
|
||||
partial(is_commit_lsn_equals_flush_lsn, sk, tenant_id, timeline_id),
|
||||
"commit_lsn to reach flush_lsn",
|
||||
)
|
||||
|
||||
# stop safekeepers gracefully
|
||||
env.safekeepers[0].stop()
|
||||
env.safekeepers[1].stop()
|
||||
env.safekeepers[2].stop()
|
||||
|
||||
# Wait until in flight messages to broker arrive so pageserver won't know
|
||||
# where to connect if timeline is not activated on safekeeper after restart
|
||||
# -- we had such a bug once.
|
||||
time.sleep(1)
|
||||
|
||||
# start everything again
|
||||
# safekeepers must synchronize and pageserver must catch up
|
||||
env.pageserver.start()
|
||||
@@ -59,7 +79,7 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
|
||||
env.safekeepers[2].start()
|
||||
|
||||
# restart compute node
|
||||
endpoint.stop_and_destroy().create_start("test_pageserver_catchup_while_compute_down")
|
||||
endpoint = env.endpoints.create_start("test_pageserver_catchup_while_compute_down")
|
||||
|
||||
# Ensure that basebackup went correct and pageserver returned all data
|
||||
pg_conn = endpoint.connect()
|
||||
|
||||
@@ -40,6 +40,13 @@ from fixtures.remote_storage import (
|
||||
RemoteStorageUsers,
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.sk_utils import (
|
||||
is_flush_lsn_caught_up,
|
||||
is_segment_offloaded,
|
||||
is_segs_not_exist,
|
||||
is_wal_trimmed,
|
||||
wait,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import get_dir_size, query_scalar, start_in_background
|
||||
|
||||
@@ -385,54 +392,11 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
|
||||
# wait till first segment is removed on all safekeepers
|
||||
wait(
|
||||
lambda first_segments=first_segments: all(not os.path.exists(p) for p in first_segments),
|
||||
partial(is_segs_not_exist, first_segments, http_cli, tenant_id, timeline_id),
|
||||
"first segment get removed",
|
||||
wait_f=lambda http_cli=http_cli, tenant_id=tenant_id, timeline_id=timeline_id: log.info(
|
||||
f"waiting for segments removal, sk info: {http_cli.timeline_status(tenant_id=tenant_id, timeline_id=timeline_id)}"
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# Wait for something, defined as f() returning True, raising error if this
|
||||
# doesn't happen without timeout seconds, and calling wait_f while waiting.
|
||||
def wait(f, desc, timeout=30, wait_f=None):
|
||||
started_at = time.time()
|
||||
while True:
|
||||
if f():
|
||||
break
|
||||
elapsed = time.time() - started_at
|
||||
if elapsed > timeout:
|
||||
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {desc}")
|
||||
time.sleep(0.5)
|
||||
if wait_f is not None:
|
||||
wait_f()
|
||||
|
||||
|
||||
def is_segment_offloaded(
|
||||
sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, seg_end: Lsn
|
||||
):
|
||||
http_cli = sk.http_client()
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"sk status is {tli_status}")
|
||||
return tli_status.backup_lsn >= seg_end
|
||||
|
||||
|
||||
def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
|
||||
http_cli = sk.http_client()
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
log.info(f"sk status is {tli_status}")
|
||||
return tli_status.flush_lsn >= lsn
|
||||
|
||||
|
||||
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
|
||||
http_cli = sk.http_client()
|
||||
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
|
||||
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
|
||||
sk_wal_size_mb = sk_wal_size / 1024 / 1024
|
||||
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
|
||||
return sk_wal_size_mb <= target_size_mb
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
|
||||
Reference in New Issue
Block a user