Move testing pageserver libpq cmds to HTTP api (#2429)

Closes #2422.

The APIs have been feature-gated with the `testing_api!` macro so that
they return 400s when support hasn't been compiled in.
sharnoff
2022-09-20 11:28:12 -07:00
committed by GitHub
parent 4b25b9652a
commit 4a3b3ff11d
29 changed files with 352 additions and 227 deletions
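
For illustration: calling a gated endpoint on a pageserver built without
`--features testing` yields a 400. A minimal sketch of what a client sees,
not part of the commit (the host/port and exact body format are assumptions;
the error text comes from the `testing_api!` macro below):

```python
import requests

# Assumed local pageserver HTTP address; adjust to your setup.
PAGESERVER = "http://localhost:9898"

resp = requests.put(
    f"{PAGESERVER}/v1/failpoints",
    json=[{"name": "before-timeline-gc", "actions": "sleep(2000)"}],
)

# Without the `testing` feature compiled in, the stub handler answers 400 with
# a message like "Cannot manage failpoints because pageserver was compiled
# without testing APIs".
assert resp.status_code == 400
print(resp.text)
```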

View File

@@ -13,4 +13,4 @@ opt-level = 3
opt-level = 1
[alias]
build_testing = ["build", "--features", "failpoints"]
build_testing = ["build", "--features", "testing"]

View File

@@ -100,11 +100,11 @@ jobs:
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FEATURES="--features failpoints"
CARGO_FEATURES="--features testing"
CARGO_FLAGS="--locked --timings $CARGO_FEATURES"
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FEATURES="--features failpoints,profiling"
CARGO_FEATURES="--features testing,profiling"
CARGO_FLAGS="--locked --timings --release $CARGO_FEATURES"
fi
echo "cov_prefix=${cov_prefix}" >> $GITHUB_ENV

View File

@@ -222,7 +222,12 @@ Ensure your dependencies are installed as described [here](https://github.com/ne
```sh
git clone --recursive https://github.com/neondatabase/neon.git
make # also builds postgres and installs it to ./pg_install
# either:
CARGO_BUILD_FLAGS="--features=testing" make
# or:
make debug
./scripts/pytest
```

View File

@@ -5,10 +5,10 @@ edition = "2021"
[features]
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
# Feature that enables a special API, fail_point! macro (adds some runtime cost)
# to run tests on outage conditions
failpoints = ["fail/failpoints"]
profiling = ["pprof"]
[dependencies]

View File

@@ -87,8 +87,8 @@ fn main() -> anyhow::Result<()> {
if arg_matches.is_present("enabled-features") {
let features: &[&str] = &[
#[cfg(feature = "failpoints")]
"failpoints",
#[cfg(feature = "testing")]
"testing",
#[cfg(feature = "profiling")]
"profiling",
];
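
Tests that depend on the gated endpoints can check this feature list at
runtime, as the pageserver recovery test further down does. A minimal sketch
using the test fixtures (assumes a started `NeonEnv`; the helper name is
hypothetical):

```python
from fixtures.neon_fixtures import NeonEnv


def require_testing_features(env: NeonEnv) -> None:
    # Ask the pageserver binary which cargo features it was built with.
    features = env.neon_cli.pageserver_enabled_features()
    assert (
        "testing" in features["features"]
    ), "Build pageserver with --features=testing option to run this test"
```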

View File

@@ -160,3 +160,21 @@ pub struct TimelineInfo {
pub local: Option<LocalTimelineInfo>,
pub remote: Option<RemoteTimelineInfo>,
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineGcRequest {
pub gc_horizon: Option<u64>,
}
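
The body of `PUT /v1/failpoints` is simply a JSON array of these
`FailpointConfig` objects, i.e. a `ConfigureFailpointsRequest`. A sketch of a
payload (the failpoint names are taken from the tests changed in this commit):

```python
import json

# Two failpoints configured in one request. "exit" is the extra action the
# handler recognizes on top of the fail crate's own: it kills the process.
payload = [
    {"name": "flush-frozen-before-sync", "actions": "sleep(2000)"},
    {"name": "checkpoint-after-sync", "actions": "exit"},
]
print(json.dumps(payload, indent=2))
```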

View File

@@ -29,6 +29,12 @@ use utils::{
lsn::Lsn,
};
// Imports only used for testing APIs
#[cfg(feature = "testing")]
use super::models::{ConfigureFailpointsRequest, TimelineGcRequest};
#[cfg(feature = "testing")]
use crate::CheckpointConfig;
struct State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
@@ -661,6 +667,103 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, ())
}
#[cfg(any(feature = "testing", feature = "failpoints"))]
async fn failpoints_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(
"Cannot manage failpoints because pageserver was compiled without failpoints support"
.to_owned(),
));
}
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = if fp.actions == "exit" {
fail::cfg_callback(fp.name, || {
info!("Exit requested by failpoint");
std::process::exit(1);
})
} else {
fail::cfg(fp.name, &fp.actions)
};
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(format!(
"Failed to configure failpoints: {err_msg}"
)));
}
}
json_response(StatusCode::OK, ())
}
// Run GC immediately on given timeline.
// FIXME: This is just for tests. See test_runner/regress/test_gc.py.
// This probably should require special authentication or a global flag to
// enable; I don't think we want or need to allow regular clients to invoke
// GC.
// @hllinnaka in commits ec44f4b29, 3aca717f3
#[cfg(feature = "testing")]
async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
// FIXME: currently this will return a 500 error on bad tenant id; it should be 4XX
let repo = tenant_mgr::get_tenant(tenant_id, false)?;
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let _span_guard =
info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| repo.get_gc_horizon());
// Use tenant's pitr setting
let pitr = repo.get_pitr_interval();
let result = repo.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)?;
json_response(StatusCode::OK, result)
}
// Run compaction immediately on given timeline.
// FIXME: This is just for tests. Don't expect this to be exposed to
// users or the API.
// @dhammika in commit a0781f229
#[cfg(feature = "testing")]
async fn timeline_compact_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let repo = tenant_mgr::get_tenant(tenant_id, true)?;
// FIXME: currently this will return a 500 error on bad timeline id; it should be 4XX
let timeline = repo.get_timeline(timeline_id).with_context(|| {
format!("No timeline {timeline_id} in repository for tenant {tenant_id}")
})?;
timeline.compact()?;
json_response(StatusCode::OK, ())
}
// Run checkpoint immediately on given timeline.
#[cfg(feature = "testing")]
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let repo = tenant_mgr::get_tenant(tenant_id, true)?;
// FIXME: currently this will return a 500 error on bad timeline id; it should be 4XX
let timeline = repo.get_timeline(timeline_id).with_context(|| {
format!("No timeline {timeline_id} in repository for tenant {tenant_id}")
})?;
timeline.checkpoint(CheckpointConfig::Forced)?;
json_response(StatusCode::OK, ())
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
@@ -687,12 +790,38 @@ pub fn make_router(
}))
}
macro_rules! testing_api {
($handler_desc:literal, $handler:path $(,)?) => {{
#[cfg(not(feature = "testing"))]
async fn cfg_disabled(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
Err(ApiError::BadRequest(
concat!(
"Cannot ",
$handler_desc,
" because pageserver was compiled without testing APIs",
)
.to_owned(),
))
}
#[cfg(feature = "testing")]
let handler = $handler;
#[cfg(not(feature = "testing"))]
let handler = cfg_disabled;
handler
}};
}
Ok(router
.data(Arc::new(
State::new(conf, auth, remote_index, remote_storage)
.context("Failed to initialize router state")?,
))
.get("/v1/status", status_handler)
.put(
"/v1/failpoints",
testing_api!("manage failpoints", failpoints_handler),
)
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)
.get("/v1/tenant/:tenant_id", tenant_status)
@@ -705,6 +834,18 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc",
testing_api!("run timeline GC", timeline_gc_handler),
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/compact",
testing_api!("run timeline compaction", timeline_compact_handler),
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
testing_api!("run timeline checkpoint", timeline_checkpoint_handler),
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_handler,
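
Taken together, the new routes replace the removed libpq commands one-for-one.
A hedged sketch of the correspondence, written with the Python fixture methods
this commit adds (the wrapper function itself is hypothetical):

```python
from fixtures.neon_fixtures import NeonEnv
from fixtures.types import TenantId, TimelineId


def exercise_testing_api(env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
    http = env.pageserver.http_client()
    # was: "failpoints <name>=<actions>" over libpq
    http.configure_failpoints(("before-checkpoint-new-timeline", "return"))
    # was: "checkpoint <tenant_id> <timeline_id>"
    http.timeline_checkpoint(tenant_id, timeline_id)
    # was: "compact <tenant_id> <timeline_id>"
    http.timeline_compact(tenant_id, timeline_id)
    # was: "do_gc <tenant_id> <timeline_id> <gc_horizon>"
    return http.timeline_gc(tenant_id, timeline_id, 0)
```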

View File

@@ -27,7 +27,7 @@ use utils::{
lsn::Lsn,
postgres_backend::AuthType,
postgres_backend_async::{self, PostgresBackend},
pq_proto::{BeMessage, FeMessage, RowDescriptor, SINGLE_COL_ROWDESC},
pq_proto::{BeMessage, FeMessage, RowDescriptor},
simple_rcu::RcuReadGuard,
};
@@ -1005,31 +1005,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("failpoints ") {
ensure!(fail::has_failpoints(), "Cannot manage failpoints because pageserver was compiled without failpoints support");
let (_, failpoints) = query_string.split_at("failpoints ".len());
for failpoint in failpoints.split(';') {
if let Some((name, actions)) = failpoint.split_once('=') {
info!("cfg failpoint: {} {}", name, actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
if actions == "exit" {
fail::cfg_callback(name, || {
info!("Exit requested by failpoint");
std::process::exit(1);
})
.unwrap();
} else {
fail::cfg(name, actions).unwrap();
}
} else {
bail!("Invalid failpoints format");
}
}
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("show ") {
// show <tenant_id>
let (_, params_raw) = query_string.split_at("show ".len());
@@ -1072,94 +1047,6 @@ impl postgres_backend_async::Handler for PageServerHandler {
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("do_gc ") {
// Run GC immediately on given timeline.
// FIXME: This is just for tests. See test_runner/regress/test_gc.py.
// This probably should require special authentication or a global flag to
// enable, I don't think we want to or need to allow regular clients to invoke
// GC.
// do_gc <tenant_id> <timeline_id> <gc_horizon>
let re = Regex::new(r"^do_gc ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)([[:digit:]]+)?")
.unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("invalid do_gc: '{}'", query_string))?;
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let _span_guard =
info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let gc_horizon: u64 = caps
.get(4)
.map(|h| h.as_str().parse())
.unwrap_or_else(|| Ok(tenant.get_gc_horizon()))?;
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
let result = tenant.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)?;
pgb.write_message(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
RowDescriptor::int8_col(b"layers_needed_by_pitr"),
RowDescriptor::int8_col(b"layers_needed_by_branches"),
RowDescriptor::int8_col(b"layers_not_updated"),
RowDescriptor::int8_col(b"layers_removed"),
RowDescriptor::int8_col(b"elapsed"),
]))?
.write_message(&BeMessage::DataRow(&[
Some(result.layers_total.to_string().as_bytes()),
Some(result.layers_needed_by_cutoff.to_string().as_bytes()),
Some(result.layers_needed_by_pitr.to_string().as_bytes()),
Some(result.layers_needed_by_branches.to_string().as_bytes()),
Some(result.layers_not_updated.to_string().as_bytes()),
Some(result.layers_removed.to_string().as_bytes()),
Some(result.elapsed.as_millis().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("compact ") {
// Run compaction immediately on given timeline.
// FIXME This is just for tests. Don't expect this to be exposed to
// the users or the api.
// compact <tenant_id> <timeline_id>
let re = Regex::new(r"^compact ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)?").unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("Invalid compact: '{}'", query_string))?;
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
timeline.compact()?;
pgb.write_message(&SINGLE_COL_ROWDESC)?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("checkpoint ") {
// Run checkpoint immediately on given timeline.
// checkpoint <tenant_id> <timeline_id>
let re = Regex::new(r"^checkpoint ([[:xdigit:]]+)\s([[:xdigit:]]+)($|\s)?").unwrap();
let caps = re
.captures(query_string)
.with_context(|| format!("invalid checkpoint command: '{}'", query_string))?;
let tenant_id = TenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = TimelineId::from_str(caps.get(2).unwrap().as_str())?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
// Checkpoint the timeline and also compact it (due to `CheckpointConfig::Forced`).
timeline.checkpoint(CheckpointConfig::Forced)?;
pgb.write_message(&SINGLE_COL_ROWDESC)?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("get_lsn_by_timestamp ") {
// Locate LSN of the last transaction with a timestamp less than or equal to the specified one
// TODO lazy static

View File

@@ -176,7 +176,7 @@ impl Value {
///
/// Result of performing GC
///
#[derive(Default)]
#[derive(Default, Serialize)]
pub struct GcResult {
pub layers_total: u64,
pub layers_needed_by_cutoff: u64,
@@ -185,9 +185,18 @@ pub struct GcResult {
pub layers_not_updated: u64,
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
#[serde(serialize_with = "serialize_duration_as_millis")]
pub elapsed: Duration,
}
// helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds
fn serialize_duration_as_millis<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
d.as_millis().serialize(serializer)
}
impl AddAssign for GcResult {
fn add_assign(&mut self, other: Self) {
self.layers_total += other.layers_total;
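
With `Serialize` derived and the helper above in place, the GC handler can
return a `GcResult` directly as JSON, with `elapsed` rendered as whole
milliseconds. A sketch of the expected shape (the values are illustrative):

```python
# Approximate JSON body returned by PUT .../do_gc (values invented here).
gc_result = {
    "layers_total": 10,
    "layers_needed_by_cutoff": 3,
    "layers_needed_by_pitr": 0,
    "layers_needed_by_branches": 1,
    "layers_not_updated": 4,
    "layers_removed": 2,
    "elapsed": 125,  # Duration serialized via serialize_duration_as_millis
}
```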

View File

@@ -6,9 +6,9 @@ Prerequisites:
- Correctly configured Python, see [`/docs/sourcetree.md`](/docs/sourcetree.md#using-python)
- Neon and Postgres binaries
- See the root [README.md](/README.md) for build directions
If you want to test tests with failpoints, you would need to add `--features failpoints` to Rust code build commands.
If you want to run tests that use test-only APIs, you need to add `--features testing` to Rust code build commands.
For convenience, the repository's cargo config contains a `build_testing` alias that serves as a subcommand and adds the required feature flags.
Usage example: `cargo build_testing --release` is equivalent to `cargo build --features failpoints --release`
Usage example: `cargo build_testing --release` is equivalent to `cargo build --features testing --release`
- Tests can be run from the git tree; or see the environment variables
below to run from other directories.
- The neon git repo, including the postgres submodule

View File

@@ -964,6 +964,24 @@ class NeonPageserverHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]) -> None:
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
@@ -1061,6 +1079,45 @@ class NeonPageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def timeline_gc(
self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
) -> dict[str, Any]:
log.info(
f"Requesting GC: tenant {tenant_id}, timeline {timeline_id}, gc_horizon {repr(gc_horizon)}"
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc",
json={"gc_horizon": gc_horizon},
)
log.info(f"Got GC request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
return res_json
def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId):
log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact"
)
log.info(f"Got compact request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
)
log.info(f"Got checkpoint request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)

View File

@@ -9,6 +9,7 @@ from fixtures.utils import query_scalar
#
def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# Override defaults, 1M gc_horizon and 4M checkpoint_distance.
# Extend compaction_period and gc_period to disable background compaction and gc.
@@ -23,7 +24,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
}
)
env.pageserver.safe_psql("failpoints flush-frozen-before-sync=sleep(10000)")
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
pg_branch0 = env.postgres.create_start("main", tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
@@ -92,9 +93,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
log.info(f"LSN after 300k rows: {lsn_300}")
# Run compaction on branch1.
compact = f"compact {tenant} {branch1_timeline} {lsn_200}"
compact = f"compact {tenant} {branch1_timeline}"
log.info(compact)
env.pageserver.safe_psql(compact)
pageserver_http.timeline_compact(tenant, branch1_timeline)
assert query_scalar(branch0_cur, "SELECT count(*) FROM foo") == 100000

View File

@@ -9,9 +9,10 @@ from fixtures.neon_fixtures import NeonEnv
def test_basebackup_error(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_basebackup_error", "empty")
pageserver_http = env.pageserver.http_client()
# Introduce failpoint
env.pageserver.safe_psql("failpoints basebackup-before-control-file=return")
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
with pytest.raises(Exception, match="basebackup-before-control-file"):
env.postgres.create_start("test_basebackup_error")

View File

@@ -47,6 +47,7 @@ from fixtures.utils import query_scalar
# could not find data for key ... at LSN ..., for request at LSN ...
def test_branch_and_gc(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
tenant, _ = env.neon_cli.create_tenant(
conf={
@@ -84,7 +85,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
# Set the GC horizon so that lsn1 is inside the horizon, which means
# we can create a new branch starting from lsn1.
env.pageserver.safe_psql(f"do_gc {tenant} {timeline_main} {lsn2 - lsn1 + 1024}")
pageserver_http_client.timeline_gc(tenant, timeline_main, lsn2 - lsn1 + 1024)
env.neon_cli.create_branch(
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
@@ -113,6 +114,8 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
# For more details, see discussion in https://github.com/neondatabase/neon/pull/2101#issuecomment-1185273447.
def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
# Disable background GC but set the `pitr_interval` to be small, so GC can delete something
tenant, _ = env.neon_cli.create_tenant(
conf={
@@ -147,10 +150,10 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
# Use `failpoint=sleep` and `threading` to make the GC iteration trigger *before* the
# branch creation task while the individual timeline GC iteration happens *after*
# the branch creation task.
env.pageserver.safe_psql("failpoints before-timeline-gc=sleep(2000)")
pageserver_http_client.configure_failpoints(("before-timeline-gc", "sleep(2000)"))
def do_gc():
env.pageserver.safe_psql(f"do_gc {tenant} {b0} 0")
pageserver_http_client.timeline_gc(tenant, b0, 0)
thread = threading.Thread(target=do_gc, daemon=True)
thread.start()
@@ -161,7 +164,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
time.sleep(1.0)
# The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC.
with pytest.raises(Exception, match="invalid branch start lsn"):
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
env.neon_cli.create_branch("b1", "b0", tenant_id=tenant, ancestor_start_lsn=lsn)
thread.join()

View File

@@ -1,4 +1,3 @@
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -96,7 +95,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
assert pg.safe_psql("SELECT 1")[0][0] == 1
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn"):
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
env.neon_cli.create_branch("test_branch_preinitdb", ancestor_start_lsn=Lsn("0/42"))
# branch at pre-ancestor lsn
@@ -106,13 +105,11 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
)
# check that we cannot create branch based on garbage collected data
with env.pageserver.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# call gc to advace latest_gc_cutoff_lsn
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
with env.pageserver.http_client() as pageserver_http:
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)
with pytest.raises(Exception, match="invalid branch start lsn"):
with pytest.raises(Exception, match="invalid branch start lsn: .*"):
# this gced_lsn is pretty random, so if gc is disabled this wouldn't fail
env.neon_cli.create_branch(
"test_branch_create_fail", "test_branch_behind", ancestor_start_lsn=gced_lsn

View File

@@ -113,13 +113,14 @@ def test_create_multiple_timelines_parallel(neon_simple_env: NeonEnv):
def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
tenant_id, _ = env.neon_cli.create_tenant()
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
# Introduce failpoint when creating a new timeline
env.pageserver.safe_psql("failpoints before-checkpoint-new-timeline=return")
pageserver_http.configure_failpoints(("before-checkpoint-new-timeline", "return"))
with pytest.raises(Exception, match="before-checkpoint-new-timeline"):
_ = env.neon_cli.create_timeline("test_fix_broken_timelines", tenant_id)

View File

@@ -1,4 +1,5 @@
import asyncio
import concurrent.futures
import random
from fixtures.log_helper import log
@@ -30,10 +31,15 @@ async def update_table(pg: Postgres):
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
psconn = await env.pageserver.connect_async()
pageserver_http = env.pageserver.http_client()
while updates_performed < updates_to_perform:
await psconn.execute(f"do_gc {env.initial_tenant} {timeline} 0")
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < updates_to_perform:
await loop.run_in_executor(
pool, lambda: pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
)
# At the same time, run UPDATEs and GC

View File

@@ -270,8 +270,7 @@ def _import(
assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file)
# Check that gc works
psconn = env.pageserver.connect()
pscur = psconn.cursor()
pscur.execute(f"do_gc {tenant} {timeline} 0")
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_gc(tenant, timeline, 0)
return tar_output_file

View File

@@ -1,4 +1,3 @@
import psycopg2.extras
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
@@ -29,8 +28,7 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
# Get the timeline ID of our branch. We need it for the 'do_gc' command
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
psconn = env.pageserver.connect()
pscur = psconn.cursor(cursor_factory=psycopg2.extras.DictCursor)
pageserver_http = env.pageserver.http_client()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers.
@@ -61,9 +59,8 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
# Make a lot of updates on a single row, generating a lot of WAL. Trigger
# garbage collections so that the page server will remove old page versions.
for i in range(10):
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
gcrow = pscur.fetchone()
print_gc_result(gcrow)
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)
for j in range(100):
cur.execute("UPDATE foo SET val = val + 1 WHERE id = 1;")

View File

@@ -1,6 +1,3 @@
from contextlib import closing
import psycopg2.extras
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
@@ -54,13 +51,11 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
log.info(f"LSN after 10000 rows: {debug_lsn} xid {debug_xid}")
# run GC
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
pscur.execute(f"compact {env.initial_tenant} {timeline}")
# perform aggressive GC. Data still should be kept because of the PITR setting.
pscur.execute(f"do_gc {env.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
with env.pageserver.http_client() as pageserver_http:
pageserver_http.timeline_compact(env.initial_tenant, timeline)
# perform aggressive GC. Data still should be kept because of the PITR setting.
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
print_gc_result(gc_result)
# Branch at the point where only 100 rows were inserted
# It must have been preserved by PITR setting

View File

@@ -106,6 +106,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
# Similar test, but with more data, and we force checkpoints
def test_timetravel(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()
env.neon_cli.create_branch("test_timetravel", "empty")
pg = env.postgres.create_start("test_timetravel")
@@ -136,7 +137,7 @@ def test_timetravel(neon_simple_env: NeonEnv):
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to force a new layer file
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
pageserver_http_client.timeline_checkpoint(tenant_id, timeline_id)
##### Restart pageserver
env.postgres.stop_all()

View File

@@ -1,7 +1,6 @@
import time
from contextlib import closing
import psycopg2.extras
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -19,8 +18,8 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
f = env.neon_cli.pageserver_enabled_features()
assert (
"failpoints" in f["features"]
), "Build pageserver with --features=failpoints option to run this test"
"testing" in f["features"]
), "Build pageserver with --features=testing option to run this test"
neon_env_builder.start()
# Create a branch for us
@@ -31,26 +30,28 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# Create and initialize test table
cur.execute("CREATE TABLE foo(x bigint)")
cur.execute("INSERT INTO foo VALUES (generate_series(1,100000))")
with env.pageserver.http_client() as pageserver_http:
# Create and initialize test table
cur.execute("CREATE TABLE foo(x bigint)")
cur.execute("INSERT INTO foo VALUES (generate_series(1,100000))")
# Sleep for some time to let checkpoint create image layers
time.sleep(2)
# Sleep for some time to let checkpoint create image layers
time.sleep(2)
# Configure failpoints
pscur.execute(
"failpoints flush-frozen-before-sync=sleep(2000);checkpoint-after-sync=exit"
)
# Configure failpoints
pageserver_http.configure_failpoints(
[
("flush-frozen-before-sync", "sleep(2000)"),
("checkpoint-after-sync", "exit"),
]
)
# Do some updates until pageserver is crashed
try:
while True:
cur.execute("update foo set x=x+1")
except Exception as err:
log.info(f"Expected server crash {err}")
# Do some updates until pageserver is crashed
try:
while True:
cur.execute("update foo set x=x+1")
except Exception as err:
log.info(f"Expected server crash {err}")
log.info("Wait before server restart")
env.pageserver.stop()

View File

@@ -57,6 +57,7 @@ def test_remote_storage_backup_and_restore(
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
client = env.pageserver.http_client()
@@ -80,7 +81,7 @@ def test_remote_storage_backup_and_restore(
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
log.info(f"waiting for checkpoint {checkpoint_number} upload")
# wait until pageserver successfully uploaded a checkpoint to remote storage
@@ -99,7 +100,7 @@ def test_remote_storage_backup_and_restore(
env.pageserver.start()
# Introduce failpoint in download
env.pageserver.safe_psql("failpoints remote-storage-download-pre-rename=return")
pageserver_http.configure_failpoints(("remote-storage-download-pre-rename", "return"))
client.tenant_attach(tenant_id)

View File

@@ -1,16 +1,21 @@
from threading import Thread
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException
from fixtures.neon_fixtures import (
NeonEnvBuilder,
NeonPageserverApiException,
NeonPageserverHttpClient,
)
from fixtures.types import TenantId, TimelineId
def do_gc_target(env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
def do_gc_target(
pageserver_http: NeonPageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
"""Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211"""
try:
env.pageserver.safe_psql(f"do_gc {tenant_id} {timeline_id} 0")
pageserver_http.timeline_gc(tenant_id, timeline_id, 0)
except Exception as e:
log.error("do_gc failed: %s", e)
@@ -44,13 +49,13 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
# gc should not try to even start
with pytest.raises(
expected_exception=psycopg2.DatabaseError, match="gc target timeline does not exist"
expected_exception=NeonPageserverApiException, match="gc target timeline does not exist"
):
bogus_timeline_id = TimelineId.generate()
env.pageserver.safe_psql(f"do_gc {tenant_id} {bogus_timeline_id} 0")
pageserver_http.timeline_gc(tenant_id, bogus_timeline_id, 0)
# try to concurrently run gc and detach
gc_thread = Thread(target=lambda: do_gc_target(env, tenant_id, timeline_id))
gc_thread = Thread(target=lambda: do_gc_target(pageserver_http, tenant_id, timeline_id))
gc_thread.start()
last_error = None
@@ -73,6 +78,6 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
with pytest.raises(
expected_exception=psycopg2.DatabaseError, match=f"Tenant {tenant_id} not found"
expected_exception=NeonPageserverApiException, match=f"Tenant {tenant_id} not found"
):
env.pageserver.safe_psql(f"do_gc {tenant_id} {timeline_id} 0")
pageserver_http.timeline_gc(tenant_id, timeline_id, 0)

View File

@@ -147,14 +147,13 @@ def populate_branch(
def ensure_checkpoint(
pageserver_cur,
pageserver_http: NeonPageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
current_lsn: Lsn,
):
# run checkpoint manually to be sure that data landed in remote storage
pageserver_cur.execute(f"checkpoint {tenant_id} {timeline_id}")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
# wait until pageserver successfully uploaded a checkpoint to remote storage
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
@@ -324,22 +323,19 @@ def test_tenant_relocation(
# this requirement introduces a problem:
# if the user creates a branch during migration
# it won't appear on the new pageserver
with pg_cur(env.pageserver) as cur:
ensure_checkpoint(
cur,
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_main,
current_lsn=current_lsn_main,
)
ensure_checkpoint(
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_main,
current_lsn=current_lsn_main,
)
ensure_checkpoint(
cur,
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_second,
current_lsn=current_lsn_second,
)
ensure_checkpoint(
pageserver_http=pageserver_http,
tenant_id=tenant_id,
timeline_id=timeline_id_second,
current_lsn=current_lsn_second,
)
log.info("inititalizing new pageserver")
# bootstrap second pageserver

View File

@@ -19,7 +19,8 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv):
)
initial_tenant_dirs = set([d for d in tenants_dir.iterdir()])
neon_simple_env.pageserver.safe_psql("failpoints tenant-creation-before-tmp-rename=return")
pageserver_http = neon_simple_env.pageserver.http_client()
pageserver_http.configure_failpoints(("tenant-creation-before-tmp-rename", "return"))
with pytest.raises(Exception, match="tenant-creation-before-tmp-rename"):
_ = neon_simple_env.neon_cli.create_tenant()

View File

@@ -91,5 +91,5 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
# run final checkpoint manually to flush all the data to remote storage
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)

View File

@@ -238,6 +238,7 @@ def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_checkpoint")
pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")
@@ -251,7 +252,7 @@ def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
assert_physical_size(env, env.initial_tenant, new_timeline_id)
@@ -264,6 +265,7 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_compaction")
pg = env.postgres.create_start("test_timeline_physical_size_post_compaction")
@@ -278,8 +280,8 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
env.pageserver.safe_psql(f"compact {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pageserver_http.timeline_compact(env.initial_tenant, new_timeline_id)
assert_physical_size(env, env.initial_tenant, new_timeline_id)
@@ -290,6 +292,7 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='10m', gc_period='10m', pitr_interval='1s'}"
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_gc")
pg = env.postgres.create_start("test_timeline_physical_size_post_gc")
@@ -304,7 +307,7 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
pg.safe_psql(
"""
@@ -315,9 +318,9 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"do_gc {env.initial_tenant} {new_timeline_id} 0")
pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)
assert_physical_size(env, env.initial_tenant, new_timeline_id)
@@ -326,6 +329,7 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
# Test the metrics.
def test_timeline_size_metrics(neon_simple_env: NeonEnv):
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_metrics")
pg = env.postgres.create_start("test_timeline_size_metrics")
@@ -340,7 +344,7 @@ def test_timeline_size_metrics(neon_simple_env: NeonEnv):
)
wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)
env.pageserver.safe_psql(f"checkpoint {env.initial_tenant} {new_timeline_id}")
pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
# get the metrics and parse the metric for the current timeline's physical size
metrics = env.pageserver.http_client().get_metrics()
@@ -382,6 +386,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
random.seed(100)
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
client = env.pageserver.http_client()
tenant, timeline = env.neon_cli.create_tenant()
@@ -405,7 +410,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
)
wait_for_last_flush_lsn(env, pg, tenant, timeline)
env.pageserver.safe_psql(f"checkpoint {tenant} {timeline}")
pageserver_http.timeline_checkpoint(tenant, timeline)
timeline_total_size += get_timeline_physical_size(timeline)

View File

@@ -59,9 +59,7 @@ def wait_lsn_force_checkpoint(
)
# force checkpoint to advance remote_consistent_lsn
with closing(ps.connect(**pageserver_conn_options)) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
ps.http_client(auth_token).timeline_checkpoint(tenant_id, timeline_id)
# ensure that remote_consistent_lsn is advanced
wait_for_upload(