Compare commits

..

2 Commits

ca1ed3dc3b  Joonas Koivunen  2022-11-02 21:11:05 +02:00
    drive by typo fix

dc2554dff6  Joonas Koivunen  2022-11-02 21:10:44 +02:00
    chore: remove no longer needed empty rel fix
    this seems to have been fixed long enough ago.
98 changed files with 1239 additions and 4466 deletions


@@ -55,22 +55,6 @@ runs:
name: neon-${{ runner.os }}-${{ inputs.build_type }}-artifact
path: /tmp/neon
- name: Download Neon binaries for the previous release
if: inputs.build_type != 'remote'
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ inputs.build_type }}-artifact
path: /tmp/neon-previous
prefix: latest
- name: Download compatibility snapshot for Postgres 14
if: inputs.build_type != 'remote'
uses: ./.github/actions/download
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14
path: /tmp/compatibility_snapshot_pg14
prefix: latest
- name: Checkout
if: inputs.needs_postgres_source == 'true'
uses: actions/checkout@v3
@@ -89,18 +73,23 @@ runs:
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: Download compatibility snapshot for Postgres 14
if: inputs.build_type != 'remote'
uses: ./.github/actions/download
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14
path: /tmp/compatibility_snapshot_pg14
prefix: latest
- name: Run pytest
env:
NEON_BIN: /tmp/neon/bin
COMPATIBILITY_NEON_BIN: /tmp/neon-previous/bin
COMPATIBILITY_POSTGRES_DISTRIB_DIR: /tmp/neon-previous/pg_install
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: ${{ inputs.build_type }}
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg14
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
ALLOW_BREAKING_CHANGES: contains(github.event.pull_request.labels.*.name, 'breaking changes')
shell: bash -euxo pipefail {0}
run: |
# PLATFORM will be embedded in the perf test report
@@ -123,12 +112,7 @@ runs:
exit 1
fi
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
# -n4 uses four processes to run tests via pytest-xdist
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
# to the same worker to make @pytest.mark.order work with xdist
EXTRA_PARAMS="--dist=loadgroup $EXTRA_PARAMS"
fi
if [[ "${{ inputs.run_with_real_s3 }}" == "true" ]]; then
@@ -163,9 +147,9 @@ runs:
# --verbose prints name of each test (helpful when there are
# multiple tests in one file)
# -rA prints summary in the end
# -n4 uses four processes to run tests via pytest-xdist
# -s is not used to prevent pytest from capturing output, because tests are running
# in parallel and logs are mixed between different tests
#
mkdir -p $TEST_OUTPUT/allure/results
"${cov_prefix[@]}" ./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
@@ -185,8 +169,8 @@ runs:
uses: ./.github/actions/upload
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14-${{ github.run_id }}
# The path includes a test name (test_create_snapshot) and directory that the test creates (compatibility_snapshot_pg14), keep the path in sync with the test
path: /tmp/test_output/test_create_snapshot/compatibility_snapshot_pg14/
# The path includes a test name (test_prepare_snapshot) and directory that the test creates (compatibility_snapshot_pg14), keep the path in sync with the test
path: /tmp/test_output/test_prepare_snapshot/compatibility_snapshot_pg14/
prefix: latest
- name: Create Allure report


@@ -268,6 +268,32 @@ jobs:
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
upload-latest-artifacts:
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ regress-tests ]
if: github.ref_name == 'main'
steps:
- name: Copy Neon artifact to the latest directory
shell: bash -euxo pipefail {0}
env:
BUCKET: neon-github-public-dev
PREFIX: artifacts/${{ github.run_id }}
run: |
for build_type in debug release; do
FILENAME=neon-${{ runner.os }}-${build_type}-artifact.tar.zst
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} s3://${BUCKET}/artifacts/latest/${FILENAME}
done
benchmarks:
runs-on: dev
container:
@@ -419,15 +445,11 @@ jobs:
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
needs: [ push-docker-hub, tag ]
needs: [ build-neon ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
# For pull requests, GH Actions set "github.sha" variable to point at a fake merge commit
# but we need to use a real sha of a latest commit in the PR's branch for the e2e job,
# to place a job run status update later.
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
# For non-PR kinds of runs, the above will produce an empty variable, pick the original sha value for those
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
@@ -453,9 +475,7 @@ jobs:
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\",
\"storage_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
\"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\"
\"remote_repo\": \"${{ github.repository }}\"
}
}"
@@ -916,7 +936,7 @@ jobs:
DOCKER_TAG=${{needs.tag.outputs.build-tag}}
helm upgrade neon-proxy-scram neondatabase/neon-proxy --namespace neon-proxy --create-namespace --install -f .github/helm-values/${{ matrix.target_cluster }}.neon-proxy-scram.yaml --set image.tag=${DOCKER_TAG} --wait --timeout 15m0s
promote-compatibility-data:
promote-compatibility-test-snapshot:
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
@@ -930,24 +950,9 @@ jobs:
BUCKET: neon-github-public-dev
PREFIX: artifacts/latest
run: |
# Update compatibility snapshot for the release
for build_type in debug release; do
OLD_FILENAME=compatibility-snapshot-${build_type}-pg14-${GITHUB_RUN_ID}.tar.zst
NEW_FILENAME=compatibility-snapshot-${build_type}-pg14.tar.zst
time aws s3 mv --only-show-errors s3://${BUCKET}/${PREFIX}/${OLD_FILENAME} s3://${BUCKET}/${PREFIX}/${NEW_FILENAME}
done
# Update Neon artifact for the release (reuse already uploaded artifact)
for build_type in debug release; do
OLD_PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=neon-${{ runner.os }}-${build_type}-artifact.tar.zst
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
echo 2>&1 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} s3://${BUCKET}/${PREFIX}/${FILENAME}
done
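Both jobs above pick the newest uploaded key for a given filename with `aws s3api list-objects-v2 | jq | grep | sort --version-sort | tail -1`, so a re-run attempt of the same workflow still promotes the latest artifact. A rough Rust sketch of just that selection step (the key names are illustrative, and a plain lexicographic `max` stands in for GNU `sort --version-sort`):

```rust
/// Pick the most recent S3 key ending in `filename`, mirroring the
/// `jq ... | grep ${FILENAME} | sort --version-sort | tail -1` pipeline above.
/// Plain lexicographic order stands in for version sort in this sketch.
fn pick_latest_key<'a>(listed_keys: &'a [String], filename: &str) -> Option<&'a String> {
    listed_keys
        .iter()
        .filter(|key| key.ends_with(filename))
        .max()
}

fn main() {
    // Illustrative keys as they might appear under artifacts/<run_id>/ after a retry.
    let keys = vec![
        "artifacts/123/neon-Linux-release-artifact.tar.zst".to_string(),
        "artifacts/123/2/neon-Linux-release-artifact.tar.zst".to_string(),
    ];
    match pick_latest_key(&keys, "neon-Linux-release-artifact.tar.zst") {
        Some(key) => println!("promote s3 key {key} to artifacts/latest/"),
        None => eprintln!("no matching key found in any attempt"),
    }
}
```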

.gitmodules (vendored): 2 changes

@@ -1,7 +1,7 @@
[submodule "vendor/postgres-v14"]
path = vendor/postgres-v14
url = https://github.com/neondatabase/postgres.git
branch = REL_14_STABLE_neon
branch = main
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git


@@ -1,10 +0,0 @@
/compute_tools/ @neondatabase/control-plane
/control_plane/ @neondatabase/compute @neondatabase/storage
/libs/pageserver_api/ @neondatabase/compute @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/safekeepers
/pageserver/ @neondatabase/compute @neondatabase/storage
/pgxn/ @neondatabase/compute
/proxy/ @neondatabase/control-plane
/safekeeper/ @neondatabase/safekeepers

Cargo.lock (generated): 35 changes

@@ -2145,7 +2145,6 @@ dependencies = [
"postgres-types",
"postgres_ffi",
"pprof",
"pq_proto",
"rand",
"regex",
"remote_storage",
@@ -2158,7 +2157,6 @@ dependencies = [
"svg_fmt",
"tar",
"tempfile",
"tenant_size_model",
"thiserror",
"tokio",
"tokio-postgres",
@@ -2176,7 +2174,6 @@ name = "pageserver_api"
version = "0.1.0"
dependencies = [
"anyhow",
"byteorder",
"bytes",
"const_format",
"postgres_ffi",
@@ -2439,21 +2436,6 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "pq_proto"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"pin-project-lite",
"postgres-protocol",
"rand",
"serde",
"tokio",
"tracing",
"workspace_hack",
]
[[package]]
name = "prettyplease"
version = "0.1.21"
@@ -2586,7 +2568,6 @@ dependencies = [
"once_cell",
"parking_lot 0.12.1",
"pin-project-lite",
"pq_proto",
"rand",
"rcgen",
"reqwest",
@@ -3103,7 +3084,6 @@ dependencies = [
"postgres",
"postgres-protocol",
"postgres_ffi",
"pq_proto",
"regex",
"remote_storage",
"safekeeper_api",
@@ -3552,13 +3532,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "tenant_size_model"
version = "0.1.0"
dependencies = [
"workspace_hack",
]
[[package]]
name = "termcolor"
version = "1.1.3"
@@ -4064,7 +4037,9 @@ dependencies = [
"metrics",
"nix 0.25.0",
"once_cell",
"pq_proto",
"pin-project-lite",
"postgres",
"postgres-protocol",
"rand",
"routerify",
"rustls",
@@ -4389,9 +4364,6 @@ dependencies = [
"crossbeam-utils",
"either",
"fail",
"futures-channel",
"futures-task",
"futures-util",
"hashbrown",
"indexmap",
"libc",
@@ -4405,7 +4377,6 @@ dependencies = [
"rand",
"regex",
"regex-syntax",
"reqwest",
"scopeguard",
"serde",
"stable_deref_trait",


@@ -65,7 +65,7 @@ impl GenericOption {
let name = match self.name.as_str() {
"safekeepers" => "neon.safekeepers",
"wal_acceptor_reconnect" => "neon.safekeeper_reconnect_timeout",
"wal_acceptor_connection_timeout" => "neon.safekeeper_connection_timeout",
"wal_acceptor_connect_timeout" => "neon.safekeeper_connect_timeout",
it => it,
};


@@ -287,7 +287,7 @@ impl PostgresNode {
conf.append("shared_buffers", "1MB");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_level", "logical");
conf.append("wal_level", "replica");
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreceiver will send a feedback message to the wal sender.
conf.append("wal_sender_timeout", "5s");


@@ -362,11 +362,6 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -429,11 +424,6 @@ impl PageServerNode {
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.get("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
})
.send()?
.error_from_body()?;


@@ -9,7 +9,6 @@ serde_with = "2.0"
const_format = "0.2.21"
anyhow = { version = "1.0", features = ["backtrace"] }
bytes = "1.0.1"
byteorder = "1.4.3"
utils = { path = "../utils" }
postgres_ffi = { path = "../postgres_ffi" }


@@ -1,6 +1,5 @@
use std::num::NonZeroU64;
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
@@ -10,7 +9,7 @@ use utils::{
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -73,7 +72,6 @@ pub struct TenantCreateRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
#[serde_as]
@@ -113,7 +111,6 @@ pub struct TenantConfigRequest {
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
impl TenantConfigRequest {
@@ -132,7 +129,6 @@ impl TenantConfigRequest {
walreceiver_connect_timeout: None,
lagging_wal_timeout: None,
max_lsn_wal_lag: None,
trace_read_requests: None,
}
}
}
@@ -229,7 +225,6 @@ pub struct TimelineGcRequest {
}
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq)]
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
@@ -246,21 +241,21 @@ pub enum PagestreamBeMessage {
DbSize(PagestreamDbSizeResponse),
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
pub struct PagestreamExistsRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
pub struct PagestreamNblocksRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
pub struct PagestreamGetPageRequest {
pub latest: bool,
pub lsn: Lsn,
@@ -268,7 +263,7 @@ pub struct PagestreamGetPageRequest {
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug)]
pub struct PagestreamDbSizeRequest {
pub latest: bool,
pub lsn: Lsn,
@@ -301,98 +296,52 @@ pub struct PagestreamDbSizeResponse {
}
impl PagestreamFeMessage {
pub fn serialize(&self) -> Bytes {
let mut bytes = BytesMut::new();
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(if req.latest { 1 } else { 0 });
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
}
bytes.into()
}
pub fn parse<R: std::io::Read>(body: &mut R) -> anyhow::Result<PagestreamFeMessage> {
pub fn parse(mut body: Bytes) -> anyhow::Result<PagestreamFeMessage> {
// TODO these gets can fail
// these correspond to the NeonMessageTag enum in pagestore_client.h
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let msg_tag = body.get_u8();
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
spcnode: body.get_u32(),
dbnode: body.get_u32(),
relnode: body.get_u32(),
forknum: body.get_u8(),
},
blkno: body.read_u32::<BigEndian>()?,
blkno: body.get_u32(),
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
latest: body.get_u8() != 0,
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}
}
@@ -431,58 +380,3 @@ impl PagestreamBeMessage {
bytes.into()
}
}
#[cfg(test)]
mod tests {
use bytes::Buf;
use super::*;
#[test]
fn test_pagestream() {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: true,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: false,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: true,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
spcnode: 2,
dbnode: 3,
relnode: 4,
},
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: true,
lsn: Lsn(4),
dbnode: 7,
}),
];
for msg in messages {
let bytes = msg.serialize();
let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap();
assert!(msg == reconstructed);
}
}
}
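With the `PartialEq` derives and the reader-based round-trip test above removed, a caller that still wants to sanity-check the wire format can match on the parsed variant instead of comparing whole messages. A minimal sketch against the post-change API shown in this diff (`serialize(&self) -> Bytes`, `parse(Bytes) -> anyhow::Result<PagestreamFeMessage>`); the import paths are assumptions:

```rust
use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest};
use pageserver_api::reltag::RelTag;
use utils::lsn::Lsn;

fn roundtrip_get_page() -> anyhow::Result<()> {
    // Build a GetPage request and serialize it into the CopyData payload layout
    // (tag byte 2, then latest/lsn/rel/blkno written big-endian).
    let msg = PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
        latest: true,
        lsn: Lsn(4),
        rel: RelTag {
            forknum: 1,
            spcnode: 2,
            dbnode: 3,
            relnode: 4,
        },
        blkno: 7,
    });
    let bytes = msg.serialize();

    // parse() now consumes the Bytes buffer directly instead of a std::io::Read.
    match PagestreamFeMessage::parse(bytes)? {
        PagestreamFeMessage::GetPage(req) => assert_eq!(req.blkno, 7),
        _ => anyhow::bail!("expected a GetPage message"),
    }
    Ok(())
}
```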


@@ -1,16 +0,0 @@
[package]
name = "pq_proto"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
bytes = "1.0.1"
pin-project-lite = "0.2.7"
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
rand = "0.8.3"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.17", features = ["macros"] }
tracing = "0.1"
workspace_hack = { version = "0.1", path = "../../workspace_hack" }


@@ -1,3 +0,0 @@
*.dot
*.png
*.svg


@@ -1,8 +0,0 @@
[package]
name = "tenant_size_model"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
workspace_hack = { version = "0.1", path = "../../workspace_hack" }


@@ -1,13 +0,0 @@
all: 1.svg 2.svg 3.svg 4.svg 1.png 2.png 3.png 4.png
../../target/debug/tenant_size_model: Cargo.toml src/main.rs src/lib.rs
cargo build --bin tenant_size_model
%.svg: %.dot
dot -Tsvg $< > $@
%.png: %.dot
dot -Tpng $< > $@
%.dot: ../../target/debug/tenant_size_model
../../target/debug/tenant_size_model $* > $@


@@ -1,7 +0,0 @@
# Logical size + WAL pricing
This is a simulator to calculate the tenant size in different scenarios,
using the "Logical size + WAL" method. Makefile produces diagrams used in a
private presentation:
https://docs.google.com/presentation/d/1OapE4k11xmcwMh7I7YvNWGC63yCRLh6udO9bXZ-fZmo/edit?usp=sharing


@@ -1,349 +0,0 @@
use std::borrow::Cow;
use std::collections::HashMap;
/// Pricing model or history size builder.
///
/// Maintains knowledge of the branches and their modifications. Generic over the branch name key
/// type.
pub struct Storage<K: 'static> {
segments: Vec<Segment>,
/// Mapping from the branch name to the index of a segment describing its latest state.
branches: HashMap<K, usize>,
}
/// Snapshot of a branch.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Segment {
/// Previous segment index into [`Storage::segments`], if any.
parent: Option<usize>,
/// Description of how did we get to this state.
///
/// Mainly used in the original scenarios 1..=4 with insert, delete and update. Not used when
/// modifying a branch directly.
pub op: Cow<'static, str>,
/// LSN before this state
start_lsn: u64,
/// LSN at this state
pub end_lsn: u64,
/// Logical size before this state
start_size: u64,
/// Logical size at this state
pub end_size: u64,
/// Indices to [`Storage::segments`]
///
/// FIXME: this could be an Option<usize>
children_after: Vec<usize>,
/// Determined by `retention_period` given to [`Storage::calculate`]
pub needed: bool,
}
//
//
//
//
// *-g--*---D--->
// /
// /
// / *---b----*-B--->
// / /
// / /
// -----*--e---*-----f----* C
// E \
// \
// *--a---*---A-->
//
// If A and B need to be retained, is it cheaper to store
// snapshot at C+a+b, or snapshots at A and B ?
//
// If D also needs to be retained, which is cheaper:
//
// 1. E+g+e+f+a+b
// 2. D+C+a+b
// 3. D+A+B
/// [`Segment`] which has had its size calculated.
pub struct SegmentSize {
pub seg_id: usize,
pub method: SegmentMethod,
this_size: u64,
pub children: Vec<SegmentSize>,
}
impl SegmentSize {
fn total(&self) -> u64 {
self.this_size + self.children.iter().fold(0, |acc, x| acc + x.total())
}
pub fn total_children(&self) -> u64 {
if self.method == SnapshotAfter {
self.this_size + self.children.iter().fold(0, |acc, x| acc + x.total())
} else {
self.children.iter().fold(0, |acc, x| acc + x.total())
}
}
}
/// Different methods to retain history from a particular state
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SegmentMethod {
SnapshotAfter,
Wal,
WalNeeded,
Skipped,
}
use SegmentMethod::*;
impl<K: std::hash::Hash + Eq + 'static> Storage<K> {
/// Creates a new storage with the given default branch name.
pub fn new(initial_branch: K) -> Storage<K> {
let init_segment = Segment {
op: "".into(),
needed: false,
parent: None,
start_lsn: 0,
end_lsn: 0,
start_size: 0,
end_size: 0,
children_after: Vec::new(),
};
Storage {
segments: vec![init_segment],
branches: HashMap::from([(initial_branch, 0)]),
}
}
/// Advances the branch with the named operation, by the relative LSN and logical size bytes.
pub fn modify_branch<Q: ?Sized>(
&mut self,
branch: &Q,
op: Cow<'static, str>,
lsn_bytes: u64,
size_bytes: i64,
) where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
let lastseg_id = *self.branches.get(branch).unwrap();
let newseg_id = self.segments.len();
let lastseg = &mut self.segments[lastseg_id];
let newseg = Segment {
op,
parent: Some(lastseg_id),
start_lsn: lastseg.end_lsn,
end_lsn: lastseg.end_lsn + lsn_bytes,
start_size: lastseg.end_size,
end_size: (lastseg.end_size as i64 + size_bytes) as u64,
children_after: Vec::new(),
needed: false,
};
lastseg.children_after.push(newseg_id);
self.segments.push(newseg);
*self.branches.get_mut(branch).expect("read already") = newseg_id;
}
pub fn insert<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
self.modify_branch(branch, "insert".into(), bytes, bytes as i64);
}
pub fn update<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
self.modify_branch(branch, "update".into(), bytes, 0i64);
}
pub fn delete<Q: ?Sized>(&mut self, branch: &Q, bytes: u64)
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64));
}
/// Panics if the parent branch cannot be found.
pub fn branch<Q: ?Sized>(&mut self, parent: &Q, name: K)
where
K: std::borrow::Borrow<Q>,
Q: std::hash::Hash + Eq,
{
// Find the right segment
let branchseg_id = *self
.branches
.get(parent)
.expect("should have found the parent by key");
let _branchseg = &mut self.segments[branchseg_id];
// Create branch name for it
self.branches.insert(name, branchseg_id);
}
pub fn calculate(&mut self, retention_period: u64) -> SegmentSize {
// Phase 1: Mark all the segments that need to be retained
for (_branch, &last_seg_id) in self.branches.iter() {
let last_seg = &self.segments[last_seg_id];
let cutoff_lsn = last_seg.start_lsn.saturating_sub(retention_period);
let mut seg_id = last_seg_id;
loop {
let seg = &mut self.segments[seg_id];
if seg.end_lsn < cutoff_lsn {
break;
}
seg.needed = true;
if let Some(prev_seg_id) = seg.parent {
seg_id = prev_seg_id;
} else {
break;
}
}
}
// Phase 2: For each oldest segment in a chain that needs to be retained,
// calculate if we should store snapshot or WAL
self.size_from_snapshot_later(0)
}
fn size_from_wal(&self, seg_id: usize) -> SegmentSize {
let seg = &self.segments[seg_id];
let this_size = seg.end_lsn - seg.start_lsn;
let mut children = Vec::new();
// try both ways
for &child_id in seg.children_after.iter() {
// try each child both ways
let child = &self.segments[child_id];
let p1 = self.size_from_wal(child_id);
let p = if !child.needed {
let p2 = self.size_from_snapshot_later(child_id);
if p1.total() < p2.total() {
p1
} else {
p2
}
} else {
p1
};
children.push(p);
}
SegmentSize {
seg_id,
method: if seg.needed { WalNeeded } else { Wal },
this_size,
children,
}
}
fn size_from_snapshot_later(&self, seg_id: usize) -> SegmentSize {
// If this is needed, then it's time to do the snapshot and continue
// with wal method.
let seg = &self.segments[seg_id];
//eprintln!("snap: seg{}: {} needed: {}", seg_id, seg.children_after.len(), seg.needed);
if seg.needed {
let mut children = Vec::new();
for &child_id in seg.children_after.iter() {
// try each child both ways
let child = &self.segments[child_id];
let p1 = self.size_from_wal(child_id);
let p = if !child.needed {
let p2 = self.size_from_snapshot_later(child_id);
if p1.total() < p2.total() {
p1
} else {
p2
}
} else {
p1
};
children.push(p);
}
SegmentSize {
seg_id,
method: WalNeeded,
this_size: seg.start_size,
children,
}
} else {
// If any of the direct children are "needed", need to be able to reconstruct here
let mut children_needed = false;
for &child in seg.children_after.iter() {
let seg = &self.segments[child];
if seg.needed {
children_needed = true;
break;
}
}
let method1 = if !children_needed {
let mut children = Vec::new();
for child in seg.children_after.iter() {
children.push(self.size_from_snapshot_later(*child));
}
Some(SegmentSize {
seg_id,
method: Skipped,
this_size: 0,
children,
})
} else {
None
};
// If this a junction, consider snapshotting here
let method2 = if children_needed || seg.children_after.len() >= 2 {
let mut children = Vec::new();
for child in seg.children_after.iter() {
children.push(self.size_from_wal(*child));
}
Some(SegmentSize {
seg_id,
method: SnapshotAfter,
this_size: seg.end_size,
children,
})
} else {
None
};
match (method1, method2) {
(None, None) => panic!(),
(Some(method), None) => method,
(None, Some(method)) => method,
(Some(method1), Some(method2)) => {
if method1.total() < method2.total() {
method1
} else {
method2
}
}
}
}
}
pub fn into_segments(self) -> Vec<Segment> {
self.segments
}
}


@@ -1,268 +0,0 @@
//! Tenant size model testing ground.
//!
//! Has a number of scenarios and a `main` for invoking these by number, calculating the history
//! size, outputs graphviz graph. Makefile in directory shows how to use graphviz to turn scenarios
//! into pngs.
use tenant_size_model::{Segment, SegmentSize, Storage};
// Main branch only. Some updates on it.
fn scenario_1() -> (Vec<Segment>, SegmentSize) {
// Create main branch
let mut storage = Storage::new("main");
// Bulk load 5 GB of data to it
storage.insert("main", 5_000);
// Stream of updates
for _ in 0..5 {
storage.update("main", 1_000);
}
let size = storage.calculate(1000);
(storage.into_segments(), size)
}
// Main branch only. Some updates on it.
fn scenario_2() -> (Vec<Segment>, SegmentSize) {
// Create main branch
let mut storage = Storage::new("main");
// Bulk load 5 GB of data to it
storage.insert("main", 5_000);
// Stream of updates
for _ in 0..5 {
storage.update("main", 1_000);
}
// Branch
storage.branch("main", "child");
storage.update("child", 1_000);
// More updates on parent
storage.update("main", 1_000);
let size = storage.calculate(1000);
(storage.into_segments(), size)
}
// Like 2, but more updates on main
fn scenario_3() -> (Vec<Segment>, SegmentSize) {
// Create main branch
let mut storage = Storage::new("main");
// Bulk load 5 GB of data to it
storage.insert("main", 5_000);
// Stream of updates
for _ in 0..5 {
storage.update("main", 1_000);
}
// Branch
storage.branch("main", "child");
storage.update("child", 1_000);
// More updates on parent
for _ in 0..5 {
storage.update("main", 1_000);
}
let size = storage.calculate(1000);
(storage.into_segments(), size)
}
// Diverged branches
fn scenario_4() -> (Vec<Segment>, SegmentSize) {
// Create main branch
let mut storage = Storage::new("main");
// Bulk load 5 GB of data to it
storage.insert("main", 5_000);
// Stream of updates
for _ in 0..5 {
storage.update("main", 1_000);
}
// Branch
storage.branch("main", "child");
storage.update("child", 1_000);
// More updates on parent
for _ in 0..8 {
storage.update("main", 1_000);
}
let size = storage.calculate(1000);
(storage.into_segments(), size)
}
fn scenario_5() -> (Vec<Segment>, SegmentSize) {
let mut storage = Storage::new("a");
storage.insert("a", 5000);
storage.branch("a", "b");
storage.update("b", 4000);
storage.update("a", 2000);
storage.branch("a", "c");
storage.insert("c", 4000);
storage.insert("a", 2000);
let size = storage.calculate(5000);
(storage.into_segments(), size)
}
fn scenario_6() -> (Vec<Segment>, SegmentSize) {
use std::borrow::Cow;
const NO_OP: Cow<'static, str> = Cow::Borrowed("");
let branches = [
Some(0x7ff1edab8182025f15ae33482edb590a_u128),
Some(0xb1719e044db05401a05a2ed588a3ad3f),
Some(0xb68d6691c895ad0a70809470020929ef),
];
// compared to other scenarios, this one uses bytes instead of kB
let mut storage = Storage::new(None);
storage.branch(&None, branches[0]); // at 0
storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128); // at 108951064
storage.branch(&branches[0], branches[1]); // at 108951064
storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392); // at 124511472
storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904); // at 283415424
storage.branch(&branches[0], branches[2]); // at 283415424
storage.modify_branch(&branches[2], NO_OP, 15906192, 8192); // at 299321616
storage.modify_branch(&branches[0], NO_OP, 18909976, 32768); // at 302325400
let size = storage.calculate(100_000);
(storage.into_segments(), size)
}
fn main() {
let args: Vec<String> = std::env::args().collect();
let scenario = if args.len() < 2 { "1" } else { &args[1] };
let (segments, size) = match scenario {
"1" => scenario_1(),
"2" => scenario_2(),
"3" => scenario_3(),
"4" => scenario_4(),
"5" => scenario_5(),
"6" => scenario_6(),
other => {
eprintln!("invalid scenario {}", other);
std::process::exit(1);
}
};
graphviz_tree(&segments, &size);
}
fn graphviz_recurse(segments: &[Segment], node: &SegmentSize) {
use tenant_size_model::SegmentMethod::*;
let seg_id = node.seg_id;
let seg = segments.get(seg_id).unwrap();
let lsn = seg.end_lsn;
let size = seg.end_size;
let method = node.method;
println!(" {{");
println!(" node [width=0.1 height=0.1 shape=oval]");
let tenant_size = node.total_children();
let penwidth = if seg.needed { 6 } else { 3 };
let x = match method {
SnapshotAfter =>
format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" style=filled penwidth={penwidth}"),
Wal =>
format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"black\" penwidth={penwidth}"),
WalNeeded =>
format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"black\" penwidth={penwidth}"),
Skipped =>
format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"gray\" penwidth={penwidth}"),
};
println!(" \"seg{seg_id}\" [{x}]");
println!(" }}");
// Recurse. Much of the data is actually on the edge
for child in node.children.iter() {
let child_id = child.seg_id;
graphviz_recurse(segments, child);
let edge_color = match child.method {
SnapshotAfter => "gray",
Wal => "black",
WalNeeded => "black",
Skipped => "gray",
};
println!(" {{");
println!(" edge [] ");
print!(" \"seg{seg_id}\" -> \"seg{child_id}\" [");
print!("color={edge_color}");
if child.method == WalNeeded {
print!(" penwidth=6");
}
if child.method == Wal {
print!(" penwidth=3");
}
let next = segments.get(child_id).unwrap();
if next.op.is_empty() {
print!(
" label=\"{} / {}\"",
next.end_lsn - seg.end_lsn,
(next.end_size as i128 - seg.end_size as i128)
);
} else {
print!(" label=\"{}: {}\"", next.op, next.end_lsn - seg.end_lsn);
}
println!("]");
println!(" }}");
}
}
fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) {
println!("digraph G {{");
println!(" fontname=\"Helvetica,Arial,sans-serif\"");
println!(" node [fontname=\"Helvetica,Arial,sans-serif\"]");
println!(" edge [fontname=\"Helvetica,Arial,sans-serif\"]");
println!(" graph [center=1 rankdir=LR]");
println!(" edge [dir=none]");
graphviz_recurse(segments, tree);
println!("}}");
}
#[test]
fn scenarios_return_same_size() {
type ScenarioFn = fn() -> (Vec<Segment>, SegmentSize);
let truths: &[(u32, ScenarioFn, _)] = &[
(line!(), scenario_1, 8000),
(line!(), scenario_2, 9000),
(line!(), scenario_3, 13000),
(line!(), scenario_4, 16000),
(line!(), scenario_5, 17000),
(line!(), scenario_6, 333_792_000),
];
for (line, scenario, expected) in truths {
let (_, size) = scenario();
assert_eq!(*expected, size.total_children(), "scenario on line {line}");
}
}


@@ -9,6 +9,9 @@ anyhow = "1.0"
bincode = "1.3"
bytes = "1.0.1"
hyper = { version = "0.14.7", features = ["full"] }
pin-project-lite = "0.2.7"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
routerify = "3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
@@ -30,8 +33,8 @@ once_cell = "1.13.0"
strum = "0.24"
strum_macros = "0.24"
metrics = { path = "../metrics" }
pq_proto = { path = "../pq_proto" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[dev-dependencies]


@@ -204,17 +204,6 @@ pub struct TenantId(Id);
id_newtype!(TenantId);
/// Neon Connection Id identifies long-lived connections (for example a pagestream
/// connection with the page_service). Is used for better logging and tracing
///
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// See [`Id`] for alternative ways to serialize it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ConnectionId(Id);
id_newtype!(ConnectionId);
// A pair uniquely identifying Neon instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TenantTimelineId {


@@ -1,6 +1,8 @@
//! `utils` is intended to be a place to put code that is shared
//! between other crates in this repository.
#![allow(clippy::manual_range_contains)]
/// `Lsn` type implements common tasks on Log Sequence Numbers
pub mod lsn;
/// SeqWait allows waiting for a future sequence number to arrive
@@ -15,6 +17,7 @@ pub mod vec_map;
pub mod bin_ser;
pub mod postgres_backend;
pub mod postgres_backend_async;
pub mod pq_proto;
// helper functions for creating and fsyncing
pub mod crashsafe;
@@ -39,6 +42,9 @@ pub mod lock_file;
pub mod accum;
pub mod shutdown;
// Tools for calling certain async methods in sync contexts
pub mod sync;
// Utility for binding TcpListeners with proper socket options.
pub mod tcp_listener;


@@ -13,7 +13,7 @@ use crate::seqwait::MonotonicCounter;
pub const XLOG_BLCKSZ: u32 = 8192;
/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)]
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Lsn(pub u64);


@@ -3,10 +3,10 @@
//! implementation determining how to process the queries. Currently its API
//! is rather narrow, but we can extend it once required.
use crate::pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use crate::sock_split::{BidiStream, ReadStream, WriteStream};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::fmt;


@@ -4,9 +4,9 @@
//! is rather narrow, but we can extend it once required.
use crate::postgres_backend::AuthType;
use crate::pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use anyhow::{bail, Context, Result};
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use rand::Rng;
use std::future::Future;
use std::net::SocketAddr;


@@ -2,9 +2,7 @@
//! <https://www.postgresql.org/docs/devel/protocol-message-formats.html>
//! on message formats.
// Tools for calling certain async methods in sync contexts.
pub mod sync;
use crate::sync::{AsyncishRead, SyncFuture};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_protocol::PG_EPOCH;
@@ -18,7 +16,6 @@ use std::{
str,
time::{Duration, SystemTime},
};
use sync::{AsyncishRead, SyncFuture};
use tokio::io::AsyncReadExt;
use tracing::{trace, warn};
@@ -201,7 +198,7 @@ impl FeMessage {
///
/// ```
/// # use std::io;
/// # use pq_proto::FeMessage;
/// # use utils::pq_proto::FeMessage;
/// #
/// # fn process_message(msg: FeMessage) -> anyhow::Result<()> {
/// # Ok(())
@@ -305,7 +302,6 @@ impl FeStartupPacket {
Err(e) => return Err(e.into()),
};
#[allow(clippy::manual_range_contains)]
if len < 4 || len > MAX_STARTUP_PACKET_LENGTH {
bail!("invalid message length");
}


@@ -29,7 +29,7 @@ impl<S, T: Future> SyncFuture<S, T> {
/// Example:
///
/// ```
/// # use pq_proto::sync::SyncFuture;
/// # use utils::sync::SyncFuture;
/// # use std::future::Future;
/// # use tokio::io::AsyncReadExt;
/// #


@@ -12,61 +12,61 @@ testing = ["fail/failpoints"]
profiling = ["pprof"]
[dependencies]
amplify_num = { git = "https://github.com/hlinnaka/rust-amplify.git", branch = "unsigned-int-perf" }
anyhow = { version = "1.0", features = ["backtrace"] }
async-stream = "0.3"
async-trait = "0.1"
byteorder = "1.4.3"
bytes = "1.0.1"
chrono = "0.4.19"
clap = { version = "4.0", features = ["string"] }
close_fds = "0.3.2"
const_format = "0.2.21"
crc32c = "0.6.0"
crossbeam-utils = "0.8.5"
fail = "0.5.0"
futures = "0.3.13"
git-version = "0.3.5"
hex = "0.4.3"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14"
itertools = "0.10.3"
nix = "0.25"
num-traits = "0.2.15"
once_cell = "1.13.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
rand = "0.8.3"
regex = "1.4.5"
rstar = "0.9.3"
scopeguard = "1.1.0"
bytes = "1.0.1"
byteorder = "1.4.3"
futures = "0.3.13"
hex = "0.4.3"
hyper = "0.14"
itertools = "0.10.3"
clap = { version = "4.0", features = ["string"] }
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
tokio-util = { version = "0.7.3", features = ["io", "io-util"] }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
anyhow = { version = "1.0", features = ["backtrace"] }
crc32c = "0.6.0"
thiserror = "1.0"
tar = "0.4.33"
humantime = "2.1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
signal-hook = "0.3.10"
svg_fmt = "0.4.1"
tar = "0.4.33"
thiserror = "1.0"
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-util = { version = "0.7.3", features = ["io", "io-util"] }
toml_edit = { version = "0.14", features = ["easy"] }
tracing = "0.1.36"
url = "2"
walkdir = "2.3.2"
humantime-serde = "1.1.1"
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
toml_edit = { version = "0.14", features = ["easy"] }
scopeguard = "1.1.0"
const_format = "0.2.21"
tracing = "0.1.36"
signal-hook = "0.3.10"
url = "2"
nix = "0.25"
once_cell = "1.13.0"
crossbeam-utils = "0.8.5"
fail = "0.5.0"
git-version = "0.3.5"
rstar = "0.9.3"
num-traits = "0.2.15"
amplify_num = { git = "https://github.com/hlinnaka/rust-amplify.git", branch = "unsigned-int-perf" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
pq_proto = { path = "../libs/pq_proto" }
remote_storage = { path = "../libs/remote_storage" }
tenant_size_model = { path = "../libs/tenant_size_model" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
svg_fmt = "0.4.1"
[dev-dependencies]
criterion = "0.4"


@@ -205,7 +205,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
new_lock_contents,
file,
} => {
info!("Created lock file at {lock_file_path:?} with contenst {new_lock_contents}");
info!("Created lock file at {lock_file_path:?} with contents {new_lock_contents}");
file
}
lock_file::LockCreationResult::AlreadyLocked {


@@ -8,9 +8,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::RemoteStorageConfig;
use std::env;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
@@ -50,9 +48,6 @@ pub mod defaults {
pub const DEFAULT_LOG_FORMAT: &str = "plain";
pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize =
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
///
/// Default built-in configuration file.
///
@@ -72,9 +67,6 @@ pub mod defaults {
#initial_superuser_name = '{DEFAULT_SUPERUSER}'
#log_format = '{DEFAULT_LOG_FORMAT}'
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -140,9 +132,6 @@ pub struct PageServerConf {
pub broker_endpoints: Vec<Url>,
pub log_format: LogFormat,
/// Number of concurrent [`Tenant::gather_size_inputs`] allowed.
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -211,8 +200,6 @@ struct PageServerConfigBuilder {
broker_endpoints: BuilderValue<Vec<Url>>,
log_format: BuilderValue<LogFormat>,
concurrent_tenant_size_logical_size_queries: BuilderValue<ConfigurableSemaphore>,
}
impl Default for PageServerConfigBuilder {
@@ -241,8 +228,6 @@ impl Default for PageServerConfigBuilder {
broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()),
broker_endpoints: Set(Vec::new()),
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
}
}
}
@@ -319,10 +304,6 @@ impl PageServerConfigBuilder {
self.log_format = BuilderValue::Set(log_format)
}
pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: ConfigurableSemaphore) {
self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let broker_endpoints = self
.broker_endpoints
@@ -368,11 +349,6 @@ impl PageServerConfigBuilder {
.broker_etcd_prefix
.ok_or(anyhow!("missing broker_etcd_prefix"))?,
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
concurrent_tenant_size_logical_size_queries: self
.concurrent_tenant_size_logical_size_queries
.ok_or(anyhow!(
"missing concurrent_tenant_size_logical_size_queries"
))?,
})
}
}
@@ -415,22 +391,6 @@ impl PageServerConf {
)
}
pub fn traces_path(&self) -> PathBuf {
self.workdir.join("traces")
}
pub fn trace_path(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
connection_id: &ConnectionId,
) -> PathBuf {
self.traces_path()
.join(tenant_id.to_string())
.join(timeline_id.to_string())
.join(connection_id.to_string())
}
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(&self, timeline_id: TimelineId, tenant_id: TenantId) -> PathBuf {
@@ -516,12 +476,6 @@ impl PageServerConf {
"log_format" => builder.log_format(
LogFormat::from_config(&parse_toml_string(key, item)?)?
),
"concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
let input = parse_toml_string(key, item)?;
let permits = input.parse::<usize>().context("expected a number of initial permits, not {s:?}")?;
let permits = NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?;
ConfigurableSemaphore::new(permits)
}),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -635,7 +589,6 @@ impl PageServerConf {
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
}
}
}
@@ -701,58 +654,6 @@ fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result<Vec<String>> {
.collect()
}
/// Configurable semaphore permits setting.
///
/// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
/// semaphore cannot be distinguished, leading any feature using these to await forever (or until
/// new permits are added).
#[derive(Debug, Clone)]
pub struct ConfigurableSemaphore {
initial_permits: NonZeroUsize,
inner: std::sync::Arc<tokio::sync::Semaphore>,
}
impl ConfigurableSemaphore {
pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
Some(x) => x,
None => panic!("const unwrap is not yet stable"),
};
/// Initialize using a non-zero amount of permits.
///
/// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
/// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore using future will
/// behave like [`futures::future::pending`], just waiting until new permits are added.
pub fn new(initial_permits: NonZeroUsize) -> Self {
ConfigurableSemaphore {
initial_permits,
inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())),
}
}
}
impl Default for ConfigurableSemaphore {
fn default() -> Self {
Self::new(Self::DEFAULT_INITIAL)
}
}
impl PartialEq for ConfigurableSemaphore {
fn eq(&self, other: &Self) -> bool {
// the number of permits can be increased at runtime, so we cannot really fulfill the
// PartialEq value equality otherwise
self.initial_permits == other.initial_permits
}
}
impl Eq for ConfigurableSemaphore {}
impl ConfigurableSemaphore {
pub fn inner(&self) -> &std::sync::Arc<tokio::sync::Semaphore> {
&self.inner
}
}
#[cfg(test)]
mod tests {
use std::{
@@ -824,7 +725,6 @@ log_format = 'json'
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -870,7 +770,6 @@ log_format = 'json'
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
log_format: LogFormat::Json,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
},
"Should be able to parse all basic config values correctly"
);
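The removed `ConfigurableSemaphore` above is a thin wrapper over `tokio::sync::Semaphore` whose whole point is the non-zero initial permit count: with zero permits, an `acquire()` would simply pend forever. A rough sketch of that gating pattern, with a hypothetical stand-in type rather than code from this diff:

```rust
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::Semaphore;

/// Hypothetical stand-in for the removed ConfigurableSemaphore: a gate whose
/// initial permit count comes from configuration and must be non-zero.
struct SizeQueryGate {
    inner: Arc<Semaphore>,
}

impl SizeQueryGate {
    fn new(initial_permits: NonZeroUsize) -> Self {
        SizeQueryGate {
            inner: Arc::new(Semaphore::new(initial_permits.get())),
        }
    }

    /// Run `work` while holding one permit, bounding concurrent size queries.
    async fn run_gated<T>(&self, work: impl std::future::Future<Output = T>) -> T {
        // With an initial count of zero this acquire would never resolve,
        // which is why the removed code insisted on NonZeroUsize.
        let _permit = self.inner.acquire().await.expect("semaphore is never closed");
        work.await
    }
}
```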


@@ -354,54 +354,6 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/size:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
get:
description: |
Calculate tenant's size, which is a mixture of WAL (bytes) and logical_size (bytes).
responses:
"200":
description: OK,
content:
application/json:
schema:
type: object
required:
- id
- size
properties:
id:
type: string
format: hex
size:
type: integer
description: |
Size metric in bytes.
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/:
parameters:
- name: tenant_id


@@ -566,44 +566,6 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
)
}
async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::InternalServerError)?;
// this can be long operation, it currently is not backed by any request coalescing or similar
let inputs = tenant
.gather_size_inputs()
.await
.map_err(ApiError::InternalServerError)?;
let size = inputs.calculate().map_err(ApiError::InternalServerError)?;
/// Private response type with the additional "unstable" `inputs` field.
///
/// The type is described with `id` and `size` in the openapi_spec file, but the `inputs` is
/// intentionally left out. The type resides in the pageserver not to expose `ModelInputs`.
#[serde_with::serde_as]
#[derive(serde::Serialize)]
struct TenantHistorySize {
#[serde_as(as = "serde_with::DisplayFromStr")]
id: TenantId,
/// Size is a mixture of WAL and logical size, so the unit is bytes.
size: u64,
inputs: crate::tenant::size::ModelInputs,
}
json_response(
StatusCode::OK,
TenantHistorySize {
id: tenant_id,
size,
inputs,
},
)
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
@@ -618,7 +580,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
check_permission(&request, None)?;
let request_data: TenantCreateRequest = json_request(&mut request).await?;
println!("tenant create: {:?}", request_data.trace_read_requests);
let remote_index = get_state(&request).remote_index.clone();
let mut tenant_conf = TenantConfOpt::default();
@@ -660,9 +621,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
@@ -750,9 +708,6 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = request_data.checkpoint_timeout {
@@ -832,14 +787,14 @@ async fn timeline_gc_handler(mut request: Request<Body>) -> Result<Response<Body
let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let _span_guard =
info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id).entered();
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
let result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)
.instrument(info_span!("manual_gc", tenant = %tenant_id, timeline = %timeline_id))
.await
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
.map_err(ApiError::InternalServerError)?;
@@ -875,7 +830,6 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
.map_err(ApiError::NotFound)?;
timeline
.checkpoint(CheckpointConfig::Forced)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
@@ -939,7 +893,6 @@ pub fn make_router(
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)
.get("/v1/tenant/:tenant_id", tenant_status)
.get("/v1/tenant/:tenant_id/size", tenant_size_handler)
.put("/v1/tenant/config", tenant_config_handler)
.get("/v1/tenant/:tenant_id/timeline", timeline_list_handler)
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)


@@ -15,7 +15,6 @@ pub mod tenant;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_tasks;
pub mod trace;
pub mod virtual_file;
pub mod walingest;
pub mod walreceiver;


@@ -31,7 +31,6 @@ const STORAGE_TIME_OPERATIONS: &[&str] = &[
"compact",
"create images",
"init logical size",
"logical size",
"load layer map",
"gc",
];
@@ -366,7 +365,6 @@ pub struct TimelineMetrics {
pub compact_time_histo: Histogram,
pub create_images_time_histo: Histogram,
pub init_logical_size_histo: Histogram,
pub logical_size_histo: Histogram,
pub load_layer_map_histo: Histogram,
pub last_record_gauge: IntGauge,
pub wait_lsn_time_histo: Histogram,
@@ -399,9 +397,6 @@ impl TimelineMetrics {
let init_logical_size_histo = STORAGE_TIME
.get_metric_with_label_values(&["init logical size", &tenant_id, &timeline_id])
.unwrap();
let logical_size_histo = STORAGE_TIME
.get_metric_with_label_values(&["logical size", &tenant_id, &timeline_id])
.unwrap();
let load_layer_map_histo = STORAGE_TIME
.get_metric_with_label_values(&["load layer map", &tenant_id, &timeline_id])
.unwrap();
@@ -433,7 +428,6 @@ impl TimelineMetrics {
compact_time_histo,
create_images_time_histo,
init_logical_size_histo,
logical_size_histo,
load_layer_map_histo,
last_record_gauge,
wait_lsn_time_histo,


@@ -10,7 +10,6 @@
//
use anyhow::{bail, ensure, Context, Result};
use bytes::Buf;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use pageserver_api::models::{
@@ -19,23 +18,21 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::io;
use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use tokio::pin;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
use tracing::*;
use utils::id::ConnectionId;
use utils::{
auth::{self, Claims, JwtAuth, Scope},
id::{TenantId, TimelineId},
lsn::Lsn,
postgres_backend::AuthType,
postgres_backend_async::{self, PostgresBackend},
pq_proto::{BeMessage, FeMessage, RowDescriptor},
simple_rcu::RcuReadGuard,
};
@@ -48,7 +45,6 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::trace::Tracer;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
@@ -271,18 +267,6 @@ impl PageServerHandler {
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let mut tracer = if tenant.get_trace_read_requests() {
let connection_id = ConnectionId::generate();
let path = tenant
.conf
.trace_path(&tenant_id, &timeline_id, &connection_id);
Some(Tracer::new(path))
} else {
None
};
// Check that the timeline exists
let timeline = get_local_timeline(tenant_id, timeline_id)?;
@@ -315,12 +299,7 @@ impl PageServerHandler {
trace!("query: {copy_data_bytes:?}");
// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
@@ -387,12 +366,14 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let copyin_stream = copyin_stream(pgb);
pin!(copyin_stream);
timeline
.import_basebackup_from_tar(&mut copyin_stream, base_lsn)
.await?;
// import_basebackup_from_tar() is not async, mainly because the Tar crate
// it uses is not async. So we need to jump through some hoops:
// - convert the input from the client connection to a synchronous Read
// - use block_in_place()
let mut copyin_stream = Box::pin(copyin_stream(pgb));
let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream));
tokio::task::block_in_place(|| timeline.import_basebackup_from_tar(reader, base_lsn))?;
timeline.initialize()?;
// Drain the rest of the Copy data
let mut bytes_after_tar = 0;
@@ -457,7 +438,7 @@ impl PageServerHandler {
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
timeline.checkpoint(CheckpointConfig::Flush).await?;
timeline.checkpoint(CheckpointConfig::Flush)?;
info!("done");
Ok(())
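The comment block above about the basebackup import describes bridging the async COPY stream into synchronous code. A minimal sketch of that `StreamReader` + `SyncIoBridge` + `block_in_place` combination is shown below; it assumes a multi-threaded Tokio runtime and tokio-util's io features, and uses `io::copy` as a stand-in for the tar consumer:

```rust
use bytes::Bytes;
use futures::Stream;
use std::io;
use tokio_util::io::{StreamReader, SyncIoBridge};

/// Feed an async stream of byte chunks to blocking code without stalling the
/// executor. `block_in_place` requires the multi-threaded runtime.
async fn consume_with_blocking_reader<S>(stream: S) -> io::Result<u64>
where
    S: Stream<Item = io::Result<Bytes>> + Unpin,
{
    // StreamReader turns the chunk stream into an AsyncRead; SyncIoBridge then
    // exposes that AsyncRead as a plain std::io::Read for synchronous code.
    let reader = SyncIoBridge::new(StreamReader::new(stream));
    tokio::task::block_in_place(move || {
        let mut reader = reader;
        // Stand-in for e.g. a tar unpacker that wants `impl Read`.
        io::copy(&mut reader, &mut io::sink())
    })
}
```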

View File

@@ -12,12 +12,8 @@
//!
use anyhow::{bail, Context};
use bytes::Bytes;
use futures::Stream;
use pageserver_api::models::TimelineState;
use tokio::sync::watch;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
@@ -33,7 +29,6 @@ use std::io::Write;
use std::ops::Bound::Included;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Command;
use std::process::Stdio;
use std::sync::Arc;
@@ -77,8 +72,6 @@ pub mod storage_layer;
mod timeline;
pub mod size;
use storage_layer::Layer;
pub use timeline::Timeline;
@@ -127,9 +120,6 @@ pub struct Tenant {
/// Makes every timeline back up its files to remote storage.
upload_layers: bool,
/// Cached logical sizes, updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
}
/// A timeline with some of its files on disk, being initialized.
@@ -142,7 +132,7 @@ pub struct Tenant {
pub struct UninitializedTimeline<'t> {
owning_tenant: &'t Tenant,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
raw_timeline: Option<(Timeline, TimelineUninitMark)>,
}
/// An uninit mark file, created alongside the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
@@ -174,6 +164,7 @@ impl UninitializedTimeline<'_> {
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
let new_timeline = Arc::new(new_timeline);
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
// TODO it would be good to ensure that, but apparently a lot of our testing is dependent on that, at least
@@ -201,9 +192,6 @@ impl UninitializedTimeline<'_> {
})?;
new_timeline.set_state(TimelineState::Active);
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
new_timeline.launch_wal_receiver();
}
}
@@ -212,28 +200,20 @@ impl UninitializedTimeline<'_> {
}
/// Prepares timeline data by loading it from the basebackup archive.
pub async fn import_basebackup_from_tar(
self,
mut copyin_stream: &mut Pin<&mut impl Stream<Item = io::Result<Bytes>>>,
pub fn import_basebackup_from_tar(
&self,
reader: impl std::io::Read,
base_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
) -> anyhow::Result<()> {
let raw_timeline = self.raw_timeline()?;
// import_basebackup_from_tar() is not async, mainly because the Tar crate
// it uses is not async. So we need to jump through some hoops:
// - convert the input from the client connection to a synchronous Read
// - use block_in_place()
let reader = SyncIoBridge::new(StreamReader::new(&mut copyin_stream));
tokio::task::block_in_place(|| {
import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn)
.context("Failed to import basebackup")
})?;
// The flush loop needs to be spawned in order for checkpoint to be able to flush.
// We want to run a proper checkpoint before we mark the timeline as available to the outside world,
// so we spawn the flush loop manually here and skip the flush_loop setup in initialize_with_lock.
raw_timeline.maybe_spawn_flush_loop();
import_datadir::import_basebackup_from_tar(raw_timeline, reader, base_lsn).with_context(
|| {
format!(
"Failed to import basebackup for timeline {}/{}",
self.owning_tenant.tenant_id, self.timeline_id
)
},
)?;
fail::fail_point!("before-checkpoint-new-timeline", |_| {
bail!("failpoint before-checkpoint-new-timeline");
@@ -241,15 +221,16 @@ impl UninitializedTimeline<'_> {
raw_timeline
.checkpoint(CheckpointConfig::Flush)
.await
.context("Failed to checkpoint after basebackup import")?;
let timeline = self.initialize()?;
Ok(timeline)
.with_context(|| {
format!(
"Failed to checkpoint after basebackup import for timeline {}/{}",
self.owning_tenant.tenant_id, self.timeline_id
)
})?;
Ok(())
}
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
fn raw_timeline(&self) -> anyhow::Result<&Timeline> {
Ok(&self
.raw_timeline
.as_ref()
@@ -484,7 +465,7 @@ impl Tenant {
self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
}
None => self.bootstrap_timeline(new_timeline_id, pg_version).await?,
None => self.bootstrap_timeline(new_timeline_id, pg_version)?,
};
// We have added a new timeline to the tenant; now its background tasks are needed.
@@ -502,7 +483,7 @@ impl Tenant {
/// The `checkpoint_before_gc` parameter is used to force compaction of storage before GC,
/// to make tests more deterministic.
/// TODO: Do we still need it, or can we call checkpoint explicitly in tests where needed?
pub async fn gc_iteration(
pub fn gc_iteration(
&self,
target_timeline_id: Option<TimelineId>,
horizon: u64,
@@ -518,13 +499,11 @@ impl Tenant {
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
{
let _timer = STORAGE_TIME
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
.start_timer();
self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
.await
}
STORAGE_TIME
.with_label_values(&["gc", &self.tenant_id.to_string(), &timeline_str])
.observe_closure_duration(|| {
self.gc_iteration_internal(target_timeline_id, horizon, pitr, checkpoint_before_gc)
})
}
/// Perform one compaction iteration.
@@ -560,24 +539,23 @@ impl Tenant {
///
/// Used at graceful shutdown.
///
pub async fn checkpoint(&self) -> anyhow::Result<()> {
pub fn checkpoint(&self) -> anyhow::Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
// checkpoint runs.
let timelines_to_checkpoint = {
let timelines = self.timelines.lock().unwrap();
timelines
.iter()
.map(|(id, timeline)| (*id, Arc::clone(timeline)))
.collect::<Vec<_>>()
};
let timelines = self.timelines.lock().unwrap();
let timelines_to_checkpoint = timelines
.iter()
.map(|(timeline_id, timeline)| (*timeline_id, Arc::clone(timeline)))
.collect::<Vec<_>>();
drop(timelines);
for (id, timeline) in &timelines_to_checkpoint {
timeline
.checkpoint(CheckpointConfig::Flush)
.instrument(info_span!("checkpoint", timeline = %id, tenant = %self.tenant_id))
.await?;
for (timeline_id, timeline) in &timelines_to_checkpoint {
let _entered =
info_span!("checkpoint", timeline = %timeline_id, tenant = %self.tenant_id)
.entered();
timeline.checkpoint(CheckpointConfig::Flush)?;
}
Ok(())
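The checkpoint loop above follows a common locking shape: clone the `Arc`s out of the shared map while holding the lock, drop the lock, then do the slow per-timeline work. A small sketch of that shape with placeholder types (the real map is keyed by `TimelineId` and holds `Arc<Timeline>`):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

fn checkpoint_all(
    timelines: &Mutex<HashMap<u64, Arc<String>>>, // placeholder key/value types
    mut checkpoint: impl FnMut(u64, &String),
) {
    // Take cheap Arc clones under the lock so the map is not kept locked while
    // the (potentially slow) per-entry work runs.
    let to_checkpoint: Vec<(u64, Arc<String>)> = {
        let guard = timelines.lock().unwrap();
        guard.iter().map(|(id, t)| (*id, Arc::clone(t))).collect()
    };
    for (id, timeline) in &to_checkpoint {
        checkpoint(*id, timeline);
    }
}
```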
@@ -806,13 +784,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}
@@ -863,7 +834,6 @@ impl Tenant {
remote_index,
upload_layers,
state,
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
}
}
@@ -985,9 +955,8 @@ impl Tenant {
// +-----baz-------->
//
//
// 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
// `gc_infos` are being refreshed
// 2. Scan collected timelines, and on each timeline, make note of
// 1. Grab 'gc_cs' mutex to prevent new timelines from being created
// 2. Scan all timelines, and on each timeline, make note of
// all the points where other timelines have been branched off.
// We will refrain from removing page versions at those LSNs.
// 3. For each timeline, scan all layer files on the timeline.
@@ -998,7 +967,7 @@ impl Tenant {
// - if a relation has a non-incremental persistent layer on a child branch, then we
// don't need to keep that in the parent anymore. But currently
// we do.
async fn gc_iteration_internal(
fn gc_iteration_internal(
&self,
target_timeline_id: Option<TimelineId>,
horizon: u64,
@@ -1008,68 +977,6 @@ impl Tenant {
let mut totals: GcResult = Default::default();
let now = Instant::now();
let gc_timelines = self.refresh_gc_info_internal(target_timeline_id, horizon, pitr)?;
// Perform GC for each timeline.
//
// Note that we don't hold the GC lock here because we don't want
// to delay the branch creation task, which requires the GC lock.
// A timeline GC iteration can be slow because it may need to wait for
// compaction (both require `layer_removal_cs` lock),
// but the GC iteration can run concurrently with branch creation.
//
// See comments in [`Tenant::branch_timeline`] for more information
// about why branch creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if task_mgr::is_shutdown_requested() {
// We were requested to shut down. Stop and return with the progress we
// made.
break;
}
// If requested, force flush all in-memory layers to disk first,
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
timeline.checkpoint(CheckpointConfig::Forced).await?;
info!(
"timeline {} checkpoint_before_gc done",
timeline.timeline_id
);
}
let result = timeline.gc()?;
totals += result;
}
totals.elapsed = now.elapsed();
Ok(totals)
}
/// Refreshes the Timeline::gc_info for all timelines, returning the
/// vector of timelines which have [`Timeline::get_last_record_lsn`] past
/// [`Tenant::get_gc_horizon`].
///
/// This is usually executed as part of periodic gc, but can now be triggered more often.
pub fn refresh_gc_info(&self) -> anyhow::Result<Vec<Arc<Timeline>>> {
// since this method can now be called at different rates than the configured gc loop, it
// might be that these configuration values get applied faster than before,
// since they were previously only read from the gc task.
let horizon = self.get_gc_horizon();
let pitr = self.get_pitr_interval();
// refresh all timelines
let target_timeline_id = None;
self.refresh_gc_info_internal(target_timeline_id, horizon, pitr)
}
fn refresh_gc_info_internal(
&self,
target_timeline_id: Option<TimelineId>,
horizon: u64,
pitr: Duration,
) -> anyhow::Result<Vec<Arc<Timeline>>> {
// grab mutex to prevent new timelines from being created here.
let gc_cs = self.gc_cs.lock().unwrap();
@@ -1088,6 +995,9 @@ impl Tenant {
timelines
.iter()
.map(|(timeline_id, timeline_entry)| {
// This is an unresolved question for now: how to do gc in the presence of remote timelines,
// especially when this is combined with branching.
// Somewhat related: https://github.com/neondatabase/neon/issues/999
if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
// If target_timeline is specified, we only need to know branchpoints of its children
if let Some(timeline_id) = target_timeline_id {
@@ -1141,7 +1051,41 @@ impl Tenant {
}
}
drop(gc_cs);
Ok(gc_timelines)
// Perform GC for each timeline.
//
// Note that we don't hold the GC lock here because we don't want
// to delay the branch creation task, which requires the GC lock.
// A timeline GC iteration can be slow because it may need to wait for
// compaction (both require `layer_removal_cs` lock),
// but the GC iteration can run concurrently with branch creation.
//
// See comments in [`Tenant::branch_timeline`] for more information
// about why branch creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if task_mgr::is_shutdown_requested() {
// We were requested to shut down. Stop and return with the progress we
// made.
break;
}
// If requested, force flush all in-memory layers to disk first,
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
timeline.checkpoint(CheckpointConfig::Forced)?;
info!(
"timeline {} checkpoint_before_gc done",
timeline.timeline_id
);
}
let result = timeline.gc()?;
totals += result;
}
totals.elapsed = now.elapsed();
Ok(totals)
}
/// Branch an existing timeline
@@ -1245,15 +1189,14 @@ impl Tenant {
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization completes, remove the temp dir.
async fn bootstrap_timeline(
fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_uninit_mark = {
let timelines = self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(timeline_id, &timelines)?
};
let timelines = self.timelines.lock().unwrap();
let timeline_uninit_mark = self.create_timeline_uninit_mark(timeline_id, &timelines)?;
drop(timelines);
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
let initdb_path = path_with_suffix_extension(
@@ -1303,35 +1246,25 @@ impl Tenant {
let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
tokio::task::block_in_place(|| {
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
pgdata_path,
pgdata_lsn,
)
})
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
pgdata_path,
pgdata_lsn,
)
.with_context(|| {
format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
})?;
// The flush loop needs to be spawned in order for checkpoint to be able to flush.
// We want to run a proper checkpoint before we mark the timeline as available to the outside world,
// so we spawn the flush loop manually here and skip the flush_loop setup in initialize_with_lock.
unfinished_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
anyhow::bail!("failpoint before-checkpoint-new-timeline");
});
unfinished_timeline
.checkpoint(CheckpointConfig::Forced).await
.checkpoint(CheckpointConfig::Forced)
.with_context(|| format!("Failed to checkpoint after pgdatadir import for timeline {tenant_id}/{timeline_id}"))?;
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(&mut timelines, false)?
};
let mut timelines = self.timelines.lock().unwrap();
let timeline = raw_timeline.initialize_with_lock(&mut timelines, false)?;
drop(timelines);
info!(
"created root timeline {} timeline.lsn {}",
@@ -1371,7 +1304,7 @@ impl Tenant {
Ok(UninitializedTimeline {
owning_tenant: self,
timeline_id: new_timeline_id,
raw_timeline: Some((Arc::new(new_timeline), uninit_mark)),
raw_timeline: Some((new_timeline, uninit_mark)),
})
}
Err(e) => {
@@ -1490,7 +1423,7 @@ impl Tenant {
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
raw_timeline: Some((Arc::new(dummy_timeline), TimelineUninitMark::dummy())),
raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())),
};
match timeline.initialize_with_lock(&mut timelines_accessor, true) {
Ok(initialized_timeline) => {
@@ -1511,25 +1444,6 @@ impl Tenant {
Ok(())
}
/// Gathers inputs from all of the timelines to produce a sizing model input.
///
/// Future is cancellation safe. Only one calculation can be running at once per tenant.
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
pub async fn gather_size_inputs(&self) -> anyhow::Result<size::ModelInputs> {
let logical_sizes_at_once = self
.conf
.concurrent_tenant_size_logical_size_queries
.inner();
// TODO: Having a single mutex block concurrent reads is unfortunate, but since the queries
// are for testing/experimenting, we tolerate this.
//
// See more on issue #2748, condensed out of the initial PR review.
let mut shared_cache = self.cached_logical_sizes.lock().await;
size::gather_inputs(self, logical_sizes_at_once, &mut *shared_cache).await
}
}
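The `gather_size_inputs` shown above serializes all size queries through one `tokio::sync::Mutex`-guarded cache, as the TODO notes. A minimal sketch of that shape; the key and value types are placeholders for `(TimelineId, Lsn)` and the logical size:

```rust
use std::collections::HashMap;
use tokio::sync::Mutex;

struct SizeCache {
    // One async mutex serializes all size queries; tolerable here because the
    // queries are for testing/experimenting (see the TODO above).
    cached: Mutex<HashMap<(u32, u64), u64>>,
}

impl SizeCache {
    async fn get_or_compute(&self, key: (u32, u64), compute: impl FnOnce() -> u64) -> u64 {
        let mut cache = self.cached.lock().await;
        *cache.entry(key).or_insert_with(compute)
    }
}
```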
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
@@ -1673,7 +1587,6 @@ pub mod harness {
walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
trace_read_requests: Some(tenant_conf.trace_read_requests),
}
}
}
@@ -1945,7 +1858,7 @@ mod tests {
Ok(())
}
async fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
@@ -1966,7 +1879,7 @@ mod tests {
writer.finish_write(lsn);
lsn += 0x10;
}
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.checkpoint(CheckpointConfig::Forced)?;
{
let writer = tline.writer();
writer.put(
@@ -1983,26 +1896,24 @@ mod tests {
)?;
writer.finish_write(lsn);
}
tline.checkpoint(CheckpointConfig::Forced).await
tline.checkpoint(CheckpointConfig::Forced)
}
#[tokio::test]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
#[test]
fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// and compaction work. But it does set the 'cutoff' point so that the cross check
// below should fail.
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// try to branch at lsn 25, should fail because we already garbage collected the data
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
@@ -2047,14 +1958,14 @@ mod tests {
/*
// FIXME: This currently fails to error out. Calling GC doesn't currently
// remove the old value, we'd need to work a little harder
#[tokio::test]
async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
#[test]
fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
let repo =
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
@@ -2067,47 +1978,43 @@ mod tests {
}
*/
#[tokio::test]
async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
#[test]
fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
Ok(())
}
#[tokio::test]
async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
#[test]
fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
// run gc on parent
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)
.await?;
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// Check that the data is still accessible on the branch.
assert_eq!(
@@ -2118,8 +2025,8 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn timeline_load() -> anyhow::Result<()> {
#[test]
fn timeline_load() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME)?;
{
@@ -2127,8 +2034,8 @@ mod tests {
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
tline.checkpoint(CheckpointConfig::Forced).await?;
make_some_layers(tline.as_ref(), Lsn(0x8000))?;
tline.checkpoint(CheckpointConfig::Forced)?;
}
let tenant = harness.load();
@@ -2139,8 +2046,8 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
#[test]
fn timeline_load_with_ancestor() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME)?;
// create two timelines
@@ -2150,8 +2057,8 @@ mod tests {
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
.initialize()?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tline.checkpoint(CheckpointConfig::Forced).await?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tline.checkpoint(CheckpointConfig::Forced)?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -2159,8 +2066,8 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
tline.checkpoint(CheckpointConfig::Forced).await?;
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
tline.checkpoint(CheckpointConfig::Forced)?;
}
// check that both of them are initially unloaded
@@ -2220,8 +2127,8 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
#[test]
fn test_images() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2232,7 +2139,7 @@ mod tests {
writer.finish_write(Lsn(0x10));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
let writer = tline.writer();
@@ -2240,7 +2147,7 @@ mod tests {
writer.finish_write(Lsn(0x20));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
let writer = tline.writer();
@@ -2248,7 +2155,7 @@ mod tests {
writer.finish_write(Lsn(0x30));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
let writer = tline.writer();
@@ -2256,7 +2163,7 @@ mod tests {
writer.finish_write(Lsn(0x40));
drop(writer);
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10"));
@@ -2272,8 +2179,8 @@ mod tests {
// Insert 1000 key-value pairs with increasing keys, checkpoint,
// repeat 50 times.
//
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
#[test]
fn test_bulk_insert() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2306,7 +2213,7 @@ mod tests {
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
}
@@ -2314,8 +2221,8 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
#[test]
fn test_random_updates() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2378,7 +2285,7 @@ mod tests {
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
}
@@ -2386,8 +2293,8 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_traverse_branches() -> anyhow::Result<()> {
#[test]
fn test_traverse_branches() -> anyhow::Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?
@@ -2459,7 +2366,7 @@ mod tests {
println!("checkpointing {}", lsn);
let cutoff = tline.get_last_record_lsn();
tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?;
tline.checkpoint(CheckpointConfig::Forced).await?;
tline.checkpoint(CheckpointConfig::Forced)?;
tline.compact()?;
tline.gc()?;
}

View File

@@ -1,475 +0,0 @@
use std::cmp;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::Context;
use tokio::sync::Semaphore;
use super::Tenant;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use tracing::*;
/// Inputs to the actual tenant sizing model
///
/// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to
/// be a transferable format between execution environments and developers.
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
updates: Vec<Update>,
retention_period: u64,
#[serde_as(as = "HashMap<serde_with::DisplayFromStr, _>")]
timeline_inputs: HashMap<TimelineId, TimelineInputs>,
}
/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
/// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct TimelineInputs {
#[serde_as(as = "serde_with::DisplayFromStr")]
last_record: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
latest_gc_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
horizon_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
pitr_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
next_gc_cutoff: Lsn,
}
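The `#[serde_as(as = "serde_with::DisplayFromStr")]` annotations above make ids and LSNs serialize through their `Display`/`FromStr` impls, which is why they show up as strings in the JSON fixture near the end of this file. A small self-contained sketch of that mechanism, with `u64` standing in for `Lsn`:

```rust
use serde::{Deserialize, Serialize};

#[serde_with::serde_as]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Example {
    // Serialized via Display/FromStr, i.e. as a JSON string.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    lsn: u64, // stand-in for utils::lsn::Lsn
}

fn main() {
    let json = serde_json::to_string(&Example { lsn: 42 }).unwrap();
    assert_eq!(json, r#"{"lsn":"42"}"#);
    let back: Example = serde_json::from_str(&json).unwrap();
    assert_eq!(back, Example { lsn: 42 });
}
```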
/// Gathers the inputs for the tenant sizing model.
///
/// Tenant size does not consider the latest state, but only the state until next_gc_cutoff, which
/// is updated on-demand, during the start of this calculation and separate from the
/// [`Timeline::latest_gc_cutoff`].
///
/// For timelines in general:
///
/// ```ignore
/// 0-----|---------|----|------------| · · · · · |·> lsn
/// initdb_lsn branchpoints* next_gc_cutoff latest
/// ```
///
/// Until gc_horizon_cutoff > `Timeline::last_record_lsn` for any of the tenant's timelines, the
/// tenant size will be zero.
pub(super) async fn gather_inputs(
tenant: &Tenant,
limit: &Arc<Semaphore>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
) -> anyhow::Result<ModelInputs> {
// with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
let timelines = tenant
.refresh_gc_info()
.context("Failed to refresh gc_info before gathering inputs")?;
if timelines.is_empty() {
// All timelines are below tenant's gc_horizon; alternative would be to use
// Tenant::list_timelines but then those gc_info's would not be updated yet, possibly
// missing GcInfo::retain_lsns or having obsolete values for cutoff's.
return Ok(ModelInputs {
updates: vec![],
retention_period: 0,
timeline_inputs: HashMap::new(),
});
}
// record the used/inserted cache keys here, so that unused entries can be removed and the cache does not leak.
// after the initial run the cache should be quite stable, but live timelines will eventually
// require new lsns to be inspected.
let mut needed_cache = HashSet::<(TimelineId, Lsn)>::new();
let mut updates = Vec::new();
// record the per-timeline values used to determine `retention_period`
let mut timeline_inputs = HashMap::with_capacity(timelines.len());
// used to determine the `retention_period` for the size model
let mut max_cutoff_distance = None;
// this will probably conflict with on-demand downloaded layers, or at least force them all
// to be downloaded
for timeline in timelines {
let last_record_lsn = timeline.get_last_record_lsn();
let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = {
// there's a race between the update (holding tenant.gc_lock) and this read but it
// might not be an issue, because it's not for Timeline::gc
let gc_info = timeline.gc_info.read().unwrap();
// similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
// new gc run, which we have no control over. however differently from `Timeline::gc`
// we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
// actually removing files.
let next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff);
// the minimum where we should find the next_gc_cutoff for our calculations.
//
// the next_gc_cutoff of the parent branch is not of interest (right now at least), nor do we
// want to query any logical size before initdb_lsn.
let cutoff_minimum = cmp::max(timeline.get_ancestor_lsn(), timeline.initdb_lsn);
let maybe_cutoff = if next_gc_cutoff > cutoff_minimum {
Some((next_gc_cutoff, LsnKind::GcCutOff))
} else {
None
};
// this assumes there are no other lsns than the branchpoints
let lsns = gc_info
.retain_lsns
.iter()
.inspect(|&&lsn| {
trace!(
timeline_id=%timeline.timeline_id,
"retained lsn: {lsn:?}, is_before_ancestor_lsn={}",
lsn < timeline.get_ancestor_lsn()
)
})
.filter(|&&lsn| lsn > timeline.get_ancestor_lsn())
.copied()
.map(|lsn| (lsn, LsnKind::BranchPoint))
.chain(maybe_cutoff)
.collect::<Vec<_>>();
(
lsns,
gc_info.horizon_cutoff,
gc_info.pitr_cutoff,
next_gc_cutoff,
)
};
// update this to have a retention_period later for the tenant_size_model
// tenant_size_model compares this to the last segment's start_lsn
if let Some(cutoff_distance) = last_record_lsn.checked_sub(next_gc_cutoff) {
match max_cutoff_distance.as_mut() {
Some(max) => {
*max = std::cmp::max(*max, cutoff_distance);
}
_ => {
max_cutoff_distance = Some(cutoff_distance);
}
}
}
// all timelines branch from something, because it might be impossible to pinpoint
// which is the tenant_size_model's "default" branch.
updates.push(Update {
lsn: timeline.get_ancestor_lsn(),
command: Command::BranchFrom(timeline.get_ancestor_timeline_id()),
timeline_id: timeline.timeline_id,
});
for (lsn, _kind) in &interesting_lsns {
if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) {
updates.push(Update {
lsn: *lsn,
timeline_id: timeline.timeline_id,
command: Command::Update(*size),
});
needed_cache.insert((timeline.timeline_id, *lsn));
} else {
let timeline = Arc::clone(&timeline);
let parallel_size_calcs = Arc::clone(limit);
joinset.spawn(calculate_logical_size(parallel_size_calcs, timeline, *lsn));
}
}
timeline_inputs.insert(
timeline.timeline_id,
TimelineInputs {
last_record: last_record_lsn,
// this is not used above, because it might not have updated recently enough
latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
horizon_cutoff,
pitr_cutoff,
next_gc_cutoff,
},
);
}
let mut have_any_error = false;
while let Some(res) = joinset.join_next().await {
// each of these comes as a Result<Result<_, JoinError>, JoinError>
// because of spawn + spawn_blocking
let res = res.and_then(|inner| inner);
match res {
Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size))) => {
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
logical_size_cache.insert((timeline.timeline_id, lsn), size);
needed_cache.insert((timeline.timeline_id, lsn));
updates.push(Update {
lsn,
timeline_id: timeline.timeline_id,
command: Command::Update(size),
});
}
Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error))) => {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
have_any_error = true;
}
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures, nor should be");
}
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("logical size query panicked: {join_error:#}");
have_any_error = true;
}
}
}
// prune any keys not needed anymore; we record every used key and added key.
logical_size_cache.retain(|key, _| needed_cache.contains(key));
if have_any_error {
// we cannot complete this round, because we are missing data.
// we have however cached all we were able to request calculation on.
anyhow::bail!("failed to calculate some logical_sizes");
}
// the data gathered to updates is per lsn, regardless of the branch, so we can use it to
// our advantage, not requiring a sorted container or graph walk.
//
// for branch points, which come as multiple updates at the same LSN, the Command::Update
// must come before the Command::BranchFrom that branches off that point. this is
// handled by the variant order in `Command`.
updates.sort_unstable();
let retention_period = match max_cutoff_distance {
Some(max) => max.0,
None => {
anyhow::bail!("the first branch should have a gc_cutoff after it's branch point at 0")
}
};
Ok(ModelInputs {
updates,
retention_period,
timeline_inputs,
})
}
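The ordering comments above rely on how `#[derive(PartialOrd, Ord)]` behaves: struct fields compare in declaration order and enum variants compare by declaration order, so `lsn` dominates, `Update` sorts before `BranchFrom` at equal LSNs, and `timeline_id` is only a tie-breaker. A tiny illustration with simplified types:

```rust
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
enum Cmd {
    Update(u64),            // earlier variant: sorts first at an equal lsn
    BranchFrom(Option<u8>), // later variant: sorts after Update
}

fn main() {
    // (lsn, command) tuples compare field by field, so lsn dominates and the
    // declaration order of Cmd's variants breaks ties.
    let mut v = vec![(10u64, Cmd::BranchFrom(None)), (10, Cmd::Update(42))];
    v.sort_unstable();
    assert_eq!(v[0], (10, Cmd::Update(42)));
}
```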
impl ModelInputs {
pub fn calculate(&self) -> anyhow::Result<u64> {
// Option<TimelineId> is used for "naming" the branches because it is assumed to be
// impossible to always determine a single main branch.
let mut storage = tenant_size_model::Storage::<Option<TimelineId>>::new(None);
// tracking these so we do not have to modify the current implementation of the size model,
// which works in relative LSNs and sizes.
let mut last_state: HashMap<TimelineId, (Lsn, u64)> = HashMap::new();
for update in &self.updates {
let Update {
lsn,
command: op,
timeline_id,
} = update;
match op {
Command::Update(sz) => {
let latest = last_state.get_mut(timeline_id).ok_or_else(|| {
anyhow::anyhow!(
"ordering-mismatch: there must had been a previous state for {timeline_id}"
)
})?;
let lsn_bytes = {
let Lsn(now) = lsn;
let Lsn(prev) = latest.0;
debug_assert!(prev <= *now, "self.updates should have been sorted");
now - prev
};
let size_diff =
i64::try_from(*sz as i128 - latest.1 as i128).with_context(|| {
format!("size difference i64 overflow for {timeline_id}")
})?;
storage.modify_branch(&Some(*timeline_id), "".into(), lsn_bytes, size_diff);
*latest = (*lsn, *sz);
}
Command::BranchFrom(parent) => {
storage.branch(parent, Some(*timeline_id));
let size = parent
.as_ref()
.and_then(|id| last_state.get(id))
.map(|x| x.1)
.unwrap_or(0);
last_state.insert(*timeline_id, (*lsn, size));
}
}
}
Ok(storage.calculate(self.retention_period).total_children())
}
}
/// Single size model update.
///
/// Sizing model works with relative increments over latest branch state.
/// Updates are absolute, so additional state needs to be tracked when applying.
#[serde_with::serde_as]
#[derive(
Debug, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
struct Update {
#[serde_as(as = "serde_with::DisplayFromStr")]
lsn: utils::lsn::Lsn,
command: Command,
#[serde_as(as = "serde_with::DisplayFromStr")]
timeline_id: TimelineId,
}
#[serde_with::serde_as]
#[derive(PartialOrd, PartialEq, Eq, Ord, Clone, Copy, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
enum Command {
Update(u64),
BranchFrom(#[serde_as(as = "Option<serde_with::DisplayFromStr>")] Option<TimelineId>),
}
impl std::fmt::Debug for Command {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// a custom one-line implementation makes {:#?} output easier to read by avoiding 3
// line breaks
match self {
Self::Update(arg0) => write!(f, "Update({arg0})"),
Self::BranchFrom(arg0) => write!(f, "BranchFrom({arg0:?})"),
}
}
}
#[derive(Debug, Clone, Copy)]
enum LsnKind {
BranchPoint,
GcCutOff,
}
/// Newtype around the tuple that carries the timeline at lsn logical size calculation.
struct TimelineAtLsnSizeResult(
Arc<crate::tenant::Timeline>,
utils::lsn::Lsn,
anyhow::Result<u64>,
);
#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
async fn calculate_logical_size(
limit: Arc<tokio::sync::Semaphore>,
timeline: Arc<crate::tenant::Timeline>,
lsn: utils::lsn::Lsn,
) -> Result<TimelineAtLsnSizeResult, tokio::task::JoinError> {
let permit = tokio::sync::Semaphore::acquire_owned(limit)
.await
.expect("global semaphore should not had been closed");
tokio::task::spawn_blocking(move || {
let _permit = permit;
let size_res = timeline.calculate_logical_size(lsn);
TimelineAtLsnSizeResult(timeline, lsn, size_res)
})
.await
}
#[test]
fn updates_sort() {
use std::str::FromStr;
use utils::id::TimelineId;
use utils::lsn::Lsn;
let ids = [
TimelineId::from_str("7ff1edab8182025f15ae33482edb590a").unwrap(),
TimelineId::from_str("b1719e044db05401a05a2ed588a3ad3f").unwrap(),
TimelineId::from_str("b68d6691c895ad0a70809470020929ef").unwrap(),
];
// try through all permutations
let ids = [
[&ids[0], &ids[1], &ids[2]],
[&ids[0], &ids[2], &ids[1]],
[&ids[1], &ids[0], &ids[2]],
[&ids[1], &ids[2], &ids[0]],
[&ids[2], &ids[0], &ids[1]],
[&ids[2], &ids[1], &ids[0]],
];
for ids in ids {
// apply a fixture which uses a permutation of ids
let commands = [
Update {
lsn: Lsn(0),
command: Command::BranchFrom(None),
timeline_id: *ids[0],
},
Update {
lsn: Lsn::from_str("0/67E7618").unwrap(),
command: Command::Update(43696128),
timeline_id: *ids[0],
},
Update {
lsn: Lsn::from_str("0/67E7618").unwrap(),
command: Command::BranchFrom(Some(*ids[0])),
timeline_id: *ids[1],
},
Update {
lsn: Lsn::from_str("0/76BE4F0").unwrap(),
command: Command::Update(41844736),
timeline_id: *ids[1],
},
Update {
lsn: Lsn::from_str("0/10E49380").unwrap(),
command: Command::Update(42164224),
timeline_id: *ids[0],
},
Update {
lsn: Lsn::from_str("0/10E49380").unwrap(),
command: Command::BranchFrom(Some(*ids[0])),
timeline_id: *ids[2],
},
Update {
lsn: Lsn::from_str("0/11D74910").unwrap(),
command: Command::Update(42172416),
timeline_id: *ids[2],
},
Update {
lsn: Lsn::from_str("0/12051E98").unwrap(),
command: Command::Update(42196992),
timeline_id: *ids[0],
},
];
let mut sorted = commands;
// these must sort in the same order, regardless of how the ids sort
// which is why the timeline_id is the last field
sorted.sort_unstable();
assert_eq!(commands, sorted, "{:#?} vs. {:#?}", commands, sorted);
}
}
#[test]
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
// it has stable lsns
let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072,"timeline_inputs":{"cd9d9409c216e64bf580904facedb01b":{"last_record":"0/18D5E40","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/18B5E40","pitr_cutoff":"0/18B5E40","next_gc_cutoff":"0/18B5E40"},"10b532a550540bc15385eac4edde416a":{"last_record":"0/1839818","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/1819818","pitr_cutoff":"0/1819818","next_gc_cutoff":"0/1819818"},"230fc9d756f7363574c0d66533564dcc":{"last_record":"0/222F438","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/220F438","pitr_cutoff":"0/220F438","next_gc_cutoff":"0/220F438"}}}"#;
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
assert_eq!(inputs.calculate().unwrap(), 36_409_872);
}
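Earlier in this file, `gather_inputs` and `calculate_logical_size` combine a `JoinSet`, a shared `Semaphore` to cap concurrency, and `spawn_blocking` for the synchronous size calculation. The sketch below shows that combination in isolation; the doubled `Result` layers mirror the `spawn` + `spawn_blocking` nesting the comments mention, and the workload is a placeholder:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;

async fn limited_blocking(jobs: Vec<u64>, limit: Arc<Semaphore>) -> Vec<u64> {
    let mut set = JoinSet::new();
    for job in jobs {
        let limit = Arc::clone(&limit);
        set.spawn(async move {
            // Hold a permit for the duration of the blocking work.
            let _permit = limit.acquire_owned().await.expect("semaphore closed");
            tokio::task::spawn_blocking(move || job * 2).await
        });
    }
    let mut out = Vec::new();
    while let Some(res) = set.join_next().await {
        // Outer Result: the JoinSet task; inner Result: the spawn_blocking task.
        out.push(res.expect("task panicked").expect("blocking task panicked"));
    }
    out
}
```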

View File

@@ -16,7 +16,7 @@ use std::fs;
use std::ops::{Deref, Range};
use std::path::PathBuf;
use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
use std::time::{Duration, Instant, SystemTime};
use crate::tenant::{
@@ -121,16 +121,8 @@ pub struct Timeline {
/// to avoid deadlock.
write_lock: Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
flush_loop_started: Mutex<bool>,
/// layer_flush_start_tx can be used to wake up the layer-flushing task.
/// The value is a counter, incremented every time a new flush cycle is requested.
/// The flush cycle counter is sent back on the layer_flush_done channel when
/// the flush finishes. You can use that to wait for the flush to finish.
layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
/// Used to ensure that there is only one task performing flushing at a time
layer_flush_lock: Mutex<()>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
@@ -280,11 +272,6 @@ impl LogicalSize {
self.size_added_after_initial
.fetch_add(delta, AtomicOrdering::SeqCst);
}
/// Returns the initialized (already calculated) value, if any.
fn initialized_size(&self) -> Option<u64> {
self.initial_logical_size.get().copied()
}
}
pub struct WalReceiverInfo {
@@ -474,16 +461,15 @@ impl Timeline {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
pub fn checkpoint(&self, cconf: CheckpointConfig) -> anyhow::Result<()> {
match cconf {
CheckpointConfig::Flush => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers_and_wait().await
self.flush_frozen_layers(true)
}
CheckpointConfig::Forced => {
self.freeze_inmem_layer(false);
self.flush_frozen_layers_and_wait().await?;
self.flush_frozen_layers(true)?;
self.compact()
}
}
@@ -633,8 +619,24 @@ impl Timeline {
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
// Wake up the layer flusher
self.flush_frozen_layers();
// Launch a task to flush the frozen layer to disk, unless
// a task was already running. (If the task was running
// at the time that we froze the layer, it must've seen the
// the layer we just froze before it exited; see comments
// in flush_frozen_layers())
if let Ok(guard) = self.layer_flush_lock.try_lock() {
drop(guard);
let self_clone = Arc::clone(self);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_frozen_layers(false) },
);
}
}
}
Ok(())
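The `try_lock` gate above only spawns a flush when nobody is flushing already; a worker that currently holds the lock is guaranteed to notice the layer that was just frozen. A stripped-down sketch of just the gate, where the worker is a plain thread and `do_flush` is a placeholder:

```rust
use std::sync::{Mutex, TryLockError};

fn maybe_start_flush(flush_lock: &'static Mutex<()>, do_flush: fn()) {
    match flush_lock.try_lock() {
        Ok(guard) => {
            // Nobody is flushing: release the probe guard and start a worker,
            // which re-acquires the lock around the actual work.
            drop(guard);
            std::thread::spawn(move || {
                let _guard = flush_lock.lock().unwrap();
                do_flush();
            });
        }
        // A flush is already running; it will also pick up the newly frozen
        // layer, so there is nothing to do here.
        Err(TryLockError::WouldBlock) => {}
        Err(TryLockError::Poisoned(err)) => panic!("{err:?}"),
    }
}
```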
@@ -725,9 +727,6 @@ impl Timeline {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Suspended);
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
let mut result = Timeline {
conf,
tenant_conf,
@@ -755,12 +754,8 @@ impl Timeline {
upload_layers: AtomicBool::new(upload_layers),
flush_loop_started: Mutex::new(false),
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: Mutex::new(()),
layer_flush_lock: Mutex::new(()),
layer_removal_cs: Mutex::new(()),
gc_info: RwLock::new(GcInfo {
@@ -793,33 +788,6 @@ impl Timeline {
result
}
pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
let mut flush_loop_started = self.flush_loop_started.lock().unwrap();
if *flush_loop_started {
info!(
"skipping attempt to start flush_loop twice {}/{}",
self.tenant_id, self.timeline_id
);
return;
}
let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
let self_clone = Arc::clone(self);
info!("spawning flush loop");
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::LayerFlushTask,
Some(self.tenant_id),
Some(self.timeline_id),
"layer flush task",
false,
async move { self_clone.flush_loop(layer_flush_start_rx).await; Ok(()) }
.instrument(info_span!(parent: None, "layer flush task", tenant = %self.tenant_id, timeline = %self.timeline_id))
);
*flush_loop_started = true;
}
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
if !is_etcd_client_initialized() {
if cfg!(test) {
@@ -1011,26 +979,9 @@ impl Timeline {
/// Calculate the logical size of the database at the latest LSN.
///
/// NOTE: counted incrementally, includes ancestors; this can be a slow operation.
pub fn calculate_logical_size(&self, up_to_lsn: Lsn) -> anyhow::Result<u64> {
info!(
"Calculating logical size for timeline {} at {}",
self.timeline_id, up_to_lsn
);
let timer = if up_to_lsn == self.initdb_lsn {
if let Some(size) = self.current_logical_size.initialized_size() {
if size != 0 {
// a non-zero size means that the size has already been calculated by this method
// after startup. if the logical size is for a new timeline without layers, the
// size will be zero, and we cannot use that (or this caching strategy) until the
// pageserver restarts.
return Ok(size);
}
}
self.metrics.init_logical_size_histo.start_timer()
} else {
self.metrics.logical_size_histo.start_timer()
};
fn calculate_logical_size(&self, up_to_lsn: Lsn) -> anyhow::Result<u64> {
info!("Calculating logical size for timeline {}", self.timeline_id);
let timer = self.metrics.init_logical_size_histo.start_timer();
let logical_size = self.get_current_logical_size_non_incremental(up_to_lsn)?;
debug!("calculated logical size: {logical_size}");
timer.stop_and_record();
@@ -1316,94 +1267,53 @@ impl Timeline {
drop(layers);
}
/// Layer flusher task's main loop.
async fn flush_loop(&self, mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>) {
info!("started flush loop");
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("shutting down layer flush task");
break;
},
_ = layer_flush_start_rx.changed() => {}
/// Flush all frozen layers to disk.
///
/// Only one task at a time can be doing layer-flushing for a
/// given timeline. If 'wait' is true, and another task is
/// currently doing the flushing, this function will wait for it
/// to finish. If 'wait' is false, this function will return
/// immediately instead.
fn flush_frozen_layers(&self, wait: bool) -> anyhow::Result<()> {
let flush_lock_guard = if wait {
self.layer_flush_lock.lock().unwrap()
} else {
match self.layer_flush_lock.try_lock() {
Ok(guard) => guard,
Err(TryLockError::WouldBlock) => return Ok(()),
Err(TryLockError::Poisoned(err)) => panic!("{:?}", err),
}
};
trace!("waking up");
let timer = self.metrics.flush_time_histo.start_timer();
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().unwrap();
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
if let Some(layer_to_flush) = layer_to_flush {
if let Err(err) = self.flush_frozen_layer(layer_to_flush).await {
error!("could not flush frozen layer: {err:?}");
break Err(err);
}
continue;
} else {
break Ok(());
}
};
// Notify any listeners that we're done
let _ = self
.layer_flush_done_tx
.send_replace((flush_counter, result));
timer.stop_and_record();
}
}
async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
let mut rx = self.layer_flush_done_tx.subscribe();
// Increment the flush cycle counter and wake up the flush task.
// Remember the new value, so that when we listen for the flush
// to finish, we know when the flush that we initiated has
// finished, instead of some other flush that was started earlier.
let mut my_flush_request = 0;
if !&*self.flush_loop_started.lock().unwrap() {
anyhow::bail!("cannot flush frozen layers when flush_loop is not running")
}
self.layer_flush_start_tx.send_modify(|counter| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
});
let timer = self.metrics.flush_time_histo.start_timer();
loop {
{
let (last_result_counter, last_result) = &*rx.borrow();
if *last_result_counter >= my_flush_request {
if let Err(_err) = last_result {
// We already logged the original error in
// flush_loop. We cannot propagate it to the caller
// here, because it might not be Cloneable
anyhow::bail!(
"Could not flush frozen layer. Request id: {}",
my_flush_request
);
} else {
return Ok(());
}
}
let layers = self.layers.read().unwrap();
if let Some(frozen_layer) = layers.frozen_layers.front() {
let frozen_layer = Arc::clone(frozen_layer);
drop(layers); // to allow concurrent reads and writes
self.flush_frozen_layer(frozen_layer)?;
} else {
// Drop the 'layer_flush_lock' *before* 'layers'. That
// way, if you freeze a layer, and then call
// flush_frozen_layers(false), it is guaranteed that
// if another thread was busy flushing layers and the
// call therefore returns immediately, the other
// thread will have seen the newly-frozen layer and
// will flush that too (assuming no errors).
drop(flush_lock_guard);
drop(layers);
break;
}
trace!("waiting for flush to complete");
rx.changed().await?;
trace!("done")
}
}
fn flush_frozen_layers(&self) {
self.layer_flush_start_tx.send_modify(|val| *val += 1);
timer.stop_and_record();
Ok(())
}
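One of the two flushing variants shown above coordinates callers and the flush task through a counter carried over `tokio::sync::watch` channels: the caller bumps a request counter, the task echoes the counter of the last completed flush, and the caller waits until the echo catches up. A reduced sketch of just the caller side of that handshake:

```rust
use tokio::sync::watch;

async fn request_flush_and_wait(
    start_tx: &watch::Sender<u64>,
    done_rx: &mut watch::Receiver<u64>,
) -> anyhow::Result<()> {
    // Bump the request counter and remember which request is ours.
    let mut my_request = 0;
    start_tx.send_modify(|counter| {
        *counter += 1;
        my_request = *counter;
    });
    // Wait until the worker has published a completion counter covering it.
    while *done_rx.borrow() < my_request {
        done_rx.changed().await?;
    }
    Ok(())
}
```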
/// Flush one frozen in-memory layer to disk, as a new delta layer.
#[instrument(skip(self, frozen_layer), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.filename().display()))]
async fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
// As a special case, when we have just imported an image into the repository,
// instead of writing out an L0 delta layer, we directly write out image layer
// files. This is possible as long as *all* the data imported into the
@@ -2333,10 +2243,13 @@ impl Timeline {
let last_rec_lsn = data.records.last().unwrap().0;
let img = self
.walredo_mgr
.request_redo(key, request_lsn, base_img, data.records, self.pg_version)
.context("Failed to reconstruct a page image:")?;
let img = self.walredo_mgr.request_redo(
key,
request_lsn,
base_img,
data.records,
self.pg_version,
)?;
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();

View File

@@ -82,7 +82,6 @@ pub struct TenantConf {
/// A lagging safekeeper will be changed after `lagging_wal_timeout` time elapses since the last WAL update,
/// to avoid eager reconnects.
pub max_lsn_wal_lag: NonZeroU64,
pub trace_read_requests: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -106,7 +105,6 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub trace_read_requests: Option<bool>,
}
impl TenantConfOpt {
@@ -140,9 +138,6 @@ impl TenantConfOpt {
.lagging_wal_timeout
.unwrap_or(global_conf.lagging_wal_timeout),
max_lsn_wal_lag: self.max_lsn_wal_lag.unwrap_or(global_conf.max_lsn_wal_lag),
trace_read_requests: self
.trace_read_requests
.unwrap_or(global_conf.trace_read_requests),
}
}
@@ -212,7 +207,6 @@ impl TenantConf {
.expect("cannot parse default walreceiver lagging wal timeout"),
max_lsn_wal_lag: NonZeroU64::new(DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.expect("cannot parse default max walreceiver Lsn wal lag"),
trace_read_requests: false,
}
}
@@ -238,7 +232,6 @@ impl TenantConf {
.unwrap(),
max_lsn_wal_lag: NonZeroU64::new(defaults::DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG)
.unwrap(),
trace_read_requests: false,
}
}
}
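The `TenantConfOpt` to `TenantConf` merge above is a plain option overlay: every per-tenant field falls back to the global default via `unwrap_or`. A two-field sketch of the same pattern with illustrative field names:

```rust
#[derive(Clone, Copy)]
struct Conf {
    gc_horizon: u64,
    compaction_threshold: usize,
}

#[derive(Default, Clone, Copy)]
struct ConfOpt {
    gc_horizon: Option<u64>,
    compaction_threshold: Option<usize>,
}

impl ConfOpt {
    // Per-tenant overrides win; anything left unset falls back to the global value.
    fn merge(&self, global: Conf) -> Conf {
        Conf {
            gc_horizon: self.gc_horizon.unwrap_or(global.gc_horizon),
            compaction_threshold: self
                .compaction_threshold
                .unwrap_or(global.compaction_threshold),
        }
    }
}
```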

View File

@@ -241,7 +241,7 @@ pub async fn shutdown_all_tenants() {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
if let Err(err) = tenant.checkpoint().await {
if let Err(err) = tenant.checkpoint() {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}

View File

@@ -119,7 +119,7 @@ async fn gc_loop(tenant_id: TenantId) {
let gc_horizon = tenant.get_gc_horizon();
let mut sleep_duration = gc_period;
if gc_horizon > 0 {
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false).await
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
{
sleep_duration = wait_duration;
error!("Gc failed, retrying in {:?}: {e:#}", sleep_duration);

View File

@@ -1,36 +0,0 @@
use bytes::Bytes;
use std::{
fs::{create_dir_all, File},
io::{BufWriter, Write},
path::PathBuf,
};
pub struct Tracer {
writer: BufWriter<File>,
}
impl Drop for Tracer {
fn drop(&mut self) {
self.flush()
}
}
impl Tracer {
pub fn new(path: PathBuf) -> Self {
let parent = path.parent().expect("failed to parse parent path");
create_dir_all(parent).expect("failed to create trace dir");
let file = File::create(path).expect("failed to create trace file");
Tracer {
writer: BufWriter::new(file),
}
}
pub fn trace(&mut self, msg: &Bytes) {
self.writer.write_all(msg).expect("failed to write trace");
}
pub fn flush(&mut self) {
self.writer.flush().expect("failed to flush trace file");
}
}

View File

@@ -31,8 +31,8 @@ use crate::{
walrecord::DecodedWALRecord,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::ReplicationFeedback;
use utils::{id::TenantTimelineId, lsn::Lsn};
use utils::id::TenantTimelineId;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback};
/// Status of the connection.
#[derive(Debug, Clone)]

View File

@@ -22,10 +22,10 @@ use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
@@ -34,7 +34,6 @@ use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use std::{fs, io};
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
@@ -45,7 +44,6 @@ use crate::metrics::{
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::repository::Key;
use crate::task_mgr::BACKGROUND_RUNTIME;
use crate::walrecord::NeonWalRecord;
use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -231,7 +229,7 @@ impl PostgresRedoManager {
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
@@ -581,8 +579,7 @@ impl<C: CommandExt> CloseFileDescriptors for C {
/// Handle to the Postgres WAL redo process
///
struct PostgresRedoProcess {
tenant_id: TenantId,
child: NoLeakChild,
child: Child,
stdin: ChildStdin,
stdout: ChildStdout,
stderr: ChildStderr,
@@ -592,17 +589,16 @@ impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
fn launch(
conf: &PageServerConf,
tenant_id: TenantId,
tenant_id: &TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = path_with_suffix_extension(
conf.tenant_path(&tenant_id).join("wal-redo-datadir"),
conf.tenant_path(tenant_id).join("wal-redo-datadir"),
TEMP_FILE_SUFFIX,
);
@@ -657,7 +653,7 @@ impl PostgresRedoProcess {
}
// Start postgres itself
let child = Command::new(pg_bin_dir_path.join("postgres"))
let mut child = Command::new(pg_bin_dir_path.join("postgres"))
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
@@ -676,7 +672,7 @@ impl PostgresRedoProcess {
// as close-on-exec by default, but that's not enough, since we use
// libraries that directly call libc open without setting that flag.
.close_fds()
.spawn_no_leak_child()
.spawn()
.map_err(|e| {
Error::new(
e.kind(),
@@ -684,33 +680,20 @@ impl PostgresRedoProcess {
)
})?;
let mut child = scopeguard::guard(child, |child| {
error!("killing wal-redo-postgres process due to a problem during launch");
child.kill_and_wait();
});
info!(
"launched WAL redo postgres process on {}",
datadir.display()
);
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
if let Err(e) = &res {
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
}
res
}};
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
set_nonblock_or_log_err!(stderr)?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
set_nonblock(stdin.as_raw_fd())?;
set_nonblock(stdout.as_raw_fd())?;
set_nonblock(stderr.as_raw_fd())?;
Ok(PostgresRedoProcess {
tenant_id,
child,
stdin,
stdout,
@@ -718,16 +701,18 @@ impl PostgresRedoProcess {
})
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn kill(self) {
self.child.kill_and_wait();
fn kill(mut self) {
let _ = self.child.kill();
if let Ok(exit_status) = self.child.wait() {
error!("wal-redo-postgres exited with code {}", exit_status);
}
drop(self);
}
//
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))]
fn apply_wal_records(
&mut self,
tag: BufferTag,
@@ -853,99 +838,6 @@ impl PostgresRedoProcess {
}
}
/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process before being dropped.
struct NoLeakChild {
child: Option<Child>,
}
impl Deref for NoLeakChild {
type Target = Child;
fn deref(&self) -> &Self::Target {
self.child.as_ref().expect("must not use from drop")
}
}
impl DerefMut for NoLeakChild {
fn deref_mut(&mut self) -> &mut Self::Target {
self.child.as_mut().expect("must not use from drop")
}
}
impl NoLeakChild {
fn spawn(command: &mut Command) -> io::Result<Self> {
let child = command.spawn()?;
Ok(NoLeakChild { child: Some(child) })
}
fn kill_and_wait(mut self) {
let child = match self.child.take() {
Some(child) => child,
None => return,
};
Self::kill_and_wait_impl(child);
}
#[instrument(skip_all, fields(pid=child.id()))]
fn kill_and_wait_impl(mut child: Child) {
let res = child.kill();
if let Err(e) = res {
// This branch is very unlikely because:
// - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
// - This is the only place that calls .kill()
// - We consume `self`, so, .kill() can't be called twice.
// - If the process exited by itself or was killed by someone else,
// .kill() will still succeed because we haven't wait()'ed yet.
//
// So, if we arrive here, we have really no idea what happened,
// whether the PID stored in self.child is still valid, etc.
// If this function were fallible, we'd return an error, but
// since it isn't, all we can do is log an error and proceed
// with the wait().
error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
}
match child.wait() {
Ok(exit_status) => {
// log at error level since .kill() is something we only do on errors ATM
error!(exit_status = %exit_status, "wait successful");
}
Err(e) => {
error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
}
}
}
}
impl Drop for NoLeakChild {
fn drop(&mut self) {
let child = match self.child.take() {
Some(child) => child,
None => return,
};
// Offload the kill+wait of the child process into the background.
// If someone stops the runtime, we'll leak the child process.
// We can ignore that case because we only stop the runtime on pageserver exit.
BACKGROUND_RUNTIME.spawn(async move {
tokio::task::spawn_blocking(move || {
Self::kill_and_wait_impl(child);
})
.await
});
}
}
trait NoLeakChildCommandExt {
fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild>;
}
impl NoLeakChildCommandExt for Command {
fn spawn_no_leak_child(&mut self) -> io::Result<NoLeakChild> {
NoLeakChild::spawn(self)
}
}
// Functions for constructing messages to send to the postgres WAL redo
// process. See pgxn/neon_walredo/walredoproc.c for
// explanation of the protocol.
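
The NoLeakChild wrapper and the scopeguard-based launch removed above implement a drop-guard pattern: spawn the child, arm a guard that kills and reaps it, run the fallible post-spawn setup, and only disarm the guard once everything succeeded. A simplified sketch of that idea, assuming only the scopeguard crate (the real code also sets close-on-exec and non-blocking flags and offloads the kill to a background runtime):

use std::io;
use std::process::{Child, Command, Stdio};

fn kill_and_wait(mut child: Child) {
    // Best effort: the child may already have exited.
    let _ = child.kill();
    let _ = child.wait();
}

// Spawn a child that cannot outlive a failed launch: any `?` taken before
// the guard is disarmed kills and reaps the child instead of leaking it.
fn launch_guarded(mut cmd: Command) -> io::Result<Child> {
    let child = cmd.stdin(Stdio::piped()).stdout(Stdio::piped()).spawn()?;
    let guard = scopeguard::guard(child, |child| {
        eprintln!("killing child due to a problem during launch");
        kill_and_wait(child);
    });

    // Fallible post-spawn setup goes here (the real code makes the pipes
    // non-blocking); returning early with `?` still runs the guard.
    guard
        .stdin
        .as_ref()
        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "stdin not piped"))?;

    // Everything succeeded: disarm the guard and hand the child out.
    Ok(scopeguard::ScopeGuard::into_inner(guard))
}

fn main() -> io::Result<()> {
    let child = launch_guarded(Command::new("sleep"))?;
    kill_and_wait(child);
    Ok(())
}

The replacement code in this diff goes back to a plain std::process::Child with an explicit kill()/wait() pair in kill(), dropping the guard machinery.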

View File

@@ -40,21 +40,8 @@
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
char *page_server_connstring_raw;
int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
static void pageserver_flush(void);
static void
pageserver_connect()
{
@@ -71,7 +58,6 @@ pageserver_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"),
@@ -87,26 +73,22 @@ pageserver_connect()
neon_log(ERROR, "could not send pagestream command to pageserver");
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
{
int wc;
WaitEvent event;
/* Sleep until there's something to do */
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
@@ -114,7 +96,6 @@ pageserver_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
neon_log(ERROR, "could not complete handshake with pageserver: %s",
msg);
@@ -131,30 +112,33 @@ pageserver_connect()
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(char **buffer)
call_PQgetCopyData(PGconn *conn, char **buffer)
{
int ret;
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
if (ret == 0)
{
int wc;
WaitEvent event;
/* Sleep until there's something to do */
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
if (!PQconsumeInput(conn))
neon_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(pageserver_conn));
PQerrorMessage(conn));
}
goto retry;
@@ -180,11 +164,7 @@ pageserver_disconnect(void)
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
FreeWaitEventSet(pageserver_conn_wes);
}
static void
@@ -194,7 +174,11 @@ pageserver_send(NeonRequest * request)
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
pageserver_disconnect();
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
}
if (!connected)
pageserver_connect();
@@ -218,11 +202,6 @@ pageserver_send(NeonRequest * request)
}
pfree(req_buff.data);
n_unflushed_requests++;
if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests)
pageserver_flush();
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = nm_to_string((NeonMessage *) request);
@@ -241,7 +220,7 @@ pageserver_receive(void)
PG_TRY();
{
/* read response */
resp_buff.len = call_PQgetCopyData(&resp_buff.data);
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.cursor = 0;
if (resp_buff.len < 0)
@@ -276,21 +255,25 @@ pageserver_receive(void)
static void
pageserver_flush(void)
{
if (!connected)
{
neon_log(WARNING, "Tried to flush while disconnected");
}
else if (PQflush(pageserver_conn))
if (PQflush(pageserver_conn))
{
char *msg = PQerrorMessage(pageserver_conn);
pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg);
}
n_unflushed_requests = 0;
}
static NeonResponse *
pageserver_call(NeonRequest * request)
{
pageserver_send(request);
pageserver_flush();
return pageserver_receive();
}
page_server_api api = {
.request = pageserver_call,
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
@@ -444,14 +427,6 @@ pg_init_libpagestore(void)
PGC_SIGHUP,
GUC_UNIT_MB,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.flush_output_after",
"Flush the output buffer after every N unflushed requests",
NULL,
&flush_every_n_requests,
8, -1, INT_MAX,
PGC_SIGHUP,
0, /* no flags required */
NULL, NULL, NULL);
relsize_hash_init();
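
The removed neon.flush_output_after GUC controlled simple output batching: each request is appended to the connection's output buffer, and the buffer is flushed only after every N unflushed requests or on an explicit flush. A hypothetical sketch of that counter logic, written in Rust rather than the extension's C purely for brevity:

// Requests accumulate in the writer; the buffer is flushed automatically
// after every `flush_every_n` unflushed requests. A value of 0 or less
// disables automatic flushing, mirroring the removed GUC's range.
struct BatchingSender<W: std::io::Write> {
    out: W,
    unflushed: usize,
    flush_every_n: i32,
}

impl<W: std::io::Write> BatchingSender<W> {
    fn send(&mut self, request: &[u8]) -> std::io::Result<()> {
        self.out.write_all(request)?;
        self.unflushed += 1;
        if self.flush_every_n > 0 && self.unflushed >= self.flush_every_n as usize {
            self.flush()?;
        }
        Ok(())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.out.flush()?;
        self.unflushed = 0;
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut sender = BatchingSender {
        out: std::io::sink(),
        unflushed: 0,
        flush_every_n: 8, // the removed GUC defaulted to 8
    };
    for _ in 0..20 {
        sender.send(b"get_page request")?;
    }
    sender.flush() // flush the tail before waiting for responses
}

With the counter gone, pageserver_send in the hunk above no longer flushes on its own, so pipelined callers rely on an explicit page_server->flush().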

View File

@@ -115,8 +115,6 @@ typedef struct
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetPageResponse;
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
typedef struct
{
NeonMessageTag tag;
@@ -140,18 +138,15 @@ extern char *nm_to_string(NeonMessage * msg);
typedef struct
{
NeonResponse *(*request) (NeonRequest * request);
void (*send) (NeonRequest * request);
NeonResponse *(*receive) (void);
void (*flush) (void);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
extern page_server_api * page_server;
extern char *page_server_connstring;
extern bool seqscan_prefetch_enabled;
extern int seqscan_prefetch_distance;
extern char *neon_timeline;
extern char *neon_tenant;
extern bool wal_redo;
@@ -172,6 +167,7 @@ extern void neon_extend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void neon_reset_prefetch(SMgrRelation reln);
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
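
The page_server_api struct above is a small call table: send/flush/receive let callers queue several requests before waiting for any response, while request() is the send-one-wait-one convenience (pageserver_call in the earlier hunk). A hypothetical Rust rendering of that split, with a toy in-memory server standing in for the real connection:

use std::collections::VecDeque;

trait PageServerApi {
    fn send(&mut self, request: &str);
    fn flush(&mut self);
    fn receive(&mut self) -> String;

    // Convenience path: one request, one response.
    fn request(&mut self, request: &str) -> String {
        self.send(request);
        self.flush();
        self.receive()
    }
}

// Toy stand-in that "answers" requests when they are flushed.
struct FakePageServer {
    outbox: Vec<String>,
    inbox: VecDeque<String>,
}

impl PageServerApi for FakePageServer {
    fn send(&mut self, request: &str) {
        self.outbox.push(request.to_string());
    }
    fn flush(&mut self) {
        for req in self.outbox.drain(..) {
            self.inbox.push_back(format!("response to {req}"));
        }
    }
    fn receive(&mut self) -> String {
        self.inbox.pop_front().expect("a flushed request is pending")
    }
}

fn main() {
    let mut ps = FakePageServer { outbox: Vec::new(), inbox: VecDeque::new() };
    ps.send("get_page A"); // queued, not yet on the wire
    ps.send("get_page B");
    ps.flush();            // both requests go out together
    assert_eq!(ps.receive(), "response to get_page A");
    assert_eq!(ps.receive(), "response to get_page B");
    assert_eq!(ps.request("get_page C"), "response to get_page C");
}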

View File

@@ -49,20 +49,22 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlog_internal.h"
#include "access/xlogdefs.h"
#include "catalog/pg_class.h"
#include "common/hashfn.h"
#include "pagestore_client.h"
#include "pagestore_client.h"
#include "storage/smgr.h"
#include "access/xlogdefs.h"
#include "postmaster/interrupt.h"
#include "postmaster/autovacuum.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/relfilenode.h"
#include "storage/buf_internals.h"
#include "storage/smgr.h"
#include "storage/md.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "catalog/pg_tablespace_d.h"
#include "postmaster/autovacuum.h"
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
@@ -111,482 +113,48 @@ typedef enum
static SMgrRelation unlogged_build_rel = NULL;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
/*
* Prefetch implementation:
*
* Prefetch is performed locally by each backend.
*
* There can be up to READ_BUFFER_SIZE active IO requests registered at any
* time. Requests using smgr_prefetch are sent to the pageserver, but we don't
* wait on the response. Requests using smgr_read are either read from the
* buffer, or (if that's not possible) we wait on the response to arrive -
* this also will allow us to receive other prefetched pages.
* Each request is immediately written to the output buffer of the pageserver
* connection, but may not be flushed if smgr_prefetch is used: the connection
* is flushed on an explicit flush, or after every neon.flush_output_after
* unflushed requests, i.e. not necessarily after each request.
*
* Once we have received a response, this value will be stored in the response
* buffer, indexed in a hash table. This allows us to retain our buffered
* prefetch responses even when we have cache misses.
*
* Reading of prefetch responses is delayed until they are actually needed
* (smgr_read). In case of prefetch miss or any other SMGR request other than
* smgr_read, all prefetch responses in the pipeline will need to be read from
* the connection; the responses are stored for later use.
*
* NOTE: The current implementation of the prefetch system implements a ring
* buffer of up to READ_BUFFER_SIZE requests. If there are more _read and
* _prefetch requests between the initial _prefetch and the _read of a buffer,
* the prefetch request will have been dropped from this prefetch buffer, and
* your prefetch was wasted.
* There can be up to MAX_PREFETCH_REQUESTS registered using smgr_prefetch
* before smgr_read. All these requests are appended to the primary smgr_read request.
* It is assumed that pages will be requested in prefetch order.
* Reading of prefetch responses is delayed until they are actually needed (smgr_read).
* This makes it possible to parallelize processing and receiving of prefetched pages.
* In case of a prefetch miss, or any SMGR request other than smgr_read,
* all prefetch responses have to be consumed.
*/
/* Max amount of tracked buffer reads */
#define READ_BUFFER_SIZE 128
#define MAX_PREFETCH_REQUESTS 128
typedef enum PrefetchStatus {
PRFS_UNUSED = 0, /* unused slot */
PRFS_REQUESTED, /* request was written to the sendbuffer to PS, but not
* necessarily flushed.
* all fields except response valid */
PRFS_RECEIVED, /* all fields valid */
PRFS_TAG_REMAINS, /* only buftag and my_ring_index are still valid */
} PrefetchStatus;
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
int n_prefetch_requests;
int n_prefetch_responses;
int n_prefetched_buffers;
int n_prefetch_hits;
int n_prefetch_misses;
XLogRecPtr prefetch_lsn;
typedef struct PrefetchRequest {
BufferTag buftag; /* must be first entry in the struct */
XLogRecPtr effective_request_lsn;
NeonResponse *response; /* may be null */
PrefetchStatus status;
uint64 my_ring_index;
} PrefetchRequest;
/* prefetch buffer lookup hash table */
typedef struct PrfHashEntry {
PrefetchRequest *slot;
uint32 status;
uint32 hash;
} PrfHashEntry;
#define SH_PREFIX prfh
#define SH_ELEMENT_TYPE PrfHashEntry
#define SH_KEY_TYPE PrefetchRequest *
#define SH_KEY slot
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) ((a)->hash)
#define SH_HASH_KEY(tb, key) hash_bytes( \
((const unsigned char *) &(key)->buftag), \
sizeof(BufferTag) \
)
#define SH_EQUAL(tb, a, b) (BUFFERTAGS_EQUAL((a)->buftag, (b)->buftag))
#define SH_SCOPE static inline
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
/*
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
* It maintains a (ring) buffer of in-flight requests and responses.
*
* We maintain several indexes into the ring buffer:
* ring_unused >= ring_receive >= ring_last >= 0
*
* ring_unused points to the first unused slot of the buffer
* ring_receive is the next request that is to be received
* ring_last is the oldest received entry in the buffer
*
* Apart from being an entry in the ring buffer of prefetch requests, each
* PrefetchRequest that is not UNUSED is indexed in prf_hash by buftag.
*/
typedef struct PrefetchState {
MemoryContext bufctx; /* context for prf_buffer[].response allocations */
MemoryContext errctx; /* context for non-getpage (e.g. error) responses */
MemoryContext hashctx; /* context for prf_buffer */
/* buffer indexes */
uint64 ring_unused; /* first unused slot */
uint64 ring_receive; /* next slot that is to receive a response */
uint64 ring_last; /* min slot with a response value */
/* metrics / statistics */
int n_responses_buffered; /* count of PS responses not yet in buffers */
int n_requests_inflight; /* count of PS requests considered in flight */
int n_unused; /* count of buffers < unused, > last, that are also unused */
/* the buffers */
prfh_hash *prf_hash;
PrefetchRequest prf_buffer[READ_BUFFER_SIZE]; /* prefetch buffers */
} PrefetchState;
PrefetchState *MyPState;
int n_prefetch_hits = 0;
int n_prefetch_misses = 0;
int n_prefetch_missed_caches = 0;
int n_prefetch_dupes = 0;
XLogRecPtr prefetch_lsn = 0;
static void consume_prefetch_responses(void);
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
static void prefetch_read(PrefetchRequest *slot);
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
static void prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup(void);
static inline void prefetch_set_unused(uint64 ring_index, bool hash_cleanup);
static XLogRecPtr neon_get_request_lsn(bool *latest, RelFileNode rnode,
ForkNumber forknum, BlockNumber blkno);
/*
* Make sure that there are no responses still in the buffer.
*/
static void
consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
}
static void
prefetch_cleanup(void)
{
int index;
uint64 ring_index;
PrefetchRequest *slot;
while (MyPState->ring_last < MyPState->ring_receive) {
ring_index = MyPState->ring_last;
index = (ring_index % READ_BUFFER_SIZE);
slot = &MyPState->prf_buffer[index];
if (slot->status == PRFS_UNUSED)
MyPState->ring_last += 1;
else
break;
}
}
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
*/
static void
prefetch_wait_for(uint64 ring_index)
{
int index;
PrefetchRequest *entry;
Assert(MyPState->ring_unused > ring_index);
while (MyPState->ring_receive <= ring_index)
for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++)
{
index = (MyPState->ring_receive % READ_BUFFER_SIZE);
entry = &MyPState->prf_buffer[index];
NeonResponse *resp = page_server->receive();
Assert(entry->status == PRFS_REQUESTED);
prefetch_read(entry);
pfree(resp);
}
}
/*
* Read the response of a prefetch request into its slot.
*
* The caller is responsible for making sure that the request for this buffer
* was flushed to the PageServer.
*/
static void
prefetch_read(PrefetchRequest *slot)
{
NeonResponse *response;
MemoryContext old;
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive();
MemoryContextSwitchTo(old);
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
}
/*
* Disconnect hook - drop prefetches when the connection drops
*
* If we don't remove the failed prefetches, we'd be serving incorrect
* data to the smgr.
*/
void
prefetch_on_ps_disconnect(void)
{
for (; MyPState->ring_receive < MyPState->ring_unused; MyPState->ring_receive++)
{
PrefetchRequest *slot;
int index = MyPState->ring_receive % READ_BUFFER_SIZE;
slot = &MyPState->prf_buffer[index];
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->my_ring_index == MyPState->ring_receive);
/* clean up the request */
slot->status = PRFS_TAG_REMAINS;
MyPState->n_requests_inflight--;
prefetch_set_unused(MyPState->ring_receive, true);
}
}
/*
* prefetch_set_unused() - clear a received prefetch slot
*
* The slot at ring_index must be a current member of the ring buffer,
* and may not be in the PRFS_REQUESTED state.
*/
static inline void
prefetch_set_unused(uint64 ring_index, bool hash_cleanup)
{
PrefetchRequest *slot = &MyPState->prf_buffer[ring_index % READ_BUFFER_SIZE];
Assert(MyPState->ring_last <= ring_index &&
MyPState->ring_unused > ring_index);
if (slot->status == PRFS_UNUSED)
return;
Assert(slot->status == PRFS_RECEIVED || slot->status == PRFS_TAG_REMAINS);
Assert(ring_index >= MyPState->ring_last &&
ring_index < MyPState->ring_unused);
if (slot->status == PRFS_RECEIVED)
{
pfree(slot->response);
slot->response = NULL;
MyPState->n_responses_buffered -= 1;
MyPState->n_unused += 1;
}
else
{
Assert(slot->response == NULL);
}
if (hash_cleanup)
prfh_delete(MyPState->prf_hash, slot);
/* clear all fields */
MemSet(slot, 0, sizeof(PrefetchRequest));
slot->status = PRFS_UNUSED;
/* run cleanup if we're holding back ring_last */
if (MyPState->ring_last == ring_index)
prefetch_cleanup();
}
static void
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = false,
.req.lsn = 0,
.rnode = slot->buftag.rnode,
.forknum = slot->buftag.forkNum,
.blkno = slot->buftag.blockNum,
};
if (force_lsn && force_latest)
{
request.req.lsn = *force_lsn;
request.req.latest = *force_latest;
slot->effective_request_lsn = *force_lsn;
}
else
{
XLogRecPtr lsn = neon_get_request_lsn(
&request.req.latest,
slot->buftag.rnode,
slot->buftag.forkNum,
slot->buftag.blockNum
);
/*
* Note: effective_request_lsn is potentially higher than the requested
* LSN, but still correct:
*
* We know there are no changes between the actual requested LSN and
* the value of effective_request_lsn: If there were, the page would
* have been in cache and evicted between those LSN values, which
* then would have had to result in a larger request LSN for this page.
*
* It is possible that a concurrent backend loads the page, modifies
* it and then evicts it again, but the LSN of that eviction cannot be
* smaller than the current WAL insert/redo pointer, which is already
* larger than this prefetch_lsn. So in any case, that would
* invalidate this cache.
*
* The best LSN to use for effective_request_lsn would be
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
*/
request.req.lsn = lsn;
prefetch_lsn = Max(prefetch_lsn, lsn);
slot->effective_request_lsn = prefetch_lsn;
}
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
page_server->send((NeonRequest *) &request);
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
/* update slot state */
slot->status = PRFS_REQUESTED;
}
/*
* prefetch_register_buffer() - register and prefetch buffer
*
* Register that we may want the contents of BufferTag in the near future.
*
* If force_latest and force_lsn are not NULL, those values are sent to the
* pageserver. If they are NULL, we use the lastWrittenLsn infrastructure
* to fill in these values manually.
*/
static uint64
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
{
int index;
bool found;
uint64 ring_index;
PrefetchRequest req;
PrefetchRequest *slot;
PrfHashEntry *entry;
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
req.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
if (entry != NULL)
{
slot = entry->slot;
ring_index = slot->my_ring_index;
index = (ring_index % READ_BUFFER_SIZE);
Assert(slot == &MyPState->prf_buffer[index]);
Assert(slot->status != PRFS_UNUSED);
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
/*
* If we want a specific lsn, we do not accept requests that were made
* with a potentially different LSN.
*/
if (force_lsn && slot->effective_request_lsn != *force_lsn)
{
prefetch_wait_for(ring_index);
prefetch_set_unused(ring_index, true);
}
/*
* We received a prefetch for a page that was recently read and
* removed from the buffers. Remove that request from the buffers.
*/
else if (slot->status == PRFS_TAG_REMAINS)
{
prefetch_set_unused(ring_index, true);
}
else
{
/* The buffered request is good enough, return that index */
n_prefetch_dupes++;
return ring_index;
}
}
/*
* If the prefetch queue is full, we need to make room by clearing the
* oldest slot. If the oldest slot holds a buffer that was already
* received, we can just throw it away; we fetched the page unnecessarily
* in that case. If the oldest slot holds a request that we haven't
* received a response for yet, we have to wait for the response to that
* before we can continue. We might not have even flushed the request to
* the pageserver yet, it might be just sitting in the output buffer. In
* that case, we flush it and wait for the response. (We could decide not
* to send it, but it's hard to abort when the request is already in the
* output buffer, and 'not sending' a prefetch request kind of goes
* against the principles of prefetching)
*/
if (MyPState->ring_last + READ_BUFFER_SIZE - 1 == MyPState->ring_unused)
{
slot = &MyPState->prf_buffer[(MyPState->ring_last % READ_BUFFER_SIZE)];
Assert(slot->status != PRFS_UNUSED);
/* We have the slot for ring_last, so that must still be in progress */
switch (slot->status)
{
case PRFS_REQUESTED:
Assert(MyPState->ring_receive == MyPState->ring_last);
prefetch_wait_for(MyPState->ring_last);
prefetch_set_unused(MyPState->ring_last, true);
break;
case PRFS_RECEIVED:
case PRFS_TAG_REMAINS:
prefetch_set_unused(MyPState->ring_last, true);
break;
default:
pg_unreachable();
}
}
/*
* The next buffer pointed to by `ring_unused` is now unused, so we can insert
* the new request to it.
*/
ring_index = MyPState->ring_unused;
index = (ring_index % READ_BUFFER_SIZE);
slot = &MyPState->prf_buffer[index];
Assert(MyPState->ring_last <= ring_index);
Assert(slot->status == PRFS_UNUSED);
/*
* We must update the slot data before insertion, because the hash
* function reads the buffer tag from the slot.
*/
slot->buftag = tag;
slot->my_ring_index = ring_index;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
prefetch_do_request(slot, force_latest, force_lsn);
Assert(slot->status == PRFS_REQUESTED);
Assert(ring_index < MyPState->ring_unused);
return ring_index;
n_prefetched_buffers = 0;
n_prefetch_responses = 0;
}
static NeonResponse *
page_server_request(void const *req)
{
page_server->send((NeonRequest *) req);
page_server->flush();
consume_prefetch_responses();
return page_server->receive();
return page_server->request((NeonRequest *) req);
}
@@ -700,15 +268,12 @@ nm_unpack_response(StringInfo s)
case T_NeonGetPageResponse:
{
NeonGetPageResponse *msg_resp;
NeonGetPageResponse *msg_resp = palloc0(offsetof(NeonGetPageResponse, page) + BLCKSZ);
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
msg_resp->tag = tag;
/* XXX: should be varlena */
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
pq_getmsgend(s);
Assert(msg_resp->tag == T_NeonGetPageResponse);
resp = (NeonResponse *) msg_resp;
break;
@@ -1052,32 +617,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
void
neon_init(void)
{
HASHCTL info;
if (MyPState != NULL)
return;
MyPState = MemoryContextAllocZero(TopMemoryContext, sizeof(PrefetchState));
MyPState->n_unused = READ_BUFFER_SIZE;
MyPState->bufctx = SlabContextCreate(TopMemoryContext,
"NeonSMGR/prefetch",
SLAB_DEFAULT_BLOCK_SIZE * 17,
PS_GETPAGERESPONSE_SIZE);
MyPState->errctx = AllocSetContextCreate(TopMemoryContext,
"NeonSMGR/errors",
ALLOCSET_DEFAULT_SIZES);
MyPState->hashctx = AllocSetContextCreate(TopMemoryContext,
"NeonSMGR/prefetch",
ALLOCSET_DEFAULT_SIZES);
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(uint64);
MyPState->prf_hash = prfh_create(MyPState->hashctx,
READ_BUFFER_SIZE, NULL);
/* noop */
#ifdef DEBUG_COMPARE_LOCAL
mdinit();
#endif
@@ -1464,17 +1004,27 @@ neon_close(SMgrRelation reln, ForkNumber forknum)
}
/*
* neon_reset_prefetch() -- remove all previously registered prefetch requests
*/
void
neon_reset_prefetch(SMgrRelation reln)
{
n_prefetch_requests = 0;
}
/*
* neon_prefetch() -- Initiate asynchronous read of the specified block of a relation
*/
bool
neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
{
uint64 ring_index;
switch (reln->smgr_relpersistence)
{
case 0: /* probably shouldn't happen, but ignore it */
case 0:
/* probably shouldn't happen, but ignore it */
break;
case RELPERSISTENCE_PERMANENT:
break;
@@ -1486,17 +1036,14 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
BufferTag tag = (BufferTag) {
.rnode = reln->smgr_rnode.node,
.forkNum = forknum,
.blockNum = blocknum
};
ring_index = prefetch_register_buffer(tag, NULL, NULL);
Assert(ring_index < MyPState->ring_unused &&
MyPState->ring_last <= ring_index);
if (n_prefetch_requests < MAX_PREFETCH_REQUESTS)
{
prefetch_requests[n_prefetch_requests].rnode = reln->smgr_rnode.node;
prefetch_requests[n_prefetch_requests].forkNum = forknum;
prefetch_requests[n_prefetch_requests].blockNum = blocknum;
n_prefetch_requests += 1;
return true;
}
return false;
}
@@ -1547,72 +1094,81 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer)
{
NeonResponse *resp;
BufferTag buftag;
uint64 ring_index;
PrfHashEntry *entry;
PrefetchRequest *slot;
buftag = (BufferTag) {
.rnode = rnode,
.forkNum = forkNum,
.blockNum = blkno,
};
int i;
/*
* Try to find prefetched page in the list of received pages.
* Try to find the prefetched page. It is assumed that pages will be requested
* in the same order as they are prefetched, but some other backend may
* load a page into shared buffers, so some prefetch responses may need to be
* skipped.
*/
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
if (entry != NULL)
for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
{
if (entry->slot->effective_request_lsn >= prefetch_lsn)
{
slot = entry->slot;
ring_index = slot->my_ring_index;
n_prefetch_hits += 1;
}
else /* the current prefetch LSN is not large enough, so drop the prefetch */
resp = page_server->receive();
if (resp->tag == T_NeonGetPageResponse &&
RelFileNodeEquals(prefetch_responses[i].rnode, rnode) &&
prefetch_responses[i].forkNum == forkNum &&
prefetch_responses[i].blockNum == blkno)
{
char *page = ((NeonGetPageResponse *) resp)->page;
/*
* We can't drop cache for not-yet-received requested items. It is
* unlikely this happens, but it can happen if prefetch distance is
* large enough and a backend didn't consume all prefetch requests.
* Check if the prefetched page is still relevant. If it was updated by
* some other backend, it should not be requested from smgr
* unless it has been evicted from shared buffers. In the latter case
* last_evicted_lsn should have been updated and request_lsn should be
* greater than prefetch_lsn. The maximum with the page LSN is used
* because the page returned by the page server may have an LSN either
* greater or smaller than the requested one.
*/
if (entry->slot->status == PRFS_REQUESTED)
if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
{
page_server->flush();
prefetch_wait_for(entry->slot->my_ring_index);
n_prefetched_buffers = i + 1;
n_prefetch_hits += 1;
n_prefetch_requests = 0;
memcpy(buffer, page, BLCKSZ);
pfree(resp);
return;
}
/* drop caches */
prefetch_set_unused(entry->slot->my_ring_index, true);
n_prefetch_missed_caches += 1;
/* make it look like a prefetch cache miss */
entry = NULL;
}
pfree(resp);
}
n_prefetched_buffers = 0;
n_prefetch_responses = 0;
n_prefetch_misses += 1;
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = request_latest,
.req.lsn = request_lsn,
.rnode = rnode,
.forknum = forkNum,
.blkno = blkno
};
if (n_prefetch_requests > 0)
{
/* Combine all prefetch requests with primary request */
page_server->send((NeonRequest *) & request);
for (i = 0; i < n_prefetch_requests; i++)
{
request.rnode = prefetch_requests[i].rnode;
request.forknum = prefetch_requests[i].forkNum;
request.blkno = prefetch_requests[i].blockNum;
prefetch_responses[i] = prefetch_requests[i];
page_server->send((NeonRequest *) & request);
}
page_server->flush();
n_prefetch_responses = n_prefetch_requests;
n_prefetch_requests = 0;
prefetch_lsn = request_lsn;
resp = page_server->receive();
}
else
{
resp = page_server->request((NeonRequest *) & request);
}
}
if (entry == NULL)
{
n_prefetch_misses += 1;
ring_index = prefetch_register_buffer(buftag, &request_latest,
&request_lsn);
slot = &MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)];
}
Assert(MyPState->ring_last <= ring_index &&
MyPState->ring_unused > ring_index);
Assert(slot->my_ring_index == ring_index);
Assert(slot->status != PRFS_UNUSED);
Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
page_server->flush();
prefetch_wait_for(ring_index);
Assert(slot->status == PRFS_RECEIVED);
resp = slot->response;
switch (resp->tag)
{
case T_NeonGetPageResponse:
@@ -1632,13 +1188,12 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
/* buffer was used, clean up for later reuse */
prefetch_set_unused(ring_index, true);
prefetch_cleanup();
pfree(resp);
}
/*
@@ -2260,6 +1815,7 @@ static const struct f_smgr neon_smgr =
.smgr_unlink = neon_unlink,
.smgr_extend = neon_extend,
.smgr_prefetch = neon_prefetch,
.smgr_reset_prefetch = neon_reset_prefetch,
.smgr_read = neon_read,
.smgr_write = neon_write,
.smgr_writeback = neon_writeback,
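
Most of the lines removed from this file implement the prefetch ring buffer whose invariant is stated in the PrefetchState comment: ring_last <= ring_receive <= ring_unused, with slots recycled once their response has been consumed. A standalone sketch of just that index bookkeeping, in Rust for brevity and with no hash index, LSN handling, or real responses, so it is an illustration rather than the removed implementation:

const RING_SIZE: u64 = 128; // READ_BUFFER_SIZE in the removed code

#[derive(Clone, Copy, PartialEq)]
enum Slot {
    Unused,
    Requested,
    Received,
}

struct Ring {
    buf: [Slot; RING_SIZE as usize],
    ring_last: u64,    // oldest slot that may still hold a response
    ring_receive: u64, // next slot expecting a response
    ring_unused: u64,  // first unused slot
}

impl Ring {
    fn new() -> Self {
        Ring {
            buf: [Slot::Unused; RING_SIZE as usize],
            ring_last: 0,
            ring_receive: 0,
            ring_unused: 0,
        }
    }

    // Register a new request, evicting the oldest entry if the ring is full.
    fn register(&mut self) -> u64 {
        if self.ring_last + RING_SIZE - 1 == self.ring_unused {
            while self.ring_receive <= self.ring_last {
                self.receive_one(); // wait out the still-in-flight oldest entry
            }
            self.set_unused(self.ring_last);
        }
        let idx = self.ring_unused;
        self.buf[(idx % RING_SIZE) as usize] = Slot::Requested;
        self.ring_unused += 1;
        idx
    }

    // Pretend a response arrived for the next in-flight request.
    fn receive_one(&mut self) {
        assert!(self.ring_receive < self.ring_unused);
        self.buf[(self.ring_receive % RING_SIZE) as usize] = Slot::Received;
        self.ring_receive += 1;
    }

    // Free a consumed slot and advance ring_last over leading unused slots.
    fn set_unused(&mut self, idx: u64) {
        self.buf[(idx % RING_SIZE) as usize] = Slot::Unused;
        while self.ring_last < self.ring_receive
            && self.buf[(self.ring_last % RING_SIZE) as usize] == Slot::Unused
        {
            self.ring_last += 1;
        }
    }
}

fn main() {
    let mut ring = Ring::new();
    let first = ring.register();
    for _ in 0..200 {
        ring.register(); // old entries get recycled as the ring wraps
    }
    assert!(ring.ring_last <= ring.ring_receive && ring.ring_receive <= ring.ring_unused);
    assert!(first < ring.ring_last); // the very first slot has been reclaimed
}

The replacement code keeps only the two flat arrays (prefetch_requests and prefetch_responses) and assumes responses come back in request order.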

View File

@@ -75,7 +75,7 @@ static bool syncSafekeepers = false;
char *wal_acceptors_list;
int wal_acceptor_reconnect_timeout;
int wal_acceptor_connection_timeout;
int wal_acceptor_connect_timeout;
bool am_wal_proposer;
char *neon_timeline_walproposer = NULL;
@@ -266,9 +266,9 @@ nwp_register_gucs(void)
DefineCustomIntVariable(
"neon.safekeeper_connect_timeout",
"Timeout for connection establishment and its maintenance against safekeeper",
"Timeout after which to give up a connection attempt to a safekeeper.",
NULL,
&wal_acceptor_connection_timeout,
&wal_acceptor_connect_timeout,
5000, 0, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MS,
@@ -417,9 +417,7 @@ WalProposerPoll(void)
ResetLatch(MyLatch);
break;
}
now = GetCurrentTimestamp();
if (rc == 0 || TimeToReconnect(now) <= 0) /* timeout expired: poll state */
if (rc == 0) /* timeout expired: poll state */
{
TimestampTz now;
@@ -440,11 +438,13 @@ WalProposerPoll(void)
{
Safekeeper *sk = &safekeeper[i];
if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now,
wal_acceptor_connection_timeout))
if ((sk->state == SS_CONNECTING_WRITE ||
sk->state == SS_CONNECTING_READ) &&
TimestampDifferenceExceeds(sk->startedConnAt, now,
wal_acceptor_connect_timeout))
{
elog(WARNING, "failed to connect to node '%s:%s' in '%s' state: exceeded connection timeout %dms",
sk->host, sk->port, FormatSafekeeperState(sk->state), wal_acceptor_connection_timeout);
elog(WARNING, "failed to connect to node '%s:%s': exceeded connection timeout %dms",
sk->host, sk->port, wal_acceptor_connect_timeout);
ShutdownConnection(sk);
}
}
@@ -760,7 +760,7 @@ ResetConnection(Safekeeper *sk)
elog(LOG, "connecting with node %s:%s", sk->host, sk->port);
sk->state = SS_CONNECTING_WRITE;
sk->latestMsgReceivedAt = GetCurrentTimestamp();
sk->startedConnAt = GetCurrentTimestamp();
sock = walprop_socket(sk->conn);
sk->eventPos = AddWaitEventToSet(waitEvents, WL_SOCKET_WRITEABLE, sock, NULL, sk);
@@ -918,7 +918,7 @@ HandleConnectionEvent(Safekeeper *sk)
case WP_CONN_POLLING_OK:
elog(LOG, "connected with node %s:%s", sk->host,
sk->port);
sk->latestMsgReceivedAt = GetCurrentTimestamp();
/*
* We have to pick some event to update event set. We'll
* eventually need the socket to be readable, so we go with that.
@@ -2304,7 +2304,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage * anymsg)
ResetConnection(sk);
return false;
}
sk->latestMsgReceivedAt = GetCurrentTimestamp();
switch (tag)
{
case 'g':
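
The walproposer change above swaps a per-safekeeper staleness timeout (time since the last received message, wal_acceptor_connection_timeout) for a connect-only timeout (time since the connection attempt started, wal_acceptor_connect_timeout). A minimal sketch of the resulting check, using std::time instead of the Postgres timestamp helpers:

use std::time::{Duration, Instant};

#[allow(dead_code)] // only the connecting states matter for this check
enum SafekeeperState {
    ConnectingWrite,
    ConnectingRead,
    Active,
}

struct Safekeeper {
    state: SafekeeperState,
    started_conn_at: Instant, // set when a new connection attempt starts
}

// True if this safekeeper is still connecting and the attempt has exceeded
// the connect timeout, i.e. it should be shut down and retried.
fn connect_timed_out(sk: &Safekeeper, now: Instant, connect_timeout: Duration) -> bool {
    matches!(
        sk.state,
        SafekeeperState::ConnectingWrite | SafekeeperState::ConnectingRead
    ) && now.duration_since(sk.started_conn_at) > connect_timeout
}

fn main() {
    let started = Instant::now();
    let now = started + Duration::from_millis(6000); // simulate 6 s elapsed
    let sk = Safekeeper {
        state: SafekeeperState::ConnectingWrite,
        started_conn_at: started,
    };
    // neon.safekeeper_connect_timeout defaults to 5000 ms in the hunk above.
    assert!(connect_timed_out(&sk, now, Duration::from_millis(5000)));
}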

View File

@@ -30,7 +30,7 @@
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern int wal_acceptor_connect_timeout;
extern bool am_wal_proposer;
struct WalProposerConn; /* Defined in libpqwalproposer */
@@ -371,7 +371,7 @@ typedef struct Safekeeper
int eventPos; /* position in wait event set. Equal to -1 if*
* no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
TimestampTz startedConnAt; /* when connection attempt started */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */

poetry.lock (generated)
View File

@@ -583,7 +583,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
[[package]]
name = "cryptography"
version = "38.0.3"
version = "37.0.4"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
category = "main"
optional = false
@@ -593,10 +593,10 @@ python-versions = ">=3.6"
cffi = ">=1.12"
[package.extras]
docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx-rtd-theme"]
docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx_rtd_theme"]
docstest = ["pyenchant (>=1.6.11)", "sphinxcontrib-spelling (>=4.0.1)", "twine (>=1.12.0)"]
pep8test = ["black", "flake8", "flake8-import-order", "pep8-naming"]
sdist = ["setuptools-rust (>=0.11.4)"]
sdist = ["setuptools_rust (>=0.11.4)"]
ssh = ["bcrypt (>=3.1.5)"]
test = ["hypothesis (>=1.11.4,!=3.79.2)", "iso8601", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-subtests", "pytest-xdist", "pytz"]
@@ -1077,17 +1077,6 @@ python-versions = ">=3.6"
[package.extras]
twisted = ["twisted"]
[[package]]
name = "psutil"
version = "5.9.4"
description = "Cross-platform lib for process and system monitoring in Python."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
[package.extras]
test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
[[package]]
name = "psycopg2-binary"
version = "2.9.3"
@@ -1218,6 +1207,18 @@ pytest = ">=6.1.0"
[package.extras]
testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"]
[[package]]
name = "pytest-forked"
version = "1.4.0"
description = "run tests in isolated forked subprocesses"
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
py = "*"
pytest = ">=3.10"
[[package]]
name = "pytest-lazy-fixture"
version = "0.6.3"
@@ -1239,8 +1240,8 @@ python-versions = ">=3.6"
[package.dependencies]
pytest = [
{version = ">=5.0", markers = "python_version < \"3.10\""},
{version = ">=6.2.4", markers = "python_version >= \"3.10\""},
{version = ">=5.0", markers = "python_version < \"3.10\""},
]
[[package]]
@@ -1256,7 +1257,7 @@ pytest = ">=5.0.0"
[[package]]
name = "pytest-xdist"
version = "3.0.2"
version = "2.5.0"
description = "pytest xdist plugin for distributed testing and loop-on-failing modes"
category = "main"
optional = false
@@ -1265,6 +1266,7 @@ python-versions = ">=3.6"
[package.dependencies]
execnet = ">=1.1"
pytest = ">=6.2.0"
pytest-forked = "*"
[package.extras]
psutil = ["psutil (>=3.0)"]
@@ -1447,14 +1449,6 @@ category = "dev"
optional = false
python-versions = ">=3.7"
[[package]]
name = "types-psutil"
version = "5.9.5.4"
description = "Typing stubs for psutil"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "types-psycopg2"
version = "2.9.18"
@@ -1574,7 +1568,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "c95c184fccaf40815405ad616ec1c55869c7f87b72777cc3a9cbaff41de98977"
content-hash = "9352a89d49d34807f6a58f6c3f898acbd8cf3570e0f45ede973673644bde4d0e"
[metadata.files]
aiopg = [
@@ -1756,32 +1750,28 @@ colorama = [
{file = "colorama-0.4.5.tar.gz", hash = "sha256:e6c6b4334fc50988a639d9b98aa429a0b57da6e17b9a44f0451f930b6967b7a4"},
]
cryptography = [
{file = "cryptography-38.0.3-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:984fe150f350a3c91e84de405fe49e688aa6092b3525f407a18b9646f6612320"},
{file = "cryptography-38.0.3-cp36-abi3-macosx_10_10_x86_64.whl", hash = "sha256:ed7b00096790213e09eb11c97cc6e2b757f15f3d2f85833cd2d3ec3fe37c1722"},
{file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:bbf203f1a814007ce24bd4d51362991d5cb90ba0c177a9c08825f2cc304d871f"},
{file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:554bec92ee7d1e9d10ded2f7e92a5d70c1f74ba9524947c0ba0c850c7b011828"},
{file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b1b52c9e5f8aa2b802d48bd693190341fae201ea51c7a167d69fc48b60e8a959"},
{file = "cryptography-38.0.3-cp36-abi3-manylinux_2_24_x86_64.whl", hash = "sha256:728f2694fa743a996d7784a6194da430f197d5c58e2f4e278612b359f455e4a2"},
{file = "cryptography-38.0.3-cp36-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:dfb4f4dd568de1b6af9f4cda334adf7d72cf5bc052516e1b2608b683375dd95c"},
{file = "cryptography-38.0.3-cp36-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:5419a127426084933076132d317911e3c6eb77568a1ce23c3ac1e12d111e61e0"},
{file = "cryptography-38.0.3-cp36-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:9b24bcff7853ed18a63cfb0c2b008936a9554af24af2fb146e16d8e1aed75748"},
{file = "cryptography-38.0.3-cp36-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:25c1d1f19729fb09d42e06b4bf9895212292cb27bb50229f5aa64d039ab29146"},
{file = "cryptography-38.0.3-cp36-abi3-win32.whl", hash = "sha256:7f836217000342d448e1c9a342e9163149e45d5b5eca76a30e84503a5a96cab0"},
{file = "cryptography-38.0.3-cp36-abi3-win_amd64.whl", hash = "sha256:c46837ea467ed1efea562bbeb543994c2d1f6e800785bd5a2c98bc096f5cb220"},
{file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06fc3cc7b6f6cca87bd56ec80a580c88f1da5306f505876a71c8cfa7050257dd"},
{file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:65535bc550b70bd6271984d9863a37741352b4aad6fb1b3344a54e6950249b55"},
{file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:5e89468fbd2fcd733b5899333bc54d0d06c80e04cd23d8c6f3e0542358c6060b"},
{file = "cryptography-38.0.3-pp38-pypy38_pp73-macosx_10_10_x86_64.whl", hash = "sha256:6ab9516b85bebe7aa83f309bacc5f44a61eeb90d0b4ec125d2d003ce41932d36"},
{file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:068147f32fa662c81aebab95c74679b401b12b57494872886eb5c1139250ec5d"},
{file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:402852a0aea73833d982cabb6d0c3bb582c15483d29fb7085ef2c42bfa7e38d7"},
{file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:b1b35d9d3a65542ed2e9d90115dfd16bbc027b3f07ee3304fc83580f26e43249"},
{file = "cryptography-38.0.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:6addc3b6d593cd980989261dc1cce38263c76954d758c3c94de51f1e010c9a50"},
{file = "cryptography-38.0.3-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:be243c7e2bfcf6cc4cb350c0d5cdf15ca6383bbcb2a8ef51d3c9411a9d4386f0"},
{file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:78cf5eefac2b52c10398a42765bfa981ce2372cbc0457e6bf9658f41ec3c41d8"},
{file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:4e269dcd9b102c5a3d72be3c45d8ce20377b8076a43cbed6f660a1afe365e436"},
{file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:8d41a46251bf0634e21fac50ffd643216ccecfaf3701a063257fe0b2be1b6548"},
{file = "cryptography-38.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:785e4056b5a8b28f05a533fab69febf5004458e20dad7e2e13a3120d8ecec75a"},
{file = "cryptography-38.0.3.tar.gz", hash = "sha256:bfbe6ee19615b07a98b1d2287d6a6073f734735b49ee45b11324d85efc4d5cbd"},
{file = "cryptography-37.0.4-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:549153378611c0cca1042f20fd9c5030d37a72f634c9326e225c9f666d472884"},
{file = "cryptography-37.0.4-cp36-abi3-macosx_10_10_x86_64.whl", hash = "sha256:a958c52505c8adf0d3822703078580d2c0456dd1d27fabfb6f76fe63d2971cd6"},
{file = "cryptography-37.0.4-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f721d1885ecae9078c3f6bbe8a88bc0786b6e749bf32ccec1ef2b18929a05046"},
{file = "cryptography-37.0.4-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:3d41b965b3380f10e4611dbae366f6dc3cefc7c9ac4e8842a806b9672ae9add5"},
{file = "cryptography-37.0.4-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:80f49023dd13ba35f7c34072fa17f604d2f19bf0989f292cedf7ab5770b87a0b"},
{file = "cryptography-37.0.4-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f2dcb0b3b63afb6df7fd94ec6fbddac81b5492513f7b0436210d390c14d46ee8"},
{file = "cryptography-37.0.4-cp36-abi3-manylinux_2_24_x86_64.whl", hash = "sha256:b7f8dd0d4c1f21759695c05a5ec8536c12f31611541f8904083f3dc582604280"},
{file = "cryptography-37.0.4-cp36-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:30788e070800fec9bbcf9faa71ea6d8068f5136f60029759fd8c3efec3c9dcb3"},
{file = "cryptography-37.0.4-cp36-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:190f82f3e87033821828f60787cfa42bff98404483577b591429ed99bed39d59"},
{file = "cryptography-37.0.4-cp36-abi3-win32.whl", hash = "sha256:b62439d7cd1222f3da897e9a9fe53bbf5c104fff4d60893ad1355d4c14a24157"},
{file = "cryptography-37.0.4-cp36-abi3-win_amd64.whl", hash = "sha256:f7a6de3e98771e183645181b3627e2563dcde3ce94a9e42a3f427d2255190327"},
{file = "cryptography-37.0.4-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bc95ed67b6741b2607298f9ea4932ff157e570ef456ef7ff0ef4884a134cc4b"},
{file = "cryptography-37.0.4-pp37-pypy37_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:f8c0a6e9e1dd3eb0414ba320f85da6b0dcbd543126e30fcc546e7372a7fbf3b9"},
{file = "cryptography-37.0.4-pp38-pypy38_pp73-macosx_10_10_x86_64.whl", hash = "sha256:e007f052ed10cc316df59bc90fbb7ff7950d7e2919c9757fd42a2b8ecf8a5f67"},
{file = "cryptography-37.0.4-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7bc997818309f56c0038a33b8da5c0bfbb3f1f067f315f9abd6fc07ad359398d"},
{file = "cryptography-37.0.4-pp38-pypy38_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:d204833f3c8a33bbe11eda63a54b1aad7aa7456ed769a982f21ec599ba5fa282"},
{file = "cryptography-37.0.4-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:75976c217f10d48a8b5a8de3d70c454c249e4b91851f6838a4e48b8f41eb71aa"},
{file = "cryptography-37.0.4-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:7099a8d55cd49b737ffc99c17de504f2257e3787e02abe6d1a6d136574873441"},
{file = "cryptography-37.0.4-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2be53f9f5505673eeda5f2736bea736c40f051a739bfae2f92d18aed1eb54596"},
{file = "cryptography-37.0.4-pp39-pypy39_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:91ce48d35f4e3d3f1d83e29ef4a9267246e6a3be51864a5b7d2247d5086fa99a"},
{file = "cryptography-37.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:4c590ec31550a724ef893c50f9a97a0c14e9c851c85621c5650d699a7b88f7ab"},
{file = "cryptography-37.0.4.tar.gz", hash = "sha256:63f9c17c0e2474ccbebc9302ce2f07b55b3b3fcb211ded18a42d5764f5c10a82"},
]
docker = [
{file = "docker-4.2.2-py2.py3-none-any.whl", hash = "sha256:03a46400c4080cb6f7aa997f881ddd84fef855499ece219d75fbdb53289c17ab"},
@@ -1985,22 +1975,6 @@ prometheus-client = [
{file = "prometheus_client-0.14.1-py3-none-any.whl", hash = "sha256:522fded625282822a89e2773452f42df14b5a8e84a86433e3f8a189c1d54dc01"},
{file = "prometheus_client-0.14.1.tar.gz", hash = "sha256:5459c427624961076277fdc6dc50540e2bacb98eebde99886e59ec55ed92093a"},
]
psutil = [
{file = "psutil-5.9.4-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:c1ca331af862803a42677c120aff8a814a804e09832f166f226bfd22b56feee8"},
{file = "psutil-5.9.4-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:68908971daf802203f3d37e78d3f8831b6d1014864d7a85937941bb35f09aefe"},
{file = "psutil-5.9.4-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:3ff89f9b835100a825b14c2808a106b6fdcc4b15483141482a12c725e7f78549"},
{file = "psutil-5.9.4-cp27-cp27m-win32.whl", hash = "sha256:852dd5d9f8a47169fe62fd4a971aa07859476c2ba22c2254d4a1baa4e10b95ad"},
{file = "psutil-5.9.4-cp27-cp27m-win_amd64.whl", hash = "sha256:9120cd39dca5c5e1c54b59a41d205023d436799b1c8c4d3ff71af18535728e94"},
{file = "psutil-5.9.4-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:6b92c532979bafc2df23ddc785ed116fced1f492ad90a6830cf24f4d1ea27d24"},
{file = "psutil-5.9.4-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:efeae04f9516907be44904cc7ce08defb6b665128992a56957abc9b61dca94b7"},
{file = "psutil-5.9.4-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:54d5b184728298f2ca8567bf83c422b706200bcbbfafdc06718264f9393cfeb7"},
{file = "psutil-5.9.4-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:16653106f3b59386ffe10e0bad3bb6299e169d5327d3f187614b1cb8f24cf2e1"},
{file = "psutil-5.9.4-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:54c0d3d8e0078b7666984e11b12b88af2db11d11249a8ac8920dd5ef68a66e08"},
{file = "psutil-5.9.4-cp36-abi3-win32.whl", hash = "sha256:149555f59a69b33f056ba1c4eb22bb7bf24332ce631c44a319cec09f876aaeff"},
{file = "psutil-5.9.4-cp36-abi3-win_amd64.whl", hash = "sha256:fd8522436a6ada7b4aad6638662966de0d61d241cb821239b2ae7013d41a43d4"},
{file = "psutil-5.9.4-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:6001c809253a29599bc0dfd5179d9f8a5779f9dffea1da0f13c53ee568115e1e"},
{file = "psutil-5.9.4.tar.gz", hash = "sha256:3d7f9739eb435d4b1338944abe23f49584bde5395f27487d2ee25ad9a8774a62"},
]
psycopg2-binary = [
{file = "psycopg2-binary-2.9.3.tar.gz", hash = "sha256:761df5313dc15da1502b21453642d7599d26be88bff659382f8f9747c7ebea4e"},
{file = "psycopg2_binary-2.9.3-cp310-cp310-macosx_10_14_x86_64.macosx_10_9_intel.macosx_10_9_x86_64.macosx_10_10_intel.macosx_10_10_x86_64.whl", hash = "sha256:539b28661b71da7c0e428692438efbcd048ca21ea81af618d845e06ebfd29478"},
@@ -2125,6 +2099,10 @@ pytest-asyncio = [
{file = "pytest-asyncio-0.19.0.tar.gz", hash = "sha256:ac4ebf3b6207259750bc32f4c1d8fcd7e79739edbc67ad0c58dd150b1d072fed"},
{file = "pytest_asyncio-0.19.0-py3-none-any.whl", hash = "sha256:7a97e37cfe1ed296e2e84941384bdd37c376453912d397ed39293e0916f521fa"},
]
pytest-forked = [
{file = "pytest-forked-1.4.0.tar.gz", hash = "sha256:8b67587c8f98cbbadfdd804539ed5455b6ed03802203485dd2f53c1422d7440e"},
{file = "pytest_forked-1.4.0-py3-none-any.whl", hash = "sha256:bbbb6717efc886b9d64537b41fb1497cfaf3c9601276be8da2cccfea5a3c8ad8"},
]
pytest-lazy-fixture = [
{file = "pytest-lazy-fixture-0.6.3.tar.gz", hash = "sha256:0e7d0c7f74ba33e6e80905e9bfd81f9d15ef9a790de97993e34213deb5ad10ac"},
{file = "pytest_lazy_fixture-0.6.3-py3-none-any.whl", hash = "sha256:e0b379f38299ff27a653f03eaa69b08a6fd4484e46fd1c9907d984b9f9daeda6"},
@@ -2138,8 +2116,8 @@ pytest-timeout = [
{file = "pytest_timeout-2.1.0-py3-none-any.whl", hash = "sha256:f6f50101443ce70ad325ceb4473c4255e9d74e3c7cd0ef827309dfa4c0d975c6"},
]
pytest-xdist = [
{file = "pytest-xdist-3.0.2.tar.gz", hash = "sha256:688da9b814370e891ba5de650c9327d1a9d861721a524eb917e620eec3e90291"},
{file = "pytest_xdist-3.0.2-py3-none-any.whl", hash = "sha256:9feb9a18e1790696ea23e1434fa73b325ed4998b0e9fcb221f16fd1945e6df1b"},
{file = "pytest-xdist-2.5.0.tar.gz", hash = "sha256:4580deca3ff04ddb2ac53eba39d76cb5dd5edeac050cb6fbc768b0dd712b4edf"},
{file = "pytest_xdist-2.5.0-py3-none-any.whl", hash = "sha256:6fe5c74fec98906deb8f2d2b616b5c782022744978e7bd4695d39c8f42d0ce65"},
]
python-dateutil = [
{file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"},
@@ -2247,10 +2225,6 @@ tomli = [
{file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"},
{file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"},
]
types-psutil = [
{file = "types-psutil-5.9.5.4.tar.gz", hash = "sha256:aa09102b80c65a3b4573216614372398dab78972d650488eaff1ff05482cc18f"},
{file = "types_psutil-5.9.5.4-py3-none-any.whl", hash = "sha256:28e59764630187e462d43788efa16d59d5e77b510115f9e25901b2d4007fca62"},
]
types-psycopg2 = [
{file = "types-psycopg2-2.9.18.tar.gz", hash = "sha256:9b0e9e1f097b15cd9fa8aad2596a9e3082fd72f8d9cfe52b190cfa709105b6c0"},
{file = "types_psycopg2-2.9.18-py3-none-any.whl", hash = "sha256:14c779dcab18c31453fa1cad3cf4b1601d33540a344adead3c47a6b8091cd2fa"},

View File

@@ -22,7 +22,11 @@ once_cell = "1.13.0"
parking_lot = "0.12"
pin-project-lite = "0.2.7"
rand = "0.8.3"
reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls" ] }
reqwest = { version = "0.11", default-features = false, features = [
"blocking",
"json",
"rustls-tls",
] }
routerify = "3"
rustls = "0.20.0"
rustls-pemfile = "1"
@@ -41,9 +45,8 @@ url = "2.2.2"
uuid = { version = "1.2", features = ["v4", "serde"] }
x509-parser = "0.14"
metrics = { path = "../libs/metrics" }
pq_proto = { path = "../libs/pq_proto" }
utils = { path = "../libs/utils" }
metrics = { path = "../libs/metrics" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]

View File

@@ -1,8 +1,8 @@
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
use pq_proto::{BeMessage as Be, BeParameterStatusMessage};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span};
use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage};
#[derive(Debug, Error)]
pub enum LinkAuthError {

View File

@@ -1,10 +1,10 @@
//! User credentials used in authentication.
use crate::error::UserFacingError;
use pq_proto::StartupMessageParams;
use std::borrow::Cow;
use thiserror::Error;
use tracing::info;
use utils::pq_proto::StartupMessageParams;
#[derive(Debug, Error, PartialEq, Eq, Clone)]
pub enum ClientCredsParseError {

View File

@@ -2,9 +2,9 @@
use super::{AuthErrorImpl, PasswordHackPayload};
use crate::{sasl, scram, stream::PqStream};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
use std::io;
use tokio::io::{AsyncRead, AsyncWrite};
use utils::pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
/// Every authentication selector is supposed to implement this trait.
pub trait AuthMethod {

View File

@@ -1,11 +1,11 @@
use anyhow::{anyhow, Context};
use hashbrown::HashMap;
use parking_lot::Mutex;
use pq_proto::CancelKeyData;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_postgres::{CancelToken, NoTls};
use tracing::info;
use utils::pq_proto::CancelKeyData;
/// Enables serving `CancelRequest`s.
#[derive(Default)]

View File

@@ -1,12 +1,12 @@
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::TryFutureExt;
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
use tracing::{error, info};
use utils::pq_proto::StartupMessageParams;
#[derive(Debug, Error)]
pub enum ConnectionError {
@@ -44,7 +44,7 @@ pub type ComputeConnCfg = tokio_postgres::Config;
/// Various compute node info for establishing connection etc.
pub struct NodeInfo {
/// Did we send [`pq_proto::BeMessage::AuthenticationOk`]?
/// Did we send [`utils::pq_proto::BeMessage::AuthenticationOk`]?
pub reported_auth_ok: bool,
/// Compute node connection params.
pub config: tokio_postgres::Config,

View File

@@ -1,13 +1,15 @@
use crate::auth;
use anyhow::Context;
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use serde::Deserialize;
use std::{
net::{TcpListener, TcpStream},
thread,
};
use tracing::{error, info};
use utils::postgres_backend::{self, AuthType, PostgresBackend};
use utils::{
postgres_backend::{self, AuthType, PostgresBackend},
pq_proto::{BeMessage, SINGLE_COL_ROWDESC},
};
/// TODO: move all of that to auth-backend/link.rs when we ditch legacy-console backend

View File

@@ -6,10 +6,10 @@ use anyhow::{bail, Context};
use futures::TryFutureExt;
use metrics::{register_int_counter, IntCounter};
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, *};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span, Instrument};
use utils::pq_proto::{BeMessage as Be, *};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";

View File

@@ -1,9 +1,9 @@
//! Definitions for SASL messages.
use crate::parse::{split_at_const, split_cstr};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage};
use utils::pq_proto::{BeAuthenticationSaslMessage, BeMessage};
/// SASL-specific payload of [`PasswordMessage`](pq_proto::FeMessage::PasswordMessage).
/// SASL-specific payload of [`PasswordMessage`](utils::pq_proto::FeMessage::PasswordMessage).
#[derive(Debug)]
pub struct FirstMessage<'a> {
/// Authentication method, e.g. `"SCRAM-SHA-256"`.
@@ -31,7 +31,7 @@ impl<'a> FirstMessage<'a> {
/// A single SASL message.
/// This struct is deliberately decoupled from lower-level
/// [`BeAuthenticationSaslMessage`](pq_proto::BeAuthenticationSaslMessage).
/// [`BeAuthenticationSaslMessage`](utils::pq_proto::BeAuthenticationSaslMessage).
#[derive(Debug)]
pub(super) enum ServerMessage<T> {
/// We expect to see more steps.

View File

@@ -2,7 +2,6 @@ use crate::error::UserFacingError;
use anyhow::bail;
use bytes::BytesMut;
use pin_project_lite::pin_project;
use pq_proto::{BeMessage, FeMessage, FeStartupPacket};
use rustls::ServerConfig;
use std::pin::Pin;
use std::sync::Arc;
@@ -10,6 +9,7 @@ use std::{io, task};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_rustls::server::TlsStream;
use utils::pq_proto::{BeMessage, FeMessage, FeStartupPacket};
pin_project! {
/// Stream wrapper which implements libpq's protocol.

View File

@@ -11,7 +11,7 @@ psycopg2-binary = "^2.9.1"
typing-extensions = "^4.1.0"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.26.0"
pytest-xdist = "^3.0.2"
pytest-xdist = "^2.3.0"
asyncpg = "^0.24.0"
aiopg = "^1.3.1"
Jinja2 = "^3.0.2"
@@ -29,8 +29,6 @@ pytest-order = "^1.0.1"
allure-pytest = "^2.10.0"
pytest-asyncio = "^0.19.0"
toml = "^0.10.2"
psutil = "^5.9.4"
types-psutil = "^5.9.5.4"
[tool.poetry.dev-dependencies]
flake8 = "^5.0.4"

View File

@@ -4,42 +4,41 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
byteorder = "1.4.3"
bytes = "1.0.1"
clap = "4.0"
const_format = "0.2.21"
crc32c = "0.6.0"
fs2 = "0.4.3"
git-version = "0.3.5"
hex = "0.4.3"
humantime = "2.1.0"
hyper = "0.14"
nix = "0.25"
once_cell = "1.13.0"
parking_lot = "0.12.1"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1.4.5"
serde = { version = "1.0", features = ["derive"] }
bytes = "1.0.1"
byteorder = "1.4.3"
hyper = "0.14"
fs2 = "0.4.3"
serde_json = "1"
serde_with = "2.0"
signal-hook = "0.3.10"
thiserror = "1"
tokio = { version = "1.17", features = ["macros", "fs"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
toml_edit = { version = "0.14", features = ["easy"] }
tracing = "0.1.27"
clap = "4.0"
nix = "0.25"
tokio = { version = "1.17", features = ["macros", "fs"] }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
anyhow = "1.0"
crc32c = "0.6.0"
humantime = "2.1.0"
url = "2.2.2"
signal-hook = "0.3.10"
serde = { version = "1.0", features = ["derive"] }
serde_with = "2.0"
hex = "0.4.3"
const_format = "0.2.21"
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
git-version = "0.3.5"
async-trait = "0.1"
once_cell = "1.13.0"
toml_edit = { version = "0.14", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
postgres_ffi = { path = "../libs/postgres_ffi" }
pq_proto = { path = "../libs/pq_proto" }
remote_storage = { path = "../libs/remote_storage" }
safekeeper_api = { path = "../libs/safekeeper_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
etcd_broker = { path = "../libs/etcd_broker" }
remote_storage = { path = "../libs/remote_storage" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]

View File

@@ -147,7 +147,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
new_lock_contents,
file,
} => {
info!("Created lock file at {lock_file_path:?} with contenst {new_lock_contents}");
info!("Created lock file at {lock_file_path:?} with contents {new_lock_contents}");
file
}
lock_file::LockCreationResult::AlreadyLocked {

View File

@@ -4,13 +4,13 @@ use crate::safekeeper::{
TermSwitchEntry,
};
use anyhow::{bail, Result};
use pq_proto::SystemId;
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::{
bin_ser::LeSer,
id::{TenantId, TimelineId},
lsn::Lsn,
pq_proto::SystemId,
};
/// Persistent consensus state of the acceptor.

View File

@@ -12,12 +12,12 @@ use anyhow::{bail, Context, Result};
use postgres_ffi::PG_TLI;
use regex::Regex;
use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
use tracing::info;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
postgres_backend::{self, PostgresBackend},
pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID},
};
/// Safekeeper handler of postgres commands

View File

@@ -24,8 +24,11 @@ use crate::timeline::Timeline;
use crate::GlobalTimelines;
use postgres_ffi::encode_logical_message;
use postgres_ffi::WAL_SEGMENT_SIZE;
use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
use utils::{lsn::Lsn, postgres_backend::PostgresBackend};
use utils::{
lsn::Lsn,
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, RowDescriptor, TEXT_OID},
};
#[derive(Serialize, Deserialize, Debug)]
pub struct AppendLogicalMessage {

View File

@@ -383,7 +383,7 @@ impl Collector for TimelineCollector {
let timeline_id = tli.ttid.timeline_id.to_string();
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
let mut most_advanced: Option<pq_proto::ReplicationFeedback> = None;
let mut most_advanced: Option<utils::pq_proto::ReplicationFeedback> = None;
for replica in tli.replicas.iter() {
if let Some(replica_feedback) = replica.pageserver_feedback {
if let Some(current) = most_advanced {

View File

@@ -23,8 +23,11 @@ use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::handler::SafekeeperPostgresHandler;
use pq_proto::{BeMessage, FeMessage};
use utils::{postgres_backend::PostgresBackend, sock_split::ReadStream};
use utils::{
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage},
sock_split::ReadStream,
};
pub struct ReceiveWalConn<'pg> {
/// Postgres connection

View File

@@ -18,11 +18,11 @@ use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use pq_proto::{ReplicationFeedback, SystemId};
use utils::{
bin_ser::LeSer,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
pq_proto::{ReplicationFeedback, SystemId},
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;

View File

@@ -17,11 +17,16 @@ use std::sync::Arc;
use std::time::Duration;
use std::{str, thread};
use pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody};
use tokio::sync::watch::Receiver;
use tokio::time::timeout;
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn, postgres_backend::PostgresBackend, sock_split::ReadStream};
use utils::{
bin_ser::BeSer,
lsn::Lsn,
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody},
sock_split::ReadStream,
};
// See: https://www.postgresql.org/docs/13/protocol-replication.html
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';

View File

@@ -2,20 +2,26 @@
//! to glue together SafeKeeper and all other background services.
use anyhow::{bail, Result};
use etcd_broker::subscription_value::SkTimelineInfo;
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::XLogSegNo;
use pq_proto::ReplicationFeedback;
use tokio::{sync::watch, time::Instant};
use std::cmp::{max, min};
use parking_lot::{Mutex, MutexGuard};
use std::path::PathBuf;
use tokio::{
sync::{mpsc::Sender, watch},
time::Instant,
};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
pq_proto::ReplicationFeedback,
};
use crate::safekeeper::{
@@ -549,16 +555,13 @@ impl Timeline {
if self.is_cancelled() {
return true;
}
let mut shared_state = self.write_shared_state();
if shared_state.num_computes == 0 {
let replica_state = shared_state.replicas[replica_id].unwrap();
let reported_remote_consistent_lsn = replica_state
.pageserver_feedback
.map(|f| Lsn(f.ps_applylsn))
.unwrap_or(Lsn::INVALID);
let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
(reported_remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.update_status(self.ttid);
return true;

View File

@@ -4,18 +4,12 @@
# Outline of steps:
# 1. Get `(last_lsn, prev_lsn)` from old pageserver
# 2. Get `fullbackup` from old pageserver, which creates a basebackup tar file
# 3. This tar file might be missing relation files for empty relations, if the pageserver
# is old enough (we didn't always store those). So to recreate them, we start a local
# vanilla postgres on this basebackup and ask it what relations should exist, then touch
# any missing files and re-pack the tar.
# TODO This functionality is no longer needed, so we can delete it later if we don't
# end up using the same utils for the pg 15 upgrade. Not sure.
# 4. We import the patched basebackup into a new pageserver
# 5. We export again via fullbackup, now from the new pageserver and compare the returned
# 3. We import the basebackup into a new pageserver
# 4. We export again via fullbackup, now from the new pageserver and compare the returned
# tar file with the one we imported. This confirms that we imported everything that was
# exported, but doesn't guarantee correctness (what if we didn't **export** everything
# initially?)
# 6. We wait for the new pageserver's remote_consistent_lsn to catch up
# 5. We wait for the new pageserver's remote_consistent_lsn to catch up
#
# For more context on how to use this, see:
# https://github.com/neondatabase/cloud/wiki/Storage-format-migration
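#
# As a purely illustrative sketch of the comparison step (step 4 in the updated
# outline; not code from this script, and the file names below are placeholders),
# checking the re-exported tar against the originally imported one can be as
# simple as:
#
#     import filecmp
#     assert filecmp.cmp("imported.tar", "re_exported.tar", shallow=False)
#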
@@ -24,17 +18,13 @@ import argparse
import os
import shutil
import subprocess
import tempfile
import time
import uuid
from contextlib import closing
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, cast
from typing import Any, Dict, List, Optional
import psycopg2
import requests
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import parse_dsn
###############################################
### client-side utils copied from test fixtures
@@ -135,105 +125,6 @@ class PgBin:
)
class PgProtocol:
"""Reusable connection logic"""
def __init__(self, **kwargs):
self.default_options = kwargs
def conn_options(self, **kwargs):
conn_options = self.default_options.copy()
if "dsn" in kwargs:
conn_options.update(parse_dsn(kwargs["dsn"]))
conn_options.update(kwargs)
# Individual statement timeout in seconds. 2 minutes should be
# enough for our tests, but if you need a longer, you can
# change it by calling "SET statement_timeout" after
# connecting.
conn_options["options"] = f"-cstatement_timeout=120s {conn_options.get('options', '')}"
return conn_options
# autocommit=True here by default because that's what we need most of the time
def connect(self, autocommit=True, **kwargs) -> PgConnection:
"""
Connect to the node.
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
return conn
def safe_psql(self, query: str, **kwargs: Any) -> List[Tuple[Any, ...]]:
"""
Execute query against the node and return all rows.
This method passes all extra params to connstr.
"""
return self.safe_psql_many([query], **kwargs)[0]
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
"""
Execute queries against the node and return all rows.
This method passes all extra params to connstr.
"""
result: List[List[Any]] = []
with closing(self.connect(**kwargs)) as conn:
with conn.cursor() as cur:
for query in queries:
print(f"Executing query: {query}")
cur.execute(query)
if cur.description is None:
result.append([]) # query didn't return data
else:
result.append(cast(List[Any], cur.fetchall()))
return result
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host="localhost", port=port, dbname="postgres")
self.pgdatadir = pgdatadir
self.pg_bin = pg_bin
self.running = False
if init:
self.pg_bin.run_capture(["initdb", "-D", str(pgdatadir)])
self.configure([f"port = {port}\n"])
def configure(self, options: List[str]):
"""Append lines into postgresql.conf file."""
assert not self.running
with open(os.path.join(self.pgdatadir, "postgresql.conf"), "a") as conf_file:
conf_file.write("\n".join(options))
def start(self, log_path: Optional[str] = None):
assert not self.running
self.running = True
if log_path is None:
log_path = os.path.join(self.pgdatadir, "pg.log")
self.pg_bin.run_capture(
["pg_ctl", "-w", "-D", str(self.pgdatadir), "-l", log_path, "start"]
)
def stop(self):
assert self.running
self.running = False
self.pg_bin.run_capture(["pg_ctl", "-w", "-D", str(self.pgdatadir), "stop"])
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
if self.running:
self.stop()
class NeonPageserverApiException(Exception):
pass
@@ -370,83 +261,6 @@ def pack_base(log_dir, restored_dir, output_tar):
shutil.move(tmp_tar_path, output_tar)
def reconstruct_paths(log_dir, pg_bin, base_tar, port: int):
"""Reconstruct what relation files should exist in the datadir by querying postgres."""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
# Start a vanilla postgres from the given datadir and query it to find
# what relfiles should exist, but possibly don't.
with VanillaPostgres(Path(restored_dir), pg_bin, port, init=False) as vanilla_pg:
vanilla_pg.configure([f"port={port}"])
vanilla_pg.start(log_path=os.path.join(log_dir, "tmp_pg.log"))
# Create database based on template0 because we can't connect to template0
query = "create database template0copy template template0"
vanilla_pg.safe_psql(query, user="cloud_admin")
vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")
# Get all databases
query = "select oid, datname from pg_database"
oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
template0_oid = [
oid for (oid, database) in oid_dbname_pairs if database == "template0"
][0]
# Get rel paths for each database
for oid, database in oid_dbname_pairs:
if database == "template0":
# We can't connect to template0
continue
query = "select relname, pg_relation_filepath(oid) from pg_class"
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
for relname, filepath in result:
if filepath is not None:
if database == "template0copy":
# Add all template0copy paths to template0
prefix = f"base/{oid}/"
if filepath.startswith(prefix):
suffix = filepath[len(prefix) :]
yield f"base/{template0_oid}/{suffix}"
elif filepath.startswith("global"):
print(f"skipping {database} global file {filepath}")
else:
raise AssertionError
else:
yield filepath
def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
"""Add the appropriate empty files to a basebadkup tar."""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])
# Touch files that don't exist
for path in paths:
absolute_path = os.path.join(restored_dir, path)
exists = os.path.exists(absolute_path)
if not exists:
print(f"File {absolute_path} didn't exist. Creating..")
Path(absolute_path).touch()
# Repackage
pack_base(log_dir, restored_dir, output_tar)
# HACK This is a workaround for exporting from old pageservers that
# can't export empty relations. In this case we need to start
# a vanilla postgres from the exported datadir, and query it
# to see what empty relations are missing, and then create
# those empty files before importing.
def add_missing_rels(base_tar, output_tar, log_dir, pg_bin, tmp_pg_port: int):
reconstructed_paths = set(reconstruct_paths(log_dir, pg_bin, base_tar, tmp_pg_port))
touch_missing_rels(log_dir, base_tar, output_tar, reconstructed_paths)
def get_rlsn(pageserver_connstr, tenant_id, timeline_id):
conn = psycopg2.connect(pageserver_connstr)
conn.autocommit = True
@@ -515,7 +329,6 @@ def export_timeline(
pg_version,
):
# Choose filenames
incomplete_filename = tar_filename + ".incomplete"
stderr_filename = os.path.join(args.work_dir, f"{tenant_id}_{timeline_id}.stderr")
# Construct export command
@@ -524,18 +337,14 @@ def export_timeline(
# Run export command
print(f"Running: {cmd}")
with open(incomplete_filename, "w") as stdout_f:
with open(tar_filename, "w") as stdout_f:
with open(stderr_filename, "w") as stderr_f:
print(f"(capturing output to {incomplete_filename})")
print(f"(capturing output to {tar_filename})")
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
subprocess.run(
cmd, stdout=stdout_f, stderr=stderr_f, env=pg_bin._build_env(None), check=True
)
# Add missing rels
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
add_missing_rels(incomplete_filename, tar_filename, args.work_dir, pg_bin, args.tmp_pg_port)
# Log more info
file_size = os.path.getsize(tar_filename)
print(f"Done export: {tar_filename}, size {file_size}")

View File

@@ -11,37 +11,39 @@ from datetime import datetime
from pathlib import Path
# Type-related stuff
from typing import Callable, ClassVar, Iterator, Optional
from typing import Iterator, Optional
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.terminal import TerminalReporter
from fixtures.neon_fixtures import NeonPageserver
from fixtures.types import TenantId, TimelineId
"""
This file contains fixtures for micro-benchmarks.
To use, declare the `zenbenchmark` fixture in the test function. Run the
benchmark, and then record the result by calling `zenbenchmark.record`. For example:
To use, declare the 'zenbenchmark' fixture in the test function. Run the
benchmark, and then record the result by calling zenbenchmark.record. For example:
>>> import timeit
>>> from fixtures.neon_fixtures import NeonEnv
>>> def test_mybench(neon_simple_env: NeonEnv, zenbenchmark):
... # Initialize the test
... ...
... # Run the test, timing how long it takes
... with zenbenchmark.record_duration('test_query'):
... cur.execute('SELECT test_query(...)')
... # Record another measurement
... zenbenchmark.record('speed_of_light', 300000, 'km/s')
import timeit
from fixtures.neon_fixtures import NeonEnv
def test_mybench(neon_simple_env: env, zenbenchmark):
# Initialize the test
...
# Run the test, timing how long it takes
with zenbenchmark.record_duration('test_query'):
cur.execute('SELECT test_query(...)')
# Record another measurement
zenbenchmark.record('speed_of_light', 300000, 'km/s')
There's no need to import this file to use it. It should be declared as a plugin
inside `conftest.py`, and that makes it available to all tests.
inside conftest.py, and that makes it available to all tests.
You can measure multiple things in one test, and record each one with a separate
call to `zenbenchmark`. For example, you could time the bulk loading that happens
call to zenbenchmark. For example, you could time the bulk loading that happens
in the test initialization, or measure disk usage after the test query.
"""
@@ -115,7 +117,7 @@ class PgBenchRunResult:
# tps = 309.281539 (without initial connection time)
if line.startswith("tps = ") and (
"(excluding connections establishing)" in line
or "(without initial connection time)" in line
or "(without initial connection time)"
):
tps = float(line.split()[2])
@@ -135,17 +137,6 @@ class PgBenchRunResult:
@dataclasses.dataclass
class PgBenchInitResult:
REGEX: ClassVar[re.Pattern] = re.compile( # type: ignore[type-arg]
r"done in (\d+\.\d+) s "
r"\("
r"(?:drop tables (\d+\.\d+) s)?(?:, )?"
r"(?:create tables (\d+\.\d+) s)?(?:, )?"
r"(?:client-side generate (\d+\.\d+) s)?(?:, )?"
r"(?:vacuum (\d+\.\d+) s)?(?:, )?"
r"(?:primary keys (\d+\.\d+) s)?(?:, )?"
r"\)\."
)
total: float
drop_tables: Optional[float]
create_tables: Optional[float]
@@ -169,7 +160,18 @@ class PgBenchInitResult:
last_line = stderr.splitlines()[-1]
if (m := cls.REGEX.match(last_line)) is not None:
regex = re.compile(
r"done in (\d+\.\d+) s "
r"\("
r"(?:drop tables (\d+\.\d+) s)?(?:, )?"
r"(?:create tables (\d+\.\d+) s)?(?:, )?"
r"(?:client-side generate (\d+\.\d+) s)?(?:, )?"
r"(?:vacuum (\d+\.\d+) s)?(?:, )?"
r"(?:primary keys (\d+\.\d+) s)?(?:, )?"
r"\)\."
)
if (m := regex.match(last_line)) is not None:
total, drop_tables, create_tables, client_side_generate, vacuum, primary_keys = [
float(v) for v in m.groups() if v is not None
]
@@ -206,7 +208,7 @@ class NeonBenchmarker:
function by the zenbenchmark fixture
"""
def __init__(self, property_recorder: Callable[[str, object], None]):
def __init__(self, property_recorder):
# property recorder here is a pytest fixture provided by junitxml module
# https://docs.pytest.org/en/6.2.x/reference.html#pytest.junitxml.record_property
self.property_recorder = property_recorder
@@ -234,7 +236,7 @@ class NeonBenchmarker:
)
@contextmanager
def record_duration(self, metric_name: str) -> Iterator[None]:
def record_duration(self, metric_name: str):
"""
Record a duration. Usage:
@@ -335,21 +337,21 @@ class NeonBenchmarker:
f"{prefix}.{metric}", value, unit="s", report=MetricReport.LOWER_IS_BETTER
)
def get_io_writes(self, pageserver: NeonPageserver) -> int:
def get_io_writes(self, pageserver) -> int:
"""
Fetch the "cumulative # of bytes written" metric from the pageserver
"""
metric_name = r'libmetrics_disk_io_bytes_total{io_operation="write"}'
return self.get_int_counter_value(pageserver, metric_name)
def get_peak_mem(self, pageserver: NeonPageserver) -> int:
def get_peak_mem(self, pageserver) -> int:
"""
Fetch the "maxrss" metric from the pageserver
"""
metric_name = r"libmetrics_maxrss_kb"
return self.get_int_counter_value(pageserver, metric_name)
def get_int_counter_value(self, pageserver: NeonPageserver, metric_name: str) -> int:
def get_int_counter_value(self, pageserver, metric_name) -> int:
"""Fetch the value of given int counter from pageserver metrics."""
# TODO: If we start to collect more of the prometheus metrics in the
# performance test suite like this, we should refactor this to load and
@@ -363,9 +365,7 @@ class NeonBenchmarker:
assert matches, f"metric {metric_name} not found"
return int(round(float(matches.group(1))))
def get_timeline_size(
self, repo_dir: Path, tenant_id: TenantId, timeline_id: TimelineId
) -> int:
def get_timeline_size(self, repo_dir: Path, tenant_id: TenantId, timeline_id: TimelineId):
"""
Calculate the on-disk size of a timeline
"""
@@ -379,9 +379,7 @@ class NeonBenchmarker:
return totalbytes
@contextmanager
def record_pageserver_writes(
self, pageserver: NeonPageserver, metric_name: str
) -> Iterator[None]:
def record_pageserver_writes(self, pageserver, metric_name):
"""
Record bytes written by the pageserver during a test.
"""
@@ -398,7 +396,7 @@ class NeonBenchmarker:
@pytest.fixture(scope="function")
def zenbenchmark(record_property: Callable[[str, object], None]) -> Iterator[NeonBenchmarker]:
def zenbenchmark(record_property) -> Iterator[NeonBenchmarker]:
"""
This is a python decorator for benchmark fixtures. It contains functions for
recording measurements, and prints them out at the end.
@@ -407,7 +405,7 @@ def zenbenchmark(record_property: Callable[[str, object], None]) -> Iterator[Neo
yield benchmarker
def pytest_addoption(parser: Parser):
def pytest_addoption(parser):
parser.addoption(
"--out-dir",
dest="out_dir",
@@ -431,9 +429,7 @@ def get_out_path(target_dir: Path, revision: str) -> Path:
# Hook to print the results at the end
@pytest.hookimpl(hookwrapper=True)
def pytest_terminal_summary(
terminalreporter: TerminalReporter, exitstatus: int, config: Config
) -> Iterator[None]:
def pytest_terminal_summary(terminalreporter: TerminalReporter, exitstatus: int, config: Config):
yield
revision = os.getenv("GITHUB_SHA", "local")
platform = os.getenv("PLATFORM", "local")

View File

@@ -1,11 +1,10 @@
from abc import ABC, abstractmethod
from contextlib import _GeneratorContextManager, contextmanager
from contextlib import contextmanager
# Type-related stuff
from typing import Dict, Iterator, List
from typing import Dict, List
import pytest
from _pytest.fixtures import FixtureRequest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.neon_fixtures import NeonEnv, PgBin, PgProtocol, RemotePostgres, VanillaPostgres
from fixtures.pg_stats import PgStatTable
@@ -29,20 +28,19 @@ class PgCompare(ABC):
pass
@property
@abstractmethod
def zenbenchmark(self) -> NeonBenchmarker:
pass
@abstractmethod
def flush(self):
def flush(self) -> None:
pass
@abstractmethod
def report_peak_memory_use(self):
def report_peak_memory_use(self) -> None:
pass
@abstractmethod
def report_size(self):
def report_size(self) -> None:
pass
@contextmanager
@@ -56,7 +54,7 @@ class PgCompare(ABC):
pass
@contextmanager
def record_pg_stats(self, pg_stats: List[PgStatTable]) -> Iterator[None]:
def record_pg_stats(self, pg_stats: List[PgStatTable]):
init_data = self._retrieve_pg_stats(pg_stats)
yield
@@ -86,11 +84,7 @@ class NeonCompare(PgCompare):
"""PgCompare interface for the neon stack."""
def __init__(
self,
zenbenchmark: NeonBenchmarker,
neon_simple_env: NeonEnv,
pg_bin: PgBin,
branch_name: str,
self, zenbenchmark: NeonBenchmarker, neon_simple_env: NeonEnv, pg_bin: PgBin, branch_name
):
self.env = neon_simple_env
self._zenbenchmark = zenbenchmark
@@ -103,15 +97,15 @@ class NeonCompare(PgCompare):
self.timeline = self.pg.safe_psql("SHOW neon.timeline_id")[0][0]
@property
def pg(self) -> PgProtocol:
def pg(self):
return self._pg
@property
def zenbenchmark(self) -> NeonBenchmarker:
def zenbenchmark(self):
return self._zenbenchmark
@property
def pg_bin(self) -> PgBin:
def pg_bin(self):
return self._pg_bin
def flush(self):
@@ -120,7 +114,7 @@ class NeonCompare(PgCompare):
def compact(self):
self.pageserver_http_client.timeline_compact(self.env.initial_tenant, self.timeline)
def report_peak_memory_use(self):
def report_peak_memory_use(self) -> None:
self.zenbenchmark.record(
"peak_mem",
self.zenbenchmark.get_peak_mem(self.env.pageserver) / 1024,
@@ -128,7 +122,7 @@ class NeonCompare(PgCompare):
report=MetricReport.LOWER_IS_BETTER,
)
def report_size(self):
def report_size(self) -> None:
timeline_size = self.zenbenchmark.get_timeline_size(
self.env.repo_dir, self.env.initial_tenant, self.timeline
)
@@ -150,17 +144,17 @@ class NeonCompare(PgCompare):
"num_files_uploaded", total_files, "", report=MetricReport.LOWER_IS_BETTER
)
def record_pageserver_writes(self, out_name: str) -> _GeneratorContextManager[None]:
def record_pageserver_writes(self, out_name):
return self.zenbenchmark.record_pageserver_writes(self.env.pageserver, out_name)
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
def record_duration(self, out_name):
return self.zenbenchmark.record_duration(out_name)
class VanillaCompare(PgCompare):
"""PgCompare interface for vanilla postgres."""
def __init__(self, zenbenchmark: NeonBenchmarker, vanilla_pg: VanillaPostgres):
def __init__(self, zenbenchmark, vanilla_pg: VanillaPostgres):
self._pg = vanilla_pg
self._zenbenchmark = zenbenchmark
vanilla_pg.configure(
@@ -176,24 +170,24 @@ class VanillaCompare(PgCompare):
self.cur = self.conn.cursor()
@property
def pg(self) -> PgProtocol:
def pg(self):
return self._pg
@property
def zenbenchmark(self) -> NeonBenchmarker:
def zenbenchmark(self):
return self._zenbenchmark
@property
def pg_bin(self) -> PgBin:
def pg_bin(self):
return self._pg.pg_bin
def flush(self):
self.cur.execute("checkpoint")
def report_peak_memory_use(self):
def report_peak_memory_use(self) -> None:
pass # TODO find something
def report_size(self):
def report_size(self) -> None:
data_size = self.pg.get_subdir_size("base")
self.zenbenchmark.record(
"data_size", data_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
@@ -204,17 +198,17 @@ class VanillaCompare(PgCompare):
)
@contextmanager
def record_pageserver_writes(self, out_name: str) -> Iterator[None]:
def record_pageserver_writes(self, out_name):
yield # Do nothing
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
def record_duration(self, out_name):
return self.zenbenchmark.record_duration(out_name)
class RemoteCompare(PgCompare):
"""PgCompare interface for a remote postgres instance."""
def __init__(self, zenbenchmark: NeonBenchmarker, remote_pg: RemotePostgres):
def __init__(self, zenbenchmark, remote_pg: RemotePostgres):
self._pg = remote_pg
self._zenbenchmark = zenbenchmark
@@ -223,60 +217,55 @@ class RemoteCompare(PgCompare):
self.cur = self.conn.cursor()
@property
def pg(self) -> PgProtocol:
def pg(self):
return self._pg
@property
def zenbenchmark(self) -> NeonBenchmarker:
def zenbenchmark(self):
return self._zenbenchmark
@property
def pg_bin(self) -> PgBin:
def pg_bin(self):
return self._pg.pg_bin
def flush(self):
# TODO: flush the remote pageserver
pass
def report_peak_memory_use(self):
def report_peak_memory_use(self) -> None:
# TODO: get memory usage from remote pageserver
pass
def report_size(self):
def report_size(self) -> None:
# TODO: get storage size from remote pageserver
pass
@contextmanager
def record_pageserver_writes(self, out_name: str) -> Iterator[None]:
def record_pageserver_writes(self, out_name):
yield # Do nothing
def record_duration(self, out_name: str) -> _GeneratorContextManager[None]:
def record_duration(self, out_name):
return self.zenbenchmark.record_duration(out_name)
@pytest.fixture(scope="function")
def neon_compare(
request: FixtureRequest,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
neon_simple_env: NeonEnv,
) -> NeonCompare:
def neon_compare(request, zenbenchmark, pg_bin, neon_simple_env) -> NeonCompare:
branch_name = request.node.name
return NeonCompare(zenbenchmark, neon_simple_env, pg_bin, branch_name)
@pytest.fixture(scope="function")
def vanilla_compare(zenbenchmark: NeonBenchmarker, vanilla_pg: VanillaPostgres) -> VanillaCompare:
def vanilla_compare(zenbenchmark, vanilla_pg) -> VanillaCompare:
return VanillaCompare(zenbenchmark, vanilla_pg)
@pytest.fixture(scope="function")
def remote_compare(zenbenchmark: NeonBenchmarker, remote_pg: RemotePostgres) -> RemoteCompare:
def remote_compare(zenbenchmark, remote_pg) -> RemoteCompare:
return RemoteCompare(zenbenchmark, remote_pg)
@pytest.fixture(params=["vanilla_compare", "neon_compare"], ids=["vanilla", "neon"])
def neon_with_baseline(request: FixtureRequest) -> PgCompare:
def neon_with_baseline(request) -> PgCompare:
"""Parameterized fixture that helps compare neon against vanilla postgres.
A test that uses this fixture turns into a parameterized test that runs against:
@@ -297,6 +286,8 @@ def neon_with_baseline(request: FixtureRequest) -> PgCompare:
implementation-specific logic is widely useful across multiple tests, it might
make sense to add methods to the PgCompare class.
"""
fixture = request.getfixturevalue(request.param) # type: ignore
assert isinstance(fixture, PgCompare), f"test error: fixture {fixture} is not PgCompare"
return fixture
fixture = request.getfixturevalue(request.param)
if isinstance(fixture, PgCompare):
return fixture
else:
raise AssertionError(f"test error: fixture {request.param} is not PgCompare")

View File

@@ -1,5 +1,5 @@
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
from typing import Dict, List
from prometheus_client.parser import text_string_to_metric_families
from prometheus_client.samples import Sample
@@ -23,13 +23,13 @@ class Metrics:
pass
return res
def query_one(self, name: str, filter: Optional[Dict[str, str]] = None) -> Sample:
res = self.query_all(name, filter or {})
def query_one(self, name: str, filter: Dict[str, str] = {}) -> Sample:
res = self.query_all(name, filter)
assert len(res) == 1, f"expected single sample for {name} {filter}, found {res}"
return res[0]
def parse_metrics(text: str, name: str = "") -> Metrics:
def parse_metrics(text: str, name: str = ""):
metrics = Metrics(name)
gen = text_string_to_metric_families(text)
for family in gen:
@@ -39,7 +39,7 @@ def parse_metrics(text: str, name: str = "") -> Metrics:
return metrics
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
PAGESERVER_PER_TENANT_METRICS = [
"pageserver_current_logical_size",
"pageserver_current_physical_size",
"pageserver_getpage_reconstruct_seconds_bucket",
@@ -62,4 +62,4 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_wait_lsn_seconds_sum",
"pageserver_created_persistent_files_total",
"pageserver_written_persistent_bytes_total",
)
]

File diff suppressed because it is too large

View File

@@ -1,4 +1,3 @@
from functools import cached_property
from typing import List
import pytest
@@ -14,7 +13,7 @@ class PgStatTable:
self.columns = columns
self.additional_query = filter_query
@cached_property
@property
def query(self) -> str:
return f"SELECT {','.join(self.columns)} FROM {self.table} {self.additional_query}"
@@ -56,5 +55,6 @@ def pg_stats_wal() -> List[PgStatTable]:
PgStatTable(
"pg_stat_wal",
["wal_records", "wal_fpi", "wal_bytes", "wal_buffers_full", "wal_write"],
"",
)
]

View File

@@ -1,8 +1,4 @@
from typing import Any, List
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
"""
This plugin allows tests to be marked as slow using pytest.mark.slow. By default slow
@@ -13,15 +9,15 @@ Copied from here: https://docs.pytest.org/en/latest/example/simple.html
"""
def pytest_addoption(parser: Parser):
def pytest_addoption(parser):
parser.addoption("--runslow", action="store_true", default=False, help="run slow tests")
def pytest_configure(config: Config):
def pytest_configure(config):
config.addinivalue_line("markers", "slow: mark test as slow to run")
def pytest_collection_modifyitems(config: Config, items: List[Any]):
def pytest_collection_modifyitems(config, items):
if config.getoption("--runslow"):
# --runslow given in cli: do not skip slow tests
return

View File

@@ -1,8 +1,6 @@
import random
from functools import total_ordering
from typing import Any, Type, TypeVar, Union
T = TypeVar("T", bound="Id")
from typing import Union
@total_ordering
@@ -19,35 +17,31 @@ class Lsn:
"""Convert lsn from hex notation to int."""
l, r = x.split("/")
self.lsn_int = (int(l, 16) << 32) + int(r, 16)
assert 0 <= self.lsn_int <= 0xFFFFFFFF_FFFFFFFF
# FIXME: error if it doesn't look like a valid LSN
def __str__(self) -> str:
def __str__(self):
"""Convert lsn from int to standard hex notation."""
return f"{(self.lsn_int >> 32):X}/{(self.lsn_int & 0xFFFFFFFF):X}"
return "{:X}/{:X}".format(self.lsn_int >> 32, self.lsn_int & 0xFFFFFFFF)
def __repr__(self) -> str:
return f'Lsn("{str(self)}")'
def __repr__(self):
return 'Lsn("{:X}/{:X}")'.format(self.lsn_int >> 32, self.lsn_int & 0xFFFFFFFF)
def __int__(self) -> int:
def __int__(self):
return self.lsn_int
def __lt__(self, other: Any) -> bool:
if not isinstance(other, Lsn):
return NotImplemented
def __lt__(self, other: "Lsn") -> bool:
return self.lsn_int < other.lsn_int
def __eq__(self, other: Any) -> bool:
def __eq__(self, other) -> bool:
if not isinstance(other, Lsn):
return NotImplemented
return self.lsn_int == other.lsn_int
# Returns the difference between two Lsns, in bytes
def __sub__(self, other: Any) -> int:
if not isinstance(other, Lsn):
return NotImplemented
def __sub__(self, other: "Lsn") -> int:
return self.lsn_int - other.lsn_int
def __hash__(self) -> int:
def __hash__(self):
return hash(self.lsn_int)
@@ -63,7 +57,7 @@ class Id:
self.id = bytearray.fromhex(x)
assert len(self.id) == 16
def __str__(self) -> str:
def __str__(self):
return self.id.hex()
def __lt__(self, other) -> bool:
@@ -76,20 +70,20 @@ class Id:
return NotImplemented
return self.id == other.id
def __hash__(self) -> int:
def __hash__(self):
return hash(str(self.id))
@classmethod
def generate(cls: Type[T]) -> T:
def generate(cls):
"""Generate a random ID"""
return cls(random.randbytes(16).hex())
class TenantId(Id):
def __repr__(self) -> str:
def __repr__(self):
return f'`TenantId("{self.id.hex()}")'
class TimelineId(Id):
def __repr__(self) -> str:
def __repr__(self):
return f'TimelineId("{self.id.hex()}")'

View File

@@ -6,7 +6,7 @@ import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, TypeVar
from typing import Any, Callable, List, Tuple, TypeVar
import allure # type: ignore
from fixtures.log_helper import log
@@ -15,12 +15,12 @@ from psycopg2.extensions import cursor
Fn = TypeVar("Fn", bound=Callable[..., Any])
def get_self_dir() -> Path:
def get_self_dir() -> str:
"""Get the path to the directory where this script lives."""
return Path(__file__).resolve().parent
return os.path.dirname(os.path.abspath(__file__))
def subprocess_capture(capture_dir: Path, cmd: List[str], **kwargs: Any) -> str:
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
"""Run a process and capture its output
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
@@ -30,11 +30,11 @@ def subprocess_capture(capture_dir: Path, cmd: List[str], **kwargs: Any) -> str:
If those files already exist, we will overwrite them.
Returns basepath for files with captured output.
"""
assert isinstance(cmd, list)
base = f"{os.path.basename(cmd[0])}_{global_counter()}"
assert type(cmd) is list
base = os.path.basename(cmd[0]) + "_{}".format(global_counter())
basepath = os.path.join(capture_dir, base)
stdout_filename = f"{basepath}.stdout"
stderr_filename = f"{basepath}.stderr"
stdout_filename = basepath + ".stdout"
stderr_filename = basepath + ".stderr"
try:
with open(stdout_filename, "w") as stdout_f:
@@ -64,7 +64,7 @@ def global_counter() -> int:
return _global_counter
def print_gc_result(row: Dict[str, Any]):
def print_gc_result(row):
log.info("GC duration {elapsed} ms".format_map(row))
log.info(
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
@@ -78,7 +78,8 @@ def etcd_path() -> Path:
path_output = shutil.which("etcd")
if path_output is None:
raise RuntimeError("etcd not found in PATH")
return Path(path_output)
else:
return Path(path_output)
def query_scalar(cur: cursor, query: str) -> Any:
@@ -123,6 +124,7 @@ def get_timeline_dir_size(path: Path) -> int:
# file is a delta layer
_ = parse_delta_layer(dir_entry.name)
sz += dir_entry.stat().st_size
continue
return sz
@@ -155,8 +157,8 @@ def get_scale_for_db(size_mb: int) -> int:
return round(0.06689 * size_mb - 0.5)
ATTACHMENT_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
r"flamegraph\.svg|regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html)"
ATTACHMENT_NAME_REGEX = re.compile(
r".+\.log|.+\.stderr|.+\.stdout|.+\.filediff|.+\.metrics|flamegraph\.svg|regression\.diffs|.+\.html"
)

View File

@@ -1,22 +1,3 @@
# Running locally
First make a release build. The profiling flag is optional, used only for tests that
generate flame graphs. The `-s` flag just silences a lot of output, and makes it
easier to see if you have compile errors without scrolling up.
`BUILD_TYPE=release CARGO_BUILD_FLAGS="--features=testing,profiling" make -s -j8`
NOTE: the `profiling` flag only works on linux because we use linux-specific
libc APIs like `libc::timer_t`.
Then run the tests
`NEON_BIN=./target/release poetry run pytest test_runner/performance`
Some handy pytest flags for local development:
- `-x` tells pytest to stop on first error
- `-s` shows test output
- `-k` selects a test to run
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
# What performance tests do we have and how we run them
Performance tests are built using the same infrastructure as our usual python integration tests. There are some extra fixtures that help to collect performance metrics, and to run tests against both vanilla PostgreSQL and Neon for comparison.
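For orientation, the skeleton of such a test looks roughly like this (a sketch with a placeholder query, assuming the default `main` branch of `neon_simple_env`; it is not an existing test):

    def test_example_perf(neon_simple_env, zenbenchmark):
        pg = neon_simple_env.postgres.create_start("main")
        # time the query and record its duration as a performance metric
        with zenbenchmark.record_duration("run_query"):
            pg.safe_psql("select count(*) from generate_series(1, 1000000)")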

View File

@@ -1,31 +0,0 @@
from contextlib import closing
from fixtures.neon_fixtures import NeonEnvBuilder
# This test demonstrates how to collect a read trace. It's useful until
# it gets replaced by a test that actually does stuff with the trace.
def test_read_request_tracing(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant, _ = env.neon_cli.create_tenant(
conf={
"trace_read_requests": "true",
}
)
timeline = env.neon_cli.create_timeline("test_trace_replay", tenant_id=tenant)
pg = env.postgres.create_start("test_trace_replay", "main", tenant)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t (i integer);")
cur.execute(f"insert into t values (generate_series(1,{10000}));")
cur.execute("select count(*) from t;")
# Stop pg so we drop the connection and flush the traces
pg.stop()
trace_path = env.repo_dir / "traces" / str(tenant) / str(timeline)
assert trace_path.exists()

View File

@@ -2,7 +2,7 @@ import statistics
import threading
import time
import timeit
from typing import Any, Callable, List
from typing import Callable
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
@@ -197,7 +197,7 @@ def record_lsn_write_lag(env: PgCompare, run_cond: Callable[[], bool], pool_inte
if not isinstance(env, NeonCompare):
return
lsn_write_lags: List[Any] = []
lsn_write_lags = []
last_received_lsn = Lsn(0)
last_pg_flush_lsn = Lsn(0)
@@ -216,7 +216,6 @@ def record_lsn_write_lag(env: PgCompare, run_cond: Callable[[], bool], pool_inte
)
res = cur.fetchone()
assert isinstance(res, tuple)
lsn_write_lags.append(res[0])
curr_received_lsn = Lsn(res[3])

View File

@@ -24,6 +24,7 @@ if __name__ == "__main__":
if (v := os.environ.get(k, None)) is not None
}
row = asyncio.run(run(**kwargs))
loop = asyncio.new_event_loop()
row = loop.run_until_complete(run(**kwargs))
print(row[0])

View File

@@ -46,9 +46,9 @@ def test_pg_clients(test_output_dir: Path, remote_pg: RemotePostgres, client: st
raise RuntimeError("docker is required for running this test")
build_cmd = [docker_bin, "build", "--tag", image_tag, f"{Path(__file__).parent / client}"]
subprocess_capture(test_output_dir, build_cmd, check=True)
subprocess_capture(str(test_output_dir), build_cmd, check=True)
run_cmd = [docker_bin, "run", "--rm", "--env-file", env_file, image_tag]
basepath = subprocess_capture(test_output_dir, run_cmd, check=True)
basepath = subprocess_capture(str(test_output_dir), run_cmd, check=True)
assert Path(f"{basepath}.stdout").read_text().strip() == "1"

View File

@@ -1,12 +1,12 @@
import os
import re
import shutil
import subprocess
from pathlib import Path
from typing import Any
from typing import Any, Dict, Union
import pytest
import toml # TODO: replace with tomllib for Python >= 3.11
from fixtures.log_helper import log
import toml
from fixtures.neon_fixtures import (
NeonCli,
NeonEnvBuilder,
@@ -19,185 +19,89 @@ from fixtures.neon_fixtures import (
from fixtures.types import Lsn
from pytest import FixtureRequest
#
# A test suite that help to prevent unintentionally breaking backward or forward compatibility between Neon releases.
# - `test_create_snapshot` a script wrapped in a test that creates a data snapshot.
# - `test_backward_compatibility` checks that the current version of Neon can start/read/interact with a data snapshot created by the previous version.
# The path to the snapshot is configured by the COMPATIBILITY_SNAPSHOT_DIR environment variable.
# If the breakage is intentional, the test can be xfailed by setting ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE=true.
# - `test_forward_compatibility` checks that a snapshot created by the current version can be started/read/interacted with by the previous version of Neon.
# Paths to Neon and Postgres are configured by the COMPATIBILITY_NEON_BIN and COMPATIBILITY_POSTGRES_DISTRIB_DIR environment variables.
# If the breakage is intentional, the test can be xfailed by setting ALLOW_FORWARD_COMPATIBILITY_BREAKAGE=true.
#
# The file contains a couple of helper functions:
# - prepare_snapshot copies the snapshot, cleans it up and makes it ready for the current version of Neon (replaces paths and ports in config files).
# - check_neon_works performs the test itself, feel free to add more checks there.
#
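# As a purely illustrative example (not part of this file), an intentional format
# change could be exercised locally with something like:
#
#     ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE=true poetry run pytest -k test_backward_compatibility
#
# which downgrades the expected failure to an xfail instead of a hard error.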
DEFAILT_LOCAL_SNAPSHOT_DIR = "test_output/test_prepare_snapshot/compatibility_snapshot_pg14"
# Note: if renaming this test, don't forget to update a reference to it in a workflow file:
# "Upload compatibility snapshot" step in .github/actions/run-python-test-set/action.yml
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(before="test_forward_compatibility")
def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_output_dir: Path):
# The test doesn't really test anything
# it creates a new snapshot for releases after we tested the current version against the previous snapshot in `test_backward_compatibility`.
#
# There's no cleanup here; this allows adjusting the data in `test_backward_compatibility` itself without re-collecting it.
neon_env_builder.pg_version = "14"
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_local_fs_remote_storage()
def dump_differs(first: Path, second: Path, output: Path) -> bool:
"""
Runs diff(1) command on two SQL dumps and write the output to the given output file.
Returns True if the dumps differ, False otherwise.
"""
env = neon_env_builder.init_start()
pg = env.postgres.create_start("main")
pg_bin.run(["pgbench", "--initialize", "--scale=10", pg.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", pg.connstr()])
pg_bin.run(["pg_dumpall", f"--dbname={pg.connstr()}", f"--file={test_output_dir / 'dump.sql'}"])
with output.open("w") as stdout:
rv = subprocess.run(
[
"diff",
"--unified", # Make diff output more readable
"--ignore-matching-lines=^--", # Ignore changes in comments
"--ignore-blank-lines",
str(first),
str(second),
],
stdout=stdout,
)
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
env.postgres.stop_all()
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
shutil.copytree(test_output_dir, test_output_dir / "compatibility_snapshot_pg14")
# Directory `test_output_dir / "compatibility_snapshot_pg14"` is uploaded to S3 in a workflow, keep the name in sync with it
return rv.returncode != 0
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
class PortReplacer(object):
"""
Class-helper for replacing ports in config files.
"""
def __init__(self, port_distributor: PortDistributor):
self.port_distributor = port_distributor
self.port_map: Dict[int, int] = {}
def replace_port(self, value: Union[int, str]) -> Union[int, str]:
if isinstance(value, int):
if (known_port := self.port_map.get(value)) is not None:
return known_port
self.port_map[value] = self.port_distributor.get_port()
return self.port_map[value]
if isinstance(value, str):
# Use regex to find port in a string
# urllib.parse.urlparse produces inconvenient results for cases without scheme like "localhost:5432"
# See https://bugs.python.org/issue27657
ports = re.findall(r":(\d+)(?:/|$)", value)
assert len(ports) == 1, f"can't find port in {value}"
port_int = int(ports[0])
if (known_port := self.port_map.get(port_int)) is not None:
return value.replace(f":{port_int}", f":{known_port}")
self.port_map[port_int] = self.port_distributor.get_port()
return value.replace(f":{port_int}", f":{self.port_map[port_int]}")
raise TypeError(f"unsupported type {type(value)} of {value=}")
@pytest.mark.order(after="test_prepare_snapshot")
def test_backward_compatibility(
pg_bin: PgBin,
port_distributor: PortDistributor,
test_output_dir: Path,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
request: FixtureRequest,
pg_bin: PgBin, port_distributor: PortDistributor, test_output_dir: Path, request: FixtureRequest
):
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg14` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
# Copy the snapshot to current directory, and prepare for the test
prepare_snapshot(
from_dir=compatibility_snapshot_dir,
to_dir=test_output_dir / "compatibility_snapshot",
port_distributor=port_distributor,
compatibility_snapshot_dir = Path(
os.environ.get("COMPATIBILITY_SNAPSHOT_DIR", DEFAILT_LOCAL_SNAPSHOT_DIR)
)
breaking_changes_allowed = (
os.environ.get("ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
assert compatibility_snapshot_dir.exists(), (
f"{compatibility_snapshot_dir} doesn't exist. Please run `test_prepare_snapshot` test first "
"to create the snapshot or set COMPATIBILITY_SNAPSHOT_DIR env variable to the existing snapshot"
)
try:
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
neon_binpath,
pg_distrib_dir,
pg_version,
port_distributor,
test_output_dir,
pg_bin,
request,
)
except Exception:
if breaking_changes_allowed:
pytest.xfail(
"Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE env var"
)
else:
raise
compatibility_snapshot_dir = compatibility_snapshot_dir.resolve()
assert (
not breaking_changes_allowed
), "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
# Make compatibility snapshot artifacts pickupable by Allure
# by copying the snapshot directory to the current test output directory.
repo_dir = test_output_dir / "compatibility_snapshot" / "repo"
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_forward_compatibility(
test_output_dir: Path,
port_distributor: PortDistributor,
pg_version: str,
request: FixtureRequest,
):
compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN")
assert compatibility_neon_bin_env is not None, (
"COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries "
"(ideally generated by the previous version of Neon)"
)
compatibility_neon_bin = Path(compatibility_neon_bin_env).resolve()
compatibility_postgres_distrib_dir_env = os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR")
assert (
compatibility_postgres_distrib_dir_env is not None
), "COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set. It should be set to a pg_install directrory (ideally generated by the previous version of Neon)"
compatibility_postgres_distrib_dir = Path(compatibility_postgres_distrib_dir_env).resolve()
compatibility_snapshot_dir = (
test_output_dir.parent / "test_create_snapshot" / "compatibility_snapshot_pg14"
)
# Copy the snapshot to current directory, and prepare for the test
prepare_snapshot(
from_dir=compatibility_snapshot_dir,
to_dir=test_output_dir / "compatibility_snapshot",
port_distributor=port_distributor,
)
breaking_changes_allowed = (
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
)
try:
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
compatibility_neon_bin,
compatibility_postgres_distrib_dir,
pg_version,
port_distributor,
test_output_dir,
PgBin(test_output_dir, compatibility_postgres_distrib_dir, pg_version),
request,
)
except Exception:
if breaking_changes_allowed:
pytest.xfail(
"Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE env var"
)
else:
raise
assert (
not breaking_changes_allowed
), "Breaking changes are allowed by ALLOW_FORWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
def prepare_snapshot(from_dir: Path, to_dir: Path, port_distributor: PortDistributor):
assert from_dir.exists(), f"Snapshot '{from_dir}' doesn't exist"
assert (from_dir / "repo").exists(), f"Snapshot '{from_dir}' doesn't contain a repo directory"
assert (from_dir / "dump.sql").exists(), f"Snapshot '{from_dir}' doesn't contain a dump.sql"
log.info(f"Copying snapshot from {from_dir} to {to_dir}")
shutil.copytree(from_dir, to_dir)
repo_dir = to_dir / "repo"
shutil.copytree(compatibility_snapshot_dir / "repo", repo_dir)
# Remove old logs to avoid confusion in test artifacts
for logfile in repo_dir.glob("**/*.log"):
logfile.unlink()
# Remove tenants data for compute
# Remove tenants data for computes
for tenant in (repo_dir / "pgdatadirs" / "tenants").glob("*"):
shutil.rmtree(tenant)
@@ -206,17 +110,20 @@ def prepare_snapshot(from_dir: Path, to_dir: Path, port_distributor: PortDistrib
shutil.rmtree(tenant / "wal-redo-datadir.___temp")
# Update paths and ports in config files
pr = PortReplacer(port_distributor)
pageserver_toml = repo_dir / "pageserver.toml"
pageserver_config = toml.load(pageserver_toml)
pageserver_config["remote_storage"]["local_path"] = repo_dir / "local_fs_remote_storage"
pageserver_config["listen_http_addr"] = port_distributor.replace_with_new_port(
pageserver_config["listen_http_addr"]
)
pageserver_config["listen_pg_addr"] = port_distributor.replace_with_new_port(
pageserver_config["listen_pg_addr"]
new_local_path = pageserver_config["remote_storage"]["local_path"].replace(
"/test_prepare_snapshot/",
"/test_backward_compatibility/compatibility_snapshot/",
)
pageserver_config["remote_storage"]["local_path"] = new_local_path
pageserver_config["listen_http_addr"] = pr.replace_port(pageserver_config["listen_http_addr"])
pageserver_config["listen_pg_addr"] = pr.replace_port(pageserver_config["listen_pg_addr"])
pageserver_config["broker_endpoints"] = [
port_distributor.replace_with_new_port(ep) for ep in pageserver_config["broker_endpoints"]
pr.replace_port(ep) for ep in pageserver_config["broker_endpoints"]
]
with pageserver_toml.open("w") as f:
@@ -225,18 +132,17 @@ def prepare_snapshot(from_dir: Path, to_dir: Path, port_distributor: PortDistrib
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
snapshot_config["etcd_broker"]["broker_endpoints"] = [
port_distributor.replace_with_new_port(ep)
for ep in snapshot_config["etcd_broker"]["broker_endpoints"]
pr.replace_port(ep) for ep in snapshot_config["etcd_broker"]["broker_endpoints"]
]
snapshot_config["pageserver"]["listen_http_addr"] = port_distributor.replace_with_new_port(
snapshot_config["pageserver"]["listen_http_addr"] = pr.replace_port(
snapshot_config["pageserver"]["listen_http_addr"]
)
snapshot_config["pageserver"]["listen_pg_addr"] = port_distributor.replace_with_new_port(
snapshot_config["pageserver"]["listen_pg_addr"] = pr.replace_port(
snapshot_config["pageserver"]["listen_pg_addr"]
)
for sk in snapshot_config["safekeepers"]:
sk["http_port"] = port_distributor.replace_with_new_port(sk["http_port"])
sk["pg_port"] = port_distributor.replace_with_new_port(sk["pg_port"])
sk["http_port"] = pr.replace_port(sk["http_port"])
sk["pg_port"] = pr.replace_port(sk["pg_port"])
with (snapshot_config_toml).open("w") as f:
toml.dump(snapshot_config, f)
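PortReplacer is introduced by this diff, but its definition is not part of these hunks. Below is a sketch of what such a wrapper could look like, assuming it only delegates to the PortDistributor shown above; the class body is a guess, only the name and the replace_port call sites come from the diff:

class PortReplacer:
    """Hypothetical thin wrapper around a PortDistributor."""

    def __init__(self, port_distributor):
        self.port_distributor = port_distributor

    def replace_port(self, value):
        # Delegate to the distributor; it is assumed to hand out fresh ports
        # and to keep the old->new mapping stable, so every config file ends
        # up pointing at the same replacement port.
        return self.port_distributor.replace_with_new_port(value)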
@@ -248,7 +154,7 @@ def prepare_snapshot(from_dir: Path, to_dir: Path, port_distributor: PortDistrib
"--recursive",
"--binary-file=without-match",
"--files-with-matches",
"test_create_snapshot/repo",
"test_prepare_snapshot/repo",
str(repo_dir),
],
capture_output=True,
@@ -256,47 +162,42 @@ def prepare_snapshot(from_dir: Path, to_dir: Path, port_distributor: PortDistrib
)
assert (
rv.returncode != 0
), f"there're files referencing `test_create_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}"
), f"there're files referencing `test_prepare_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}"
def check_neon_works(
repo_dir: Path,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
port_distributor: PortDistributor,
test_output_dir: Path,
pg_bin: PgBin,
request: FixtureRequest,
):
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
snapshot_config["neon_distrib_dir"] = str(neon_binpath)
snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir)
with (snapshot_config_toml).open("w") as f:
toml.dump(snapshot_config, f)
# TODO: replace with NeonEnvBuilder / NeonEnv
# NeonEnv stub to make NeonCli happy
config: Any = type("NeonEnvStub", (object,), {})
config.rust_log_override = None
config.repo_dir = repo_dir
config.pg_version = pg_version
config.pg_version = "14" # Note: `pg_dumpall` (from pg_bin) version is set by DEFAULT_PG_VERSION_DEFAULT and can be overriden by DEFAULT_PG_VERSION env var
config.initial_tenant = snapshot_config["default_tenant_id"]
config.neon_binpath = neon_binpath
config.pg_distrib_dir = pg_distrib_dir
# Check that we can start the project
cli = NeonCli(config)
cli.raw_cli(["start"])
request.addfinalizer(lambda: cli.raw_cli(["stop"]))
try:
cli.raw_cli(["start"])
request.addfinalizer(lambda: cli.raw_cli(["stop"]))
pg_port = port_distributor.get_port()
cli.pg_start("main", port=pg_port)
request.addfinalizer(lambda: cli.pg_stop("main"))
result = cli.pg_start("main", port=port_distributor.get_port())
request.addfinalizer(lambda: cli.pg_stop("main"))
except Exception:
breaking_changes_allowed = (
os.environ.get("ALLOW_BREAKING_CHANGES", "false").lower() == "true"
)
if breaking_changes_allowed:
pytest.xfail("Breaking changes are allowed by ALLOW_BREAKING_CHANGES env var")
else:
raise
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
connstr_all = re.findall(r"Starting postgres node at '([^']+)'", result.stdout)
assert len(connstr_all) == 1, f"can't parse connstr from {result.stdout}"
connstr = connstr_all[0]
# Check that the project produces the same dump as the previous version.
# The assert itself is deferred to the end of the test
# to allow us to perform checks that change data before failing
pg_bin.run(["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"])
initial_dump_differs = dump_differs(
repo_dir.parent / "dump.sql",
compatibility_snapshot_dir / "dump.sql",
test_output_dir / "dump.sql",
test_output_dir / "dump.filediff",
)
@@ -334,23 +235,38 @@ def check_neon_works(
assert not initial_dump_differs, "initial dump differs"
def dump_differs(first: Path, second: Path, output: Path) -> bool:
"""
Runs the diff(1) command on two SQL dumps and writes the output to the given output file.
Returns True if the dumps differ, False otherwise.
"""
# Note: if renaming this test, don't forget to update a reference to it in a workflow file:
# "Upload compatibility snapshot" step in .github/actions/run-python-test-set/action.yml
def test_prepare_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_output_dir: Path):
# The test doesn't really test anything:
# it creates a new snapshot for releases after the current version has been tested against the previous snapshot in `test_backward_compatibility`.
#
# There's no cleanup here; this allows adjusting the data in `test_backward_compatibility` itself without re-collecting it.
neon_env_builder.pg_version = "14"
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_local_fs_remote_storage()
with output.open("w") as stdout:
rv = subprocess.run(
[
"diff",
"--unified", # Make diff output more readable
"--ignore-matching-lines=^--", # Ignore changes in comments
"--ignore-blank-lines",
str(first),
str(second),
],
stdout=stdout,
)
env = neon_env_builder.init_start()
pg = env.postgres.create_start("main")
pg_bin.run(["pgbench", "--initialize", "--scale=10", pg.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", pg.connstr()])
pg_bin.run(["pg_dumpall", f"--dbname={pg.connstr()}", f"--file={test_output_dir / 'dump.sql'}"])
return rv.returncode != 0
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
env.postgres.stop_all()
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
shutil.copytree(test_output_dir, test_output_dir / "compatibility_snapshot_pg14")
# Directory `test_output_dir / "compatibility_snapshot_pg14"` is uploaded to S3 in a workflow, keep the name in sync with it
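Since prepare_snapshot() later requires a repo/ directory and a dump.sql at the top level of the uploaded snapshot, a small hypothetical sanity check at snapshot-creation time (not in the diff) would make layout mistakes fail early:

from pathlib import Path


def assert_snapshot_layout(snapshot_dir: Path) -> None:
    # Mirrors the asserts at the top of prepare_snapshot(); failing here is
    # clearer than failing on the next release's compatibility run.
    assert snapshot_dir.exists(), f"Snapshot '{snapshot_dir}' doesn't exist"
    assert (snapshot_dir / "repo").exists(), "snapshot is missing the repo directory"
    assert (snapshot_dir / "dump.sql").exists(), "snapshot is missing dump.sql"


# e.g. assert_snapshot_layout(test_output_dir / "compatibility_snapshot_pg14")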

View File

@@ -1,8 +1,13 @@
import os
from pathlib import Path
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
PortDistributor,
VanillaPostgres,
pg_distrib_dir,
)
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar, subprocess_capture
@@ -11,10 +16,7 @@ num_rows = 1000
# Ensure that regular postgres can start from fullbackup
def test_fullbackup(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor,
pg_distrib_dir: Path,
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, port_distributor: PortDistributor
):
env = neon_env_builder.init_start()
@@ -38,7 +40,7 @@ def test_fullbackup(
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")}
# Get and unpack fullbackup from pageserver
restored_dir_path = env.repo_dir / "restored_datadir"
@@ -47,7 +49,9 @@ def test_fullbackup(
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
tar_output_file = result_basepath + ".stdout"
subprocess_capture(env.repo_dir, ["tar", "-xf", tar_output_file, "-C", str(restored_dir_path)])
subprocess_capture(
str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", str(restored_dir_path)]
)
# HACK
# fullbackup returns neon specific pg_control and first WAL segment
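For reference, the pipe-psql-output-into-tar idea the comment above describes can be sketched without the test fixtures roughly as below; the connection string, paths, and the explicit pipe are illustrative assumptions, while the test itself goes through pg_bin.run_capture and subprocess_capture with an intermediate .stdout file:

import os
import subprocess
from pathlib import Path


def fetch_fullbackup(pageserver_connstr: str, query: str, pg_install: Path, dest_dir: Path) -> None:
    # psql must load the libpq we built, not the system one, so point
    # LD_LIBRARY_PATH at the pg_install lib directory.
    env = dict(os.environ, LD_LIBRARY_PATH=str(pg_install / "lib"))
    dest_dir.mkdir(parents=True, exist_ok=True)
    psql = subprocess.Popen(
        ["psql", "--no-psqlrc", pageserver_connstr, "-c", query],
        stdout=subprocess.PIPE,
        env=env,
    )
    # The fullbackup response is a tarball; unpack it straight into dest_dir.
    subprocess.run(["tar", "-xf", "-", "-C", str(dest_dir)], stdin=psql.stdout, check=True)
    assert psql.wait() == 0, "psql fullbackup command failed"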

View File

@@ -13,6 +13,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
Postgres,
pg_distrib_dir,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -127,7 +128,7 @@ def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBu
num_rows = 3000
lsn = _generate_data(num_rows, pg)
_import(num_rows, lsn, env, pg_bin, timeline, env.pg_distrib_dir)
_import(num_rows, lsn, env, pg_bin, timeline)
@pytest.mark.timeout(1800)
@@ -155,7 +156,7 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne
log.info(f"timeline logical size = {logical_size / (1024 ** 2)}MB")
assert logical_size > 1024**3 # = 1GB
tar_output_file = _import(num_rows, lsn, env, pg_bin, timeline, env.pg_distrib_dir)
tar_output_file = _import(num_rows, lsn, env, pg_bin, timeline)
# Check if the backup data contains multiple segment files
cnt_seg_files = 0
@@ -190,12 +191,7 @@ def _generate_data(num_rows: int, pg: Postgres) -> Lsn:
def _import(
expected_num_rows: int,
lsn: Lsn,
env: NeonEnv,
pg_bin: PgBin,
timeline: TimelineId,
pg_distrib_dir: Path,
expected_num_rows: int, lsn: Lsn, env: NeonEnv, pg_bin: PgBin, timeline: TimelineId
) -> str:
"""Test importing backup data to the pageserver.
@@ -209,7 +205,7 @@ def _import(
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")}
# Get a fullbackup from pageserver
query = f"fullbackup { env.initial_tenant} {timeline} {lsn}"

View File

@@ -1,5 +1,5 @@
import pathlib
import subprocess
from pathlib import Path
from typing import Optional
from fixtures.neon_fixtures import (
@@ -7,18 +7,18 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
neon_binpath,
pg_distrib_dir,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
# test that we cannot override node id after init
def test_pageserver_init_node_id(
neon_simple_env: NeonEnv, neon_binpath: Path, pg_distrib_dir: Path
):
def test_pageserver_init_node_id(neon_simple_env: NeonEnv):
repo_dir = neon_simple_env.repo_dir
pageserver_config = repo_dir / "pageserver.toml"
pageserver_bin = neon_binpath / "pageserver"
pageserver_bin = pathlib.Path(neon_binpath) / "pageserver"
def run_pageserver(args):
return subprocess.run(

View File

@@ -1,10 +1,11 @@
#
# This file runs pg_regress-based tests.
#
import os
from pathlib import Path
import pytest
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.neon_fixtures import NeonEnv, base_dir, check_restored_datadir_content, pg_distrib_dir
# Run the main PostgreSQL regression tests, in src/test/regress.
@@ -12,14 +13,7 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
# This runs for a long time, especially in debug mode, so use a larger-than-default
# timeout.
@pytest.mark.timeout(1800)
def test_pg_regress(
neon_simple_env: NeonEnv,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
):
def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_pg_regress", "empty")
@@ -32,20 +26,20 @@ def test_pg_regress(
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_regress will need.
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/regress"
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
schedule = src_path / "parallel_schedule"
pg_regress = build_path / "pg_regress"
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress").format(env.pg_version)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")
pg_regress_command = [
str(pg_regress),
pg_regress,
'--bindir=""',
"--use-existing",
f"--bindir={bindir}",
f"--dlpath={build_path}",
f"--schedule={schedule}",
f"--inputdir={src_path}",
"--bindir={}".format(bindir),
"--dlpath={}".format(build_path),
"--schedule={}".format(schedule),
"--inputdir={}".format(src_path),
]
env_vars = {
@@ -72,14 +66,7 @@ def test_pg_regress(
# This runs for a long time, especially in debug mode, so use a larger-than-default
# timeout.
@pytest.mark.timeout(1800)
def test_isolation(
neon_simple_env: NeonEnv,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
):
def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_isolation", "empty")
@@ -93,19 +80,21 @@ def test_isolation(
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_isolation_regress will need.
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/isolation"
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/isolation"
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
schedule = src_path / "isolation_schedule"
pg_isolation_regress = build_path / "pg_isolation_regress"
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/isolation".format(env.pg_version))
src_path = os.path.join(
base_dir, "vendor/postgres-v{}/src/test/isolation".format(env.pg_version)
)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "isolation_schedule")
pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress")
pg_isolation_regress_command = [
str(pg_isolation_regress),
pg_isolation_regress,
"--use-existing",
f"--bindir={bindir}",
f"--dlpath={build_path}",
f"--inputdir={src_path}",
f"--schedule={schedule}",
"--bindir={}".format(bindir),
"--dlpath={}".format(build_path),
"--inputdir={}".format(src_path),
"--schedule={}".format(schedule),
]
env_vars = {
@@ -123,14 +112,7 @@ def test_isolation(
# Run extra Neon-specific pg_regress-based tests. The tests and their
# schedule file are in the sql_regress/ directory.
def test_sql_regress(
neon_simple_env: NeonEnv,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
):
def test_sql_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_sql_regress", "empty")
@@ -144,19 +126,19 @@ def test_sql_regress(
# Compute all the file locations that pg_regress will need.
# This test runs neon specific tests
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
src_path = base_dir / "test_runner/sql_regress"
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
schedule = src_path / "parallel_schedule"
pg_regress = build_path / "pg_regress"
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "test_runner/sql_regress")
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")
pg_regress_command = [
str(pg_regress),
pg_regress,
"--use-existing",
f"--bindir={bindir}",
f"--dlpath={build_path}",
f"--schedule={schedule}",
f"--inputdir={src_path}",
"--bindir={}".format(bindir),
"--dlpath={}".format(build_path),
"--schedule={}".format(schedule),
"--inputdir={}".format(src_path),
]
env_vars = {

View File

@@ -129,7 +129,6 @@ async def test_psql_session_id(vanilla_pg: VanillaPostgres, link_proxy: NeonProx
create_and_send_db_info(vanilla_pg, psql_session_id, link_proxy.mgmt_port)
assert proc.stdout is not None
out = (await proc.stdout.read()).decode("utf-8").strip()
assert out == "42"

View File

@@ -1,7 +1,7 @@
import os
import pathlib
import threading
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import pytest
@@ -14,6 +14,9 @@ from fixtures.neon_fixtures import (
PortDistributor,
Postgres,
assert_no_in_progress_downloads_for_tenant,
base_dir,
neon_binpath,
pg_distrib_dir,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -27,13 +30,12 @@ def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@contextmanager
def new_pageserver_service(
new_pageserver_dir: Path,
pageserver_bin: Path,
remote_storage_mock_path: Path,
new_pageserver_dir: pathlib.Path,
pageserver_bin: pathlib.Path,
remote_storage_mock_path: pathlib.Path,
pg_port: int,
http_port: int,
broker: Optional[Etcd],
pg_distrib_dir: Path,
):
"""
cannot use NeonPageserver yet because it depends on neon cli
@@ -191,10 +193,10 @@ def switch_pg_to_new_pageserver(
new_pageserver_port: int,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Path:
) -> pathlib.Path:
pg.stop()
pg_config_file_path = Path(pg.config_file_path())
pg_config_file_path = pathlib.Path(pg.config_file_path())
pg_config_file_path.open("a").write(
f"\nneon.pageserver_connstring = 'postgresql://no_user:@localhost:{new_pageserver_port}'"
)
@@ -217,7 +219,7 @@ def switch_pg_to_new_pageserver(
return timeline_to_detach_local_path
def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: Path):
def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: pathlib.Path):
with pg_cur(pg) as cur:
# check that data is still there
cur.execute("SELECT sum(key) FROM t")
@@ -249,9 +251,7 @@ def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path
def test_tenant_relocation(
neon_env_builder: NeonEnvBuilder,
port_distributor: PortDistributor,
test_output_dir: Path,
neon_binpath: Path,
base_dir: Path,
test_output_dir,
method: str,
with_load: str,
):
@@ -350,7 +350,7 @@ def test_tenant_relocation(
new_pageserver_pg_port = port_distributor.get_port()
new_pageserver_http_port = port_distributor.get_port()
log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
pageserver_bin = neon_binpath / "pageserver"
pageserver_bin = pathlib.Path(neon_binpath) / "pageserver"
new_pageserver_http = PageserverHttpClient(
port=new_pageserver_http_port,
@@ -365,7 +365,6 @@ def test_tenant_relocation(
new_pageserver_pg_port,
new_pageserver_http_port,
neon_env_builder.broker,
neon_env_builder.pg_distrib_dir,
):
# Migrate either by attaching from s3 or import/export basebackup
@@ -374,7 +373,7 @@ def test_tenant_relocation(
"poetry",
"run",
"python",
str(base_dir / "scripts/export_import_between_pageservers.py"),
os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
"--tenant-id",
str(tenant_id),
"--from-host",
@@ -390,9 +389,9 @@ def test_tenant_relocation(
"--to-pg-port",
str(new_pageserver_pg_port),
"--pg-distrib-dir",
str(neon_env_builder.pg_distrib_dir),
pg_distrib_dir,
"--work-dir",
str(test_output_dir),
os.path.join(test_output_dir),
"--tmp-pg-port",
str(port_distributor.get_port()),
]

View File

@@ -1,280 +0,0 @@
import time
from typing import List, Tuple
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverApiException,
wait_for_last_flush_lsn,
)
from fixtures.types import Lsn
def test_empty_tenant_size(neon_simple_env: NeonEnv):
env = neon_simple_env
(tenant_id, _) = env.neon_cli.create_tenant()
http_client = env.pageserver.http_client()
size = http_client.tenant_size(tenant_id)
# we should never have zero, because there is always the initdb; however,
# it is questionable whether we should report anything in this case, as the
# gc_cutoff is negative
assert (
size == 0
), "initial implementation returns zero tenant_size before last_record_lsn is past gc_horizon"
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute("SELECT 1")
row = cur.fetchone()
assert row is not None
assert row[0] == 1
size = http_client.tenant_size(tenant_id)
assert size == 0, "starting idle compute should not change the tenant size"
# the size should be the same, until we increase the size over the
# gc_horizon
size = http_client.tenant_size(tenant_id)
assert size == 0, "tenant_size should not be affected by shutdown of compute"
def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder):
"""
Operate on single branch reading the tenants size after each transaction.
"""
# gc and compaction is not wanted automatically
# the pitr_interval here is quite problematic, so we cannot really use it.
# it'd have to be calibrated per test executing env.
# there was a bug which was hidden if the create table and first batch of
# inserts were larger than gc_horizon. For example, 0x20000 here hid the fact
# that the next_gc_cutoff could be smaller than initdb_lsn, which will
# obviously lead to issues when calculating the size.
gc_horizon = 0x30000
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='1h', gc_period='1h', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
branch_name, timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
http_client = env.pageserver.http_client()
collected_responses: List[Tuple[Lsn, int]] = []
with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
with pg.cursor() as cur:
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)")
batch_size = 100
i = 0
while True:
with pg.cursor() as cur:
cur.execute(
f"INSERT INTO t0(i) SELECT i FROM generate_series({batch_size} * %s, ({batch_size} * (%s + 1)) - 1) s(i)",
(i, i),
)
i += 1
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
size = http_client.tenant_size(tenant_id)
if len(collected_responses) > 0:
prev = collected_responses[-1][1]
if size == 0:
assert prev == 0
else:
assert size > prev
collected_responses.append((current_lsn, size))
if len(collected_responses) > 2:
break
while True:
with pg.cursor() as cur:
cur.execute(
f"UPDATE t0 SET i = -i WHERE i IN (SELECT i FROM t0 WHERE i > 0 LIMIT {batch_size})"
)
updated = cur.rowcount
if updated == 0:
break
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
size = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
assert size > prev, "tenant_size should grow with updates"
collected_responses.append((current_lsn, size))
while True:
with pg.cursor() as cur:
cur.execute(f"DELETE FROM t0 WHERE i IN (SELECT i FROM t0 LIMIT {batch_size})")
deleted = cur.rowcount
if deleted == 0:
break
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
size = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
assert (
size > prev
), "even though rows have been deleted, the tenant_size should increase"
collected_responses.append((current_lsn, size))
with pg.cursor() as cur:
cur.execute("DROP TABLE t0")
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
size = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
assert size > prev, "dropping table grows tenant_size"
collected_responses.append((current_lsn, size))
# this isn't too many lines to forget for a while. observed while
# developing these tests that locally the value is a bit more than what we
# get in the ci.
for lsn, size in collected_responses:
log.info(f"collected: {lsn}, {size}")
env.pageserver.stop()
env.pageserver.start()
size_after = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
assert size_after == prev, "size after restarting pageserver should not have changed"
def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder):
"""
Reported size goes up while branches or rows are being added, goes down after removing branches.
"""
gc_horizon = 128 * 1024
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='1h', gc_period='1h', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
main_branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
http_client = env.pageserver.http_client()
main_pg = env.postgres.create_start(main_branch_name, tenant_id=tenant_id)
batch_size = 10000
with main_pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {batch_size}) s(i)"
)
wait_for_last_flush_lsn(env, main_pg, tenant_id, main_timeline_id)
size_at_branch = http_client.tenant_size(tenant_id)
assert size_at_branch > 0
first_branch_timeline_id = env.neon_cli.create_branch(
"first-branch", main_branch_name, tenant_id
)
# unsure why this happens, the size difference is more than a page alignment
size_after_first_branch = http_client.tenant_size(tenant_id)
assert size_after_first_branch > size_at_branch
assert size_after_first_branch - size_at_branch == gc_horizon
first_branch_pg = env.postgres.create_start("first-branch", tenant_id=tenant_id)
with first_branch_pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, {batch_size}) s(i)"
)
wait_for_last_flush_lsn(env, first_branch_pg, tenant_id, first_branch_timeline_id)
size_after_growing_first_branch = http_client.tenant_size(tenant_id)
assert size_after_growing_first_branch > size_after_first_branch
with main_pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 2*{batch_size}) s(i)"
)
wait_for_last_flush_lsn(env, main_pg, tenant_id, main_timeline_id)
size_after_continuing_on_main = http_client.tenant_size(tenant_id)
assert size_after_continuing_on_main > size_after_growing_first_branch
second_branch_timeline_id = env.neon_cli.create_branch(
"second-branch", main_branch_name, tenant_id
)
size_after_second_branch = http_client.tenant_size(tenant_id)
assert size_after_second_branch > size_after_continuing_on_main
second_branch_pg = env.postgres.create_start("second-branch", tenant_id=tenant_id)
with second_branch_pg.cursor() as cur:
cur.execute(
f"CREATE TABLE t2 AS SELECT i::bigint n FROM generate_series(0, 3*{batch_size}) s(i)"
)
wait_for_last_flush_lsn(env, second_branch_pg, tenant_id, second_branch_timeline_id)
size_after_growing_second_branch = http_client.tenant_size(tenant_id)
assert size_after_growing_second_branch > size_after_second_branch
with second_branch_pg.cursor() as cur:
cur.execute("DROP TABLE t0")
cur.execute("DROP TABLE t1")
cur.execute("VACUUM FULL")
wait_for_last_flush_lsn(env, second_branch_pg, tenant_id, second_branch_timeline_id)
size_after_thinning_branch = http_client.tenant_size(tenant_id)
assert (
size_after_thinning_branch > size_after_growing_second_branch
), "tenant_size should grow with dropped tables and full vacuum"
first_branch_pg.stop_and_destroy()
second_branch_pg.stop_and_destroy()
main_pg.stop()
env.pageserver.stop()
env.pageserver.start()
# chance of compaction and gc on startup might have an effect on the
# tenant_size but so far this has been reliable, even though at least gc
# and tenant_size race for the same locks
size_after = http_client.tenant_size(tenant_id)
assert size_after == size_after_thinning_branch
# teardown, delete branches, and the size should be going down
deleted = False
for _ in range(10):
try:
http_client.timeline_delete(tenant_id, first_branch_timeline_id)
deleted = True
break
except PageserverApiException as e:
# compaction is ok but just retry if this fails; related to #2442
if "cannot lock compaction critical section" in str(e):
time.sleep(1)
continue
raise
assert deleted
size_after_deleting_first = http_client.tenant_size(tenant_id)
assert size_after_deleting_first < size_after_thinning_branch
http_client.timeline_delete(tenant_id, second_branch_timeline_id)
size_after_deleting_second = http_client.tenant_size(tenant_id)
assert size_after_deleting_second < size_after_deleting_first
assert size_after_deleting_second < size_after_continuing_on_main
assert size_after_deleting_second > size_after_first_branch

View File

@@ -338,7 +338,6 @@ def test_timeline_size_metrics(
neon_simple_env: NeonEnv,
test_output_dir: Path,
port_distributor: PortDistributor,
pg_distrib_dir: Path,
pg_version: str,
):
env = neon_simple_env
@@ -383,7 +382,7 @@ def test_timeline_size_metrics(
tl_logical_size_metric = int(matches.group(1))
pgdatadir = test_output_dir / "pgdata-vanilla"
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
pg_bin = PgBin(test_output_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
vanilla_pg.configure([f"port={port}"])

View File

@@ -30,6 +30,7 @@ from fixtures.neon_fixtures import (
SafekeeperHttpClient,
SafekeeperPort,
available_remote_storages,
neon_binpath,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -796,7 +797,6 @@ class SafekeeperEnv:
repo_dir: Path,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
num_safekeepers: int = 1,
):
self.repo_dir = repo_dir
@@ -808,7 +808,7 @@ class SafekeeperEnv:
)
self.pg_bin = pg_bin
self.num_safekeepers = num_safekeepers
self.bin_safekeeper = str(neon_binpath / "safekeeper")
self.bin_safekeeper = os.path.join(str(neon_binpath), "safekeeper")
self.safekeepers: Optional[List[subprocess.CompletedProcess[Any]]] = None
self.postgres: Optional[ProposerPostgres] = None
self.tenant_id: Optional[TenantId] = None
@@ -911,10 +911,7 @@ class SafekeeperEnv:
def test_safekeeper_without_pageserver(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
test_output_dir: str, port_distributor: PortDistributor, pg_bin: PgBin
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
@@ -923,7 +920,6 @@ def test_safekeeper_without_pageserver(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
)
with env:

View File

@@ -1,6 +1,14 @@
import os
from pathlib import Path
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
PortDistributor,
VanillaPostgres,
base_dir,
pg_distrib_dir,
)
from fixtures.types import TenantId
@@ -9,8 +17,6 @@ def test_wal_restore(
pg_bin: PgBin,
test_output_dir: Path,
port_distributor: PortDistributor,
base_dir: Path,
pg_distrib_dir: Path,
):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_wal_restore")
@@ -20,13 +26,11 @@ def test_wal_restore(
env.neon_cli.pageserver_stop()
port = port_distributor.get_port()
data_dir = test_output_dir / "pgsql.restored"
with VanillaPostgres(
data_dir, PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version), port
) as restored:
with VanillaPostgres(data_dir, PgBin(test_output_dir, env.pg_version), port) as restored:
pg_bin.run_capture(
[
str(base_dir / "libs/utils/scripts/restore_from_wal.sh"),
str(pg_distrib_dir / f"v{env.pg_version}/bin"),
os.path.join(base_dir, "libs/utils/scripts/restore_from_wal.sh"),
os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin"),
str(test_output_dir / "repo" / "safekeepers" / "sk1" / str(tenant_id) / "*"),
str(data_dir),
str(port),

View File

@@ -1,104 +0,0 @@
import time
import psutil
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException
from fixtures.types import TenantId
def assert_child_processes(pageserver_pid, wal_redo_present=False, defunct_present=False):
children = psutil.Process(pageserver_pid).children()
for child in children:
if not wal_redo_present:
assert "--wal-redo" not in child.cmdline()
if not defunct_present:
assert child.status() != psutil.STATUS_ZOMBIE
# Check that the pageserver doesn't leave behind WAL redo processes
# when a tenant is detached. We had an issue previously where we failed
# to wait and consume the exit code of the WAL redo process, leaving it behind
# as a zombie process.
def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
pagserver_pid = int((env.repo_dir / "pageserver.pid").read_text())
assert_child_processes(pagserver_pid, wal_redo_present=False, defunct_present=False)
# first check for non existing tenant
tenant_id = TenantId.generate()
with pytest.raises(
expected_exception=PageserverApiException,
match=f"Tenant not found for id {tenant_id}",
):
pageserver_http.tenant_detach(tenant_id)
# create a new tenant
tenant_id, _ = env.neon_cli.create_tenant()
# assert tenant exists on disk
assert (env.repo_dir / "tenants" / str(tenant_id)).exists()
pg = env.postgres.create_start("main", tenant_id=tenant_id)
pg_conn = pg.connect()
cur = pg_conn.cursor()
# Create table, and insert some rows. Make it big enough that it doesn't fit in
# shared_buffers, otherwise the SELECT after restart will just return answer
# from shared_buffers without hitting the page server, which defeats the point
# of this test.
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Verify that the table is larger than shared_buffers
cur.execute(
"""
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
from pg_settings where name = 'shared_buffers'
"""
)
row = cur.fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
# After filling the table and doing the SELECT, it is guaranteed that we did some WAL redo.
# So, assert that the WAL redo process is present.
# XXX this is quite brittle as the lifecycle of the WAL redo process is an implementation detail
assert_child_processes(pagserver_pid, wal_redo_present=True, defunct_present=False)
last_error = None
for i in range(3):
try:
pageserver_http.tenant_detach(tenant_id)
except Exception as e:
last_error = e
log.error(f"try {i} error detaching tenant: {e}")
continue
else:
break
# else is called if the loop finished without reaching "break"
else:
pytest.fail(f"could not detach tenant: {last_error}")
# check that nothing is left on disk for deleted tenant
assert not (env.repo_dir / "tenants" / str(tenant_id)).exists()
# Pageserver schedules kill+wait of the WAL redo process to the background runtime,
# asynchronously to tenant detach. Cut it some slack to complete kill+wait before
# checking.
time.sleep(1.0)
assert_child_processes(pagserver_pid, wal_redo_present=False, defunct_present=False)

View File

@@ -21,9 +21,6 @@ clap = { version = "4", features = ["color", "error-context", "help", "std", "st
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
@@ -37,7 +34,6 @@ prost = { version = "0.10", features = ["prost-derive", "std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
reqwest = { version = "0.11", default-features = false, features = ["__rustls", "__tls", "blocking", "hyper-rustls", "json", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "tokio-rustls", "webpki-roots"] }
scopeguard = { version = "1", features = ["use_std"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
stable_deref_trait = { version = "1", features = ["alloc", "std"] }