mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-04 04:30:38 +00:00
Compare commits
9 Commits
jcsp/issue
...
jcsp/compu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
978e55f746 | ||
|
|
090a789408 | ||
|
|
3d4fe205ba | ||
|
|
f7516df6c1 | ||
|
|
f3d7d23805 | ||
|
|
9f75da7c0a | ||
|
|
f4cc7cae14 | ||
|
|
4f57dc6cc6 | ||
|
|
dc811d1923 |
@@ -179,6 +179,12 @@ runs:
|
||||
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
|
||||
fi
|
||||
|
||||
- name: Cache poetry deps
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/.cache/pypoetry/virtualenvs
|
||||
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
|
||||
|
||||
- name: Store Allure test stat in the DB (new)
|
||||
if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
@@ -86,11 +86,10 @@ runs:
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Cache poetry deps
|
||||
id: cache_poetry
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/.cache/pypoetry/virtualenvs
|
||||
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
|
||||
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
|
||||
|
||||
- name: Install Python deps
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
3
.github/workflows/build_and_test.yml
vendored
3
.github/workflows/build_and_test.yml
vendored
@@ -112,11 +112,10 @@ jobs:
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Cache poetry deps
|
||||
id: cache_poetry
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/.cache/pypoetry/virtualenvs
|
||||
key: v1-codestyle-python-deps-${{ hashFiles('poetry.lock') }}
|
||||
key: v2-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
|
||||
|
||||
- name: Install Python deps
|
||||
run: ./scripts/pysync
|
||||
|
||||
3
.github/workflows/pg_clients.yml
vendored
3
.github/workflows/pg_clients.yml
vendored
@@ -38,11 +38,10 @@ jobs:
|
||||
uses: snok/install-poetry@v1
|
||||
|
||||
- name: Cache poetry deps
|
||||
id: cache_poetry
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: ~/.cache/pypoetry/virtualenvs
|
||||
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
|
||||
key: v2-${{ runner.os }}-python-deps-ubunutu-latest-${{ hashFiles('poetry.lock') }}
|
||||
|
||||
- name: Install Python deps
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
23
Cargo.lock
generated
23
Cargo.lock
generated
@@ -289,6 +289,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"postgres_connection",
|
||||
"r2d2",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -1651,6 +1652,7 @@ dependencies = [
|
||||
"diesel_derives",
|
||||
"itoa",
|
||||
"pq-sys",
|
||||
"r2d2",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
@@ -4166,6 +4168,17 @@ dependencies = [
|
||||
"proc-macro2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "r2d2"
|
||||
version = "0.8.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
|
||||
dependencies = [
|
||||
"log",
|
||||
"parking_lot 0.12.1",
|
||||
"scheduled-thread-pool",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand"
|
||||
version = "0.7.3"
|
||||
@@ -4879,6 +4892,15 @@ dependencies = [
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scheduled-thread-pool"
|
||||
version = "0.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
|
||||
dependencies = [
|
||||
"parking_lot 0.12.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
@@ -6807,6 +6829,7 @@ dependencies = [
|
||||
"clap_builder",
|
||||
"crossbeam-utils",
|
||||
"diesel",
|
||||
"diesel_derives",
|
||||
"either",
|
||||
"fail",
|
||||
"futures-channel",
|
||||
|
||||
@@ -111,7 +111,7 @@ USER nonroot:nonroot
|
||||
WORKDIR /home/nonroot
|
||||
|
||||
# Python
|
||||
ENV PYTHON_VERSION=3.9.2 \
|
||||
ENV PYTHON_VERSION=3.9.18 \
|
||||
PYENV_ROOT=/home/nonroot/.pyenv \
|
||||
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
|
||||
RUN set -e \
|
||||
|
||||
@@ -207,6 +207,7 @@ fn maybe_cgexec(cmd: &str) -> Command {
|
||||
|
||||
/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
|
||||
/// that we give to customers
|
||||
#[instrument(skip_all)]
|
||||
fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
let roles = spec
|
||||
.cluster
|
||||
|
||||
@@ -24,8 +24,9 @@ tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
diesel = { version = "2.1.4", features = ["serde_json", "postgres"] }
|
||||
diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
|
||||
diesel_migrations = { version = "2.1.0" }
|
||||
r2d2 = { version = "0.8.10" }
|
||||
|
||||
utils = { path = "../../libs/utils/" }
|
||||
metrics = { path = "../../libs/metrics/" }
|
||||
|
||||
@@ -170,14 +170,14 @@ impl ComputeHook {
|
||||
reconfigure_request: &ComputeHookNotifyRequest,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let req = client.request(Method::POST, url);
|
||||
let req = client.request(Method::PUT, url);
|
||||
let req = if let Some(value) = &self.authorization_header {
|
||||
req.header(reqwest::header::AUTHORIZATION, value)
|
||||
} else {
|
||||
req
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
tracing::info!(
|
||||
"Sending notify request to {} ({:?})",
|
||||
url,
|
||||
reconfigure_request
|
||||
|
||||
@@ -34,9 +34,9 @@ struct Cli {
|
||||
#[arg(short, long)]
|
||||
listen: std::net::SocketAddr,
|
||||
|
||||
/// Path to public key for JWT authentication of clients
|
||||
/// Public key for JWT authentication of clients
|
||||
#[arg(long)]
|
||||
public_key: Option<camino::Utf8PathBuf>,
|
||||
public_key: Option<String>,
|
||||
|
||||
/// Token for authenticating this service with the pageservers it controls
|
||||
#[arg(long)]
|
||||
@@ -159,7 +159,7 @@ impl Secrets {
|
||||
fn load_cli(database_url: &str, args: &Cli) -> anyhow::Result<Self> {
|
||||
let public_key = match &args.public_key {
|
||||
None => None,
|
||||
Some(key_path) => Some(JwtAuth::from_key_path(key_path)?),
|
||||
Some(key) => Some(JwtAuth::from_key(key.clone()).context("Loading public key")?),
|
||||
};
|
||||
Ok(Self {
|
||||
database_url: database_url.to_owned(),
|
||||
@@ -170,6 +170,7 @@ impl Secrets {
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the diesel migrations that are built into this binary
|
||||
async fn migration_run(database_url: &str) -> anyhow::Result<()> {
|
||||
use diesel::PgConnection;
|
||||
use diesel_migrations::{HarnessWithOutput, MigrationHarness};
|
||||
@@ -183,8 +184,18 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
fn main() -> anyhow::Result<()> {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
// We use spawn_blocking for database operations, so require approximately
|
||||
// as many blocking threads as we will open database connections.
|
||||
.max_blocking_threads(Persistence::MAX_CONNECTIONS as usize)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(async_main())
|
||||
}
|
||||
|
||||
async fn async_main() -> anyhow::Result<()> {
|
||||
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
|
||||
|
||||
logging::init(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use camino::Utf8Path;
|
||||
use camino::Utf8PathBuf;
|
||||
@@ -44,7 +45,7 @@ use crate::PlacementPolicy;
|
||||
/// updated, and reads of nodes are always from memory, not the database. We only require that
|
||||
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
|
||||
pub struct Persistence {
|
||||
database_url: String,
|
||||
connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
|
||||
|
||||
// In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
|
||||
// test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
|
||||
@@ -64,6 +65,8 @@ pub(crate) enum DatabaseError {
|
||||
Query(#[from] diesel::result::Error),
|
||||
#[error(transparent)]
|
||||
Connection(#[from] diesel::result::ConnectionError),
|
||||
#[error(transparent)]
|
||||
ConnectionPool(#[from] r2d2::Error),
|
||||
#[error("Logical error: {0}")]
|
||||
Logical(String),
|
||||
}
|
||||
@@ -71,9 +74,31 @@ pub(crate) enum DatabaseError {
|
||||
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
|
||||
|
||||
impl Persistence {
|
||||
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
|
||||
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
|
||||
pub const MAX_CONNECTIONS: u32 = 99;
|
||||
|
||||
// We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
|
||||
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
|
||||
|
||||
pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
|
||||
let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
|
||||
|
||||
// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
|
||||
// to execute queries (database queries are not generally on latency-sensitive paths).
|
||||
let connection_pool = diesel::r2d2::Pool::builder()
|
||||
.max_size(Self::MAX_CONNECTIONS)
|
||||
.max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
|
||||
.idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
|
||||
// Always keep at least one connection ready to go
|
||||
.min_idle(Some(1))
|
||||
.test_on_check_out(true)
|
||||
.build(manager)
|
||||
.expect("Could not build connection pool");
|
||||
|
||||
Self {
|
||||
database_url,
|
||||
connection_pool,
|
||||
json_path,
|
||||
}
|
||||
}
|
||||
@@ -84,14 +109,10 @@ impl Persistence {
|
||||
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
let database_url = self.database_url.clone();
|
||||
tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
|
||||
// TODO: connection pooling, such as via diesel::r2d2
|
||||
let mut conn = PgConnection::establish(&database_url)?;
|
||||
func(&mut conn)
|
||||
})
|
||||
.await
|
||||
.expect("Task panic")
|
||||
let mut conn = self.connection_pool.get()?;
|
||||
tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
|
||||
.await
|
||||
.expect("Task panic")
|
||||
}
|
||||
|
||||
/// When a node is first registered, persist it before using it for anything
|
||||
|
||||
@@ -103,7 +103,9 @@ impl From<DatabaseError> for ApiError {
|
||||
match err {
|
||||
DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
|
||||
// FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
|
||||
DatabaseError::Connection(_e) => ApiError::ShuttingDown,
|
||||
DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
|
||||
ApiError::ShuttingDown
|
||||
}
|
||||
DatabaseError::Logical(reason) => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(reason))
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ pub struct AttachmentService {
|
||||
listen: String,
|
||||
path: Utf8PathBuf,
|
||||
jwt_token: Option<String>,
|
||||
public_key_path: Option<Utf8PathBuf>,
|
||||
public_key: Option<String>,
|
||||
postgres_port: u16,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
@@ -207,7 +207,7 @@ impl AttachmentService {
|
||||
.pageservers
|
||||
.first()
|
||||
.expect("Config is validated to contain at least one pageserver");
|
||||
let (jwt_token, public_key_path) = match ps_conf.http_auth_type {
|
||||
let (jwt_token, public_key) = match ps_conf.http_auth_type {
|
||||
AuthType::Trust => (None, None),
|
||||
AuthType::NeonJWT => {
|
||||
let jwt_token = env
|
||||
@@ -219,7 +219,26 @@ impl AttachmentService {
|
||||
let public_key_path =
|
||||
camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem"))
|
||||
.unwrap();
|
||||
(Some(jwt_token), Some(public_key_path))
|
||||
|
||||
// This service takes keys as a string rather than as a path to a file/dir: read the key into memory.
|
||||
let public_key = if std::fs::metadata(&public_key_path)
|
||||
.expect("Can't stat public key")
|
||||
.is_dir()
|
||||
{
|
||||
// Our config may specify a directory: this is for the pageserver's ability to handle multiple
|
||||
// keys. We only use one key at a time, so, arbitrarily load the first one in the directory.
|
||||
let mut dir =
|
||||
std::fs::read_dir(&public_key_path).expect("Can't readdir public key path");
|
||||
let dent = dir
|
||||
.next()
|
||||
.expect("Empty key dir")
|
||||
.expect("Error reading key dir");
|
||||
|
||||
std::fs::read_to_string(dent.path()).expect("Can't read public key")
|
||||
} else {
|
||||
std::fs::read_to_string(&public_key_path).expect("Can't read public key")
|
||||
};
|
||||
(Some(jwt_token), Some(public_key))
|
||||
}
|
||||
};
|
||||
|
||||
@@ -228,7 +247,7 @@ impl AttachmentService {
|
||||
path,
|
||||
listen,
|
||||
jwt_token,
|
||||
public_key_path,
|
||||
public_key,
|
||||
postgres_port,
|
||||
client: reqwest::ClientBuilder::new()
|
||||
.build()
|
||||
@@ -453,8 +472,8 @@ impl AttachmentService {
|
||||
args.push(format!("--jwt-token={jwt_token}"));
|
||||
}
|
||||
|
||||
if let Some(public_key_path) = &self.public_key_path {
|
||||
args.push(format!("--public-key={public_key_path}"));
|
||||
if let Some(public_key) = &self.public_key {
|
||||
args.push(format!("--public-key=\"{public_key}\""));
|
||||
}
|
||||
|
||||
if let Some(control_plane_compute_hook_api) = &self.env.control_plane_compute_hook_api {
|
||||
|
||||
@@ -343,6 +343,23 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
filtered_records = 0;
|
||||
|
||||
//
|
||||
// We should check checkpoint distance after appending each ingest_batch_size bytes because otherwise
|
||||
// layer size can become much larger than `checkpoint_distance`.
|
||||
// It can append because wal-sender is sending WAL using 125kb chucks and some WAL records can cause writing large
|
||||
// amount of data to key-value storage. So performing this check only after processing
|
||||
// all WAL records in the chunk, can cause huge L0 layer files.
|
||||
//
|
||||
timeline
|
||||
.check_checkpoint_distance()
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to check checkpoint distance for timeline {}",
|
||||
timeline.timeline_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4005,16 +4005,13 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
|
||||
|
||||
restored_files = list_files_to_compare(restored_dir_path)
|
||||
|
||||
# Exclude pg_xact files from comparison, because postgres does not reliably write
|
||||
# this out before we reach this point (in spite of our CHECKPOINT above)
|
||||
# See https://github.com/neondatabase/neon/issues/559
|
||||
pgdata_files = list(filter(lambda f: not f.startswith("pg_xact"), pgdata_files))
|
||||
restored_files = list(filter(lambda f: not f.startswith("pg_xact"), restored_files))
|
||||
|
||||
if pgdata_files != restored_files:
|
||||
# filter out files which are downloaded on-demand. This also includes pg_xact, but
|
||||
# we removed those already.
|
||||
pgdata_files = [f for f in pgdata_files if not f.startswith("pg_multixact")]
|
||||
# filter pg_xact and multixact files which are downloaded on demand
|
||||
pgdata_files = [
|
||||
f
|
||||
for f in pgdata_files
|
||||
if not f.startswith("pg_xact") and not f.startswith("pg_multixact")
|
||||
]
|
||||
|
||||
# check that file sets are equal
|
||||
assert pgdata_files == restored_files
|
||||
|
||||
@@ -563,13 +563,13 @@ class PageserverHttpClient(requests.Session):
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
timeline_id: TimelineId,
|
||||
timestamp,
|
||||
timestamp: datetime,
|
||||
):
|
||||
log.info(
|
||||
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
|
||||
)
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
|
||||
@@ -26,86 +26,81 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
# apply during config step, like more users, databases, or extensions. By default
|
||||
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
|
||||
# test we only load neon.
|
||||
@pytest.mark.timeout(1000)
|
||||
def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
@pytest.mark.timeout(1800)
|
||||
@pytest.mark.parametrize("slru", ["lazy", "eager"])
|
||||
def test_lazy_startup(slru: str, neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
lazy_tenant, _ = env.neon_cli.create_tenant(
|
||||
lazy_slru_download = "true" if slru == "lazy" else "false"
|
||||
tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"lazy_slru_download": "true",
|
||||
"lazy_slru_download": lazy_slru_download,
|
||||
}
|
||||
)
|
||||
eager_tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"lazy_slru_download": "false",
|
||||
}
|
||||
)
|
||||
tenants = [lazy_tenant, eager_tenant]
|
||||
slru = "lazy"
|
||||
for tenant in tenants:
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
|
||||
endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)")
|
||||
endpoint.safe_psql("INSERT INTO t VALUES (1, 0)")
|
||||
endpoint.safe_psql(
|
||||
"""
|
||||
CREATE PROCEDURE updating() as
|
||||
$$
|
||||
DECLARE
|
||||
i integer;
|
||||
BEGIN
|
||||
FOR i IN 1..10000000 LOOP
|
||||
UPDATE t SET x = x + 1 WHERE pk=1;
|
||||
COMMIT;
|
||||
END LOOP;
|
||||
END
|
||||
$$ LANGUAGE plpgsql
|
||||
"""
|
||||
)
|
||||
endpoint.safe_psql("SET statement_timeout=0")
|
||||
endpoint.safe_psql("call updating()")
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
|
||||
cur.execute("ALTER TABLE t SET (autovacuum_enabled = false)")
|
||||
cur.execute("INSERT INTO t VALUES (1, 0)")
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE PROCEDURE updating() as
|
||||
$$
|
||||
DECLARE
|
||||
i integer;
|
||||
BEGIN
|
||||
FOR i IN 1..1000000 LOOP
|
||||
UPDATE t SET x = x + 1 WHERE pk=1;
|
||||
COMMIT;
|
||||
END LOOP;
|
||||
END
|
||||
$$ LANGUAGE plpgsql
|
||||
"""
|
||||
)
|
||||
cur.execute("SET statement_timeout=0")
|
||||
cur.execute("call updating()")
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
# We do two iterations so we can see if the second startup is faster. It should
|
||||
# be because the compute node should already be configured with roles, databases,
|
||||
# extensions, etc from the first run.
|
||||
for i in range(2):
|
||||
# Start
|
||||
with zenbenchmark.record_duration(f"{slru}_{i}_start"):
|
||||
endpoint.start()
|
||||
|
||||
with zenbenchmark.record_duration(f"{slru}_{i}_select"):
|
||||
sum = endpoint.safe_psql("select sum(x) from t")[0][0]
|
||||
assert sum == 1000000
|
||||
|
||||
# Get metrics
|
||||
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
|
||||
durations = {
|
||||
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
|
||||
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
|
||||
"sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
|
||||
"basebackup_ms": f"{slru}_{i}_basebackup",
|
||||
"start_postgres_ms": f"{slru}_{i}_start_postgres",
|
||||
"config_ms": f"{slru}_{i}_config",
|
||||
"total_startup_ms": f"{slru}_{i}_total_startup",
|
||||
}
|
||||
for key, name in durations.items():
|
||||
value = metrics[key]
|
||||
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
basebackup_bytes = metrics["basebackup_bytes"]
|
||||
zenbenchmark.record(
|
||||
f"{slru}_{i}_basebackup_bytes",
|
||||
basebackup_bytes,
|
||||
"bytes",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
# Stop so we can restart
|
||||
endpoint.stop()
|
||||
|
||||
# We do two iterations so we can see if the second startup is faster. It should
|
||||
# be because the compute node should already be configured with roles, databases,
|
||||
# extensions, etc from the first run.
|
||||
for i in range(2):
|
||||
# Start
|
||||
with zenbenchmark.record_duration(f"{slru}_{i}_start"):
|
||||
endpoint.start()
|
||||
|
||||
with zenbenchmark.record_duration(f"{slru}_{i}_select"):
|
||||
sum = endpoint.safe_psql("select sum(x) from t")[0][0]
|
||||
assert sum == 10000000
|
||||
|
||||
# Get metrics
|
||||
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
|
||||
durations = {
|
||||
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
|
||||
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
|
||||
"sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
|
||||
"basebackup_ms": f"{slru}_{i}_basebackup",
|
||||
"start_postgres_ms": f"{slru}_{i}_start_postgres",
|
||||
"config_ms": f"{slru}_{i}_config",
|
||||
"total_startup_ms": f"{slru}_{i}_total_startup",
|
||||
}
|
||||
for key, name in durations.items():
|
||||
value = metrics[key]
|
||||
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
basebackup_bytes = metrics["basebackup_bytes"]
|
||||
zenbenchmark.record(
|
||||
f"{slru}_{i}_basebackup_bytes",
|
||||
basebackup_bytes,
|
||||
"bytes",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
# Stop so we can restart
|
||||
endpoint.stop()
|
||||
|
||||
# Imitate optimizations that console would do for the second start
|
||||
endpoint.respec(skip_pg_catalog_updates=True)
|
||||
slru = "eager"
|
||||
# Imitate optimizations that console would do for the second start
|
||||
endpoint.respec(skip_pg_catalog_updates=True)
|
||||
|
||||
66
test_runner/regress/test_layer_bloating.py
Normal file
66
test_runner/regress/test_layer_bloating.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
logical_replication_sync,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
env = neon_simple_env
|
||||
|
||||
if env.pg_version != PgVersion.V16:
|
||||
pytest.skip("pg_log_standby_snapshot() function is available only in PG16")
|
||||
|
||||
timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_logical_replication", config_lines=["log_statement=all"]
|
||||
)
|
||||
|
||||
log.info("postgres is running on 'test_logical_replication' branch")
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# create table...
|
||||
cur.execute("create table t(pk integer primary key)")
|
||||
cur.execute("create publication pub1 for table t")
|
||||
# Create slot to hold WAL
|
||||
cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
|
||||
|
||||
# now start subscriber
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create table t(pk integer primary key)")
|
||||
|
||||
connstr = endpoint.connstr().replace("'", "''")
|
||||
log.info(f"ep connstr is {endpoint.connstr()}, subscriber connstr {vanilla_pg.connstr()}")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
|
||||
|
||||
cur.execute(
|
||||
"""create or replace function create_snapshots(n integer) returns void as $$
|
||||
declare
|
||||
i integer;
|
||||
begin
|
||||
for i in 1..n loop
|
||||
perform pg_log_standby_snapshot();
|
||||
end loop;
|
||||
end; $$ language plpgsql"""
|
||||
)
|
||||
cur.execute("set statement_timeout=0")
|
||||
cur.execute("select create_snapshots(10000)")
|
||||
# Wait logical replication to sync
|
||||
logical_replication_sync(vanilla_pg, endpoint)
|
||||
time.sleep(10)
|
||||
|
||||
# Check layer file sizes
|
||||
timeline_path = "{}/tenants/{}/timelines/{}/".format(
|
||||
env.pageserver.workdir, env.initial_tenant, timeline
|
||||
)
|
||||
log.info(f"Check {timeline_path}")
|
||||
for filename in os.listdir(timeline_path):
|
||||
if filename.startswith("00000"):
|
||||
log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
|
||||
assert os.path.getsize(timeline_path + filename) < 512_000_000
|
||||
@@ -64,18 +64,14 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
# Check edge cases
|
||||
# Timestamp is in the future
|
||||
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
|
||||
assert result["kind"] == "future"
|
||||
# make sure that we return a well advanced lsn here
|
||||
assert Lsn(result["lsn"]) > start_lsn
|
||||
|
||||
# Timestamp is in the unreachable past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
|
||||
assert result["kind"] == "past"
|
||||
# make sure that we return the minimum lsn here at the start of the range
|
||||
assert Lsn(result["lsn"]) < start_lsn
|
||||
@@ -83,9 +79,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
# Probe a bunch of timestamps in the valid range
|
||||
for i in range(1, len(tbl), 100):
|
||||
probe_timestamp = tbl[i][1]
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
|
||||
assert result["kind"] not in ["past", "nodata"]
|
||||
lsn = result["lsn"]
|
||||
# Call get_lsn_by_timestamp to get the LSN
|
||||
@@ -108,9 +102,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Timestamp is in the unreachable past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
|
||||
assert result["kind"] == "past"
|
||||
# make sure that we return the minimum lsn here at the start of the range
|
||||
assert Lsn(result["lsn"]) >= last_flush_lsn
|
||||
|
||||
@@ -310,7 +310,7 @@ def test_sharding_service_compute_hook(
|
||||
notifications.append(request.json)
|
||||
return Response(status=200)
|
||||
|
||||
httpserver.expect_request("/notify", method="POST").respond_with_handler(handler)
|
||||
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -29,7 +29,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
diesel = { version = "2", features = ["postgres", "serde_json"] }
|
||||
diesel = { version = "2", features = ["postgres", "r2d2", "serde_json"] }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
@@ -90,6 +90,7 @@ anyhow = { version = "1", features = ["backtrace"] }
|
||||
bytes = { version = "1", features = ["serde"] }
|
||||
cc = { version = "1", default-features = false, features = ["parallel"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
||||
diesel_derives = { version = "2", features = ["32-column-tables", "postgres", "r2d2", "with-deprecated"] }
|
||||
either = { version = "1" }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }
|
||||
|
||||
Reference in New Issue
Block a user