Mirror of https://github.com/neondatabase/neon.git, synced 2026-03-18 07:40:37 +00:00

Compare commits: 23 commits, `ktls` ... `test-relsi`
| Author | SHA1 | Date |
|---|---|---|
| | 61af437087 | |
| | e986896676 | |
| | f7ab3ffcb7 | |
| | 2f8d548a12 | |
| | 66db381dc9 | |
| | 6744ed19d8 | |
| | ae63ac7488 | |
| | 6eb638f4b3 | |
| | 7a485b599b | |
| | b1c457898b | |
| | 1a9d559be8 | |
| | 0e6c0d47a5 | |
| | d645645fab | |
| | 7c74112b2a | |
| | a968554a8c | |
| | 07b7c63975 | |
| | 04752dfa75 | |
| | 99c19cad24 | |
| | b83d722369 | |
| | d919770c55 | |
| | f4b3c317f3 | |
| | 428b105dde | |
| | 75175f3628 | |
```diff
@@ -23,10 +23,30 @@ platforms = [
 ]
 
 [final-excludes]
-# vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
-# it is built primarily in the separate repo neondatabase/autoscaling and thus is excluded
-# from depending on workspace-hack because most of the dependencies are not used.
-workspace-members = ["vm_monitor"]
+workspace-members = [
+    # vm_monitor benefits from the same Cargo.lock as the rest of our artifacts, but
+    # it is built primarily in the separate repo neondatabase/autoscaling and thus is excluded
+    # from depending on workspace-hack because most of the dependencies are not used.
+    "vm_monitor",
+    # All of these exist in libs and are not usually built independently.
+    # Putting workspace hack there adds a bottleneck for cargo builds.
+    "compute_api",
+    "consumption_metrics",
+    "desim",
+    "metrics",
+    "pageserver_api",
+    "postgres_backend",
+    "postgres_connection",
+    "postgres_ffi",
+    "pq_proto",
+    "remote_storage",
+    "safekeeper_api",
+    "tenant_size_model",
+    "tracing-utils",
+    "utils",
+    "wal_craft",
+    "walproposer",
+]
 
 # Write out exact versions rather than a semver range. (Defaults to false.)
 # exact-versions = true
```
```diff
@@ -169,10 +169,8 @@ runs:
       EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
     fi
 
-    if [[ "${{ inputs.build_type }}" == "debug" ]]; then
+    if [[ $BUILD_TYPE == "debug" && $RUNNER_ARCH == 'X64' ]]; then
       cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
-    elif [[ "${{ inputs.build_type }}" == "release" ]]; then
-      cov_prefix=()
     else
       cov_prefix=()
     fi
```
.github/workflows/_build-and-test-locally.yml (vendored): 15 changed lines
```diff
@@ -94,11 +94,16 @@ jobs:
       # We run tests with additional features that are turned off by default (e.g. in release builds), see
       # corresponding Cargo.toml files for their descriptions.
       - name: Set env variables
+        env:
+          ARCH: ${{ inputs.arch }}
         run: |
           CARGO_FEATURES="--features testing"
-          if [[ $BUILD_TYPE == "debug" ]]; then
+          if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
            cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
            CARGO_FLAGS="--locked"
+          elif [[ $BUILD_TYPE == "debug" ]]; then
+           cov_prefix=""
+           CARGO_FLAGS="--locked"
          elif [[ $BUILD_TYPE == "release" ]]; then
            cov_prefix=""
            CARGO_FLAGS="--locked --release"
@@ -158,6 +163,8 @@ jobs:
       # Do install *before* running rust tests because they might recompile the
       # binaries with different features/flags.
       - name: Install rust binaries
+        env:
+          ARCH: ${{ inputs.arch }}
         run: |
           # Install target binaries
           mkdir -p /tmp/neon/bin/
@@ -172,7 +179,7 @@ jobs:
           done
 
           # Install test executables and write list of all binaries (for code coverage)
-          if [[ $BUILD_TYPE == "debug" ]]; then
+          if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
             # Keep bloated coverage data files away from the rest of the artifact
             mkdir -p /tmp/coverage/
 
@@ -243,8 +250,8 @@ jobs:
       uses: ./.github/actions/save-coverage-data
 
   regress-tests:
-    # Run test on x64 only
-    if: inputs.arch == 'x64'
+    # Don't run regression tests on debug arm64 builds
+    if: inputs.build-type != 'debug' || inputs.arch != 'arm64'
     needs: [ build-neon ]
     runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', inputs.arch == 'arm64' && 'large-arm64' || 'large')) }}
     container:
```
.github/workflows/build_and_test.yml (vendored): 2 changed lines
```diff
@@ -198,7 +198,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        arch: [ x64 ]
+        arch: [ x64, arm64 ]
        # Do not build or run tests in debug for release branches
        build-type: ${{ fromJson((startsWith(github.ref_name, 'release') && github.event_name == 'push') && '["release"]' || '["debug", "release"]') }}
        include:
```
Cargo.lock (generated): 18 changed lines
```diff
@@ -1208,7 +1206,6 @@ dependencies = [
  "serde_json",
  "serde_with",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
@@ -1321,7 +1320,6 @@ dependencies = [
  "serde",
  "serde_with",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
@@ -1670,7 +1668,6 @@ dependencies = [
  "smallvec",
  "tracing",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
@@ -3147,7 +3144,6 @@ dependencies = [
  "rand 0.8.5",
  "rand_distr",
  "twox-hash",
- "workspace_hack",
 ]
 
 [[package]]
@@ -3791,7 +3787,6 @@ dependencies = [
  "strum_macros",
  "thiserror",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
@@ -4193,7 +4188,6 @@ dependencies = [
  "tokio-rustls 0.25.0",
  "tokio-util",
  "tracing",
- "workspace_hack",
 ]
 
 [[package]]
@@ -4206,7 +4200,6 @@ dependencies = [
  "postgres",
  "tokio-postgres",
  "url",
- "workspace_hack",
 ]
 
 [[package]]
@@ -4229,7 +4222,6 @@ dependencies = [
  "serde",
  "thiserror",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
@@ -4267,7 +4259,6 @@ dependencies = [
  "thiserror",
  "tokio",
  "tracing",
- "workspace_hack",
 ]
 
 [[package]]
@@ -4832,7 +4823,6 @@ dependencies = [
  "toml_edit 0.19.10",
  "tracing",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
@@ -5357,7 +5347,6 @@ dependencies = [
  "serde",
  "serde_with",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
@@ -6193,7 +6182,6 @@ dependencies = [
  "anyhow",
  "serde",
  "serde_json",
- "workspace_hack",
 ]
 
 [[package]]
@@ -6794,7 +6782,6 @@ dependencies = [
  "tracing",
  "tracing-opentelemetry",
  "tracing-subscriber",
- "workspace_hack",
 ]
 
 [[package]]
@@ -7012,7 +6999,6 @@ dependencies = [
  "url",
  "uuid",
  "walkdir",
- "workspace_hack",
 ]
 
 [[package]]
@@ -7091,7 +7077,6 @@ dependencies = [
  "postgres_ffi",
  "regex",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
@@ -7112,7 +7097,6 @@ dependencies = [
  "bindgen",
  "postgres_ffi",
  "utils",
- "workspace_hack",
 ]
 
 [[package]]
```
```diff
@@ -7669,8 +7653,6 @@ dependencies = [
 "tokio",
 "tokio-rustls 0.24.0",
 "tokio-util",
 "toml_datetime",
 "toml_edit 0.19.10",
 "tonic",
 "tower",
 "tracing",
```
```diff
@@ -441,6 +441,11 @@ WAL-log them periodically, from a background worker.
 
 Similarly to replication snapshot files, the CID mapping files generated during VACUUM FULL of a catalog table are WAL-logged.
 
+FIXME: But they're not, AFAICS?
+
+FIXME: However, we do WAL-log the file in pg_logical/mappings. But AFAICS that's WAL-logged
+by PostgreSQL too. Why do we need separate WAL-logging for that? See changes in rewriteheap.c
+
 ### How to get rid of the patch
 
 WAL-log them periodically, from a background worker.
```
```diff
@@ -14,5 +14,3 @@ regex.workspace = true
 
 utils = { path = "../utils" }
 remote_storage = { version = "0.1", path = "../remote_storage/" }
-
-workspace_hack.workspace = true
```
```diff
@@ -6,10 +6,8 @@ license = "Apache-2.0"
 
 [dependencies]
 anyhow.workspace = true
-chrono.workspace = true
+chrono = { workspace = true, features = ["serde"] }
 rand.workspace = true
 serde.workspace = true
 serde_with.workspace = true
 utils.workspace = true
-
-workspace_hack.workspace = true
```
```diff
@@ -14,5 +14,3 @@ parking_lot.workspace = true
 hex.workspace = true
 scopeguard.workspace = true
 smallvec = { workspace = true, features = ["write"] }
-
-workspace_hack.workspace = true
```
```diff
@@ -12,8 +12,6 @@ chrono.workspace = true
 twox-hash.workspace = true
 measured.workspace = true
 
-workspace_hack.workspace = true
-
 [target.'cfg(target_os = "linux")'.dependencies]
 procfs.workspace = true
 measured-process.workspace = true
```
```diff
@@ -21,11 +21,9 @@ hex.workspace = true
 humantime.workspace = true
 thiserror.workspace = true
 humantime-serde.workspace = true
-chrono.workspace = true
+chrono = { workspace = true, features = ["serde"] }
 itertools.workspace = true
 
-workspace_hack.workspace = true
-
 [dev-dependencies]
 bincode.workspace = true
 rand.workspace = true
```
```diff
@@ -348,7 +348,7 @@ impl AuxFilePolicy {
 
     /// If a tenant writes aux files without setting `switch_aux_policy`, this value will be used.
     pub fn default_tenant_config() -> Self {
-        Self::V1
+        Self::V2
     }
 }
```
```diff
@@ -18,7 +18,6 @@ tokio-rustls.workspace = true
 tracing.workspace = true
 
 pq_proto.workspace = true
-workspace_hack.workspace = true
 
 [dev-dependencies]
 once_cell.workspace = true
```
```diff
@@ -11,7 +11,5 @@ postgres.workspace = true
 tokio-postgres.workspace = true
 url.workspace = true
 
-workspace_hack.workspace = true
-
 [dev-dependencies]
 once_cell.workspace = true
```
```diff
@@ -19,8 +19,6 @@ thiserror.workspace = true
 serde.workspace = true
 utils.workspace = true
 
-workspace_hack.workspace = true
-
 [dev-dependencies]
 env_logger.workspace = true
 postgres.workspace = true
```
```diff
@@ -14,8 +14,6 @@ postgres.workspace = true
 postgres_ffi.workspace = true
 camino-tempfile.workspace = true
 
-workspace_hack.workspace = true
-
 [dev-dependencies]
 regex.workspace = true
 utils.workspace = true
```
```diff
@@ -11,9 +11,7 @@ itertools.workspace = true
 pin-project-lite.workspace = true
 postgres-protocol.workspace = true
 rand.workspace = true
-tokio.workspace = true
+tokio = { workspace = true, features = ["io-util"] }
 tracing.workspace = true
 thiserror.workspace = true
 serde.workspace = true
-
-workspace_hack.workspace = true
```
```diff
@@ -32,7 +32,7 @@ scopeguard.workspace = true
 metrics.workspace = true
 utils.workspace = true
-workspace_hack.workspace = true
+pin-project-lite.workspace = true
 
 azure_core.workspace = true
 azure_identity.workspace = true
 azure_storage.workspace = true
@@ -46,3 +46,4 @@ sync_wrapper = { workspace = true, features = ["futures"] }
 camino-tempfile.workspace = true
 test-context.workspace = true
 rand.workspace = true
+tokio = { workspace = true, features = ["test-util"] }
```
```diff
@@ -9,5 +9,3 @@ serde.workspace = true
 serde_with.workspace = true
 const_format.workspace = true
 utils.workspace = true
-
-workspace_hack.workspace = true
```
```diff
@@ -9,5 +9,3 @@ license.workspace = true
 anyhow.workspace = true
 serde.workspace = true
 serde_json.workspace = true
-
-workspace_hack.workspace = true
```
```diff
@@ -14,5 +14,3 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
 tracing.workspace = true
 tracing-opentelemetry.workspace = true
 tracing-subscriber.workspace = true
-
-workspace_hack.workspace = true
```
```diff
@@ -39,7 +39,7 @@ thiserror.workspace = true
 tokio.workspace = true
 tokio-tar.workspace = true
 tokio-util.workspace = true
-toml_edit.workspace = true
+toml_edit = { workspace = true, features = ["serde"] }
 tracing.workspace = true
 tracing-error.workspace = true
 tracing-subscriber = { workspace = true, features = ["json", "registry"] }
@@ -54,7 +54,6 @@ walkdir.workspace = true
 pq_proto.workspace = true
 postgres_connection.workspace = true
 metrics.workspace = true
-workspace_hack.workspace = true
 
 const_format.workspace = true
 
@@ -71,6 +70,7 @@ criterion.workspace = true
 hex-literal.workspace = true
 camino-tempfile.workspace = true
 serde_assert.workspace = true
+tokio = { workspace = true, features = ["test-util"] }
 
 [[bench]]
 name = "benchmarks"
```
```diff
@@ -9,8 +9,6 @@ anyhow.workspace = true
 utils.workspace = true
 postgres_ffi.workspace = true
 
-workspace_hack.workspace = true
-
 [build-dependencies]
 anyhow.workspace = true
 bindgen.workspace = true
```
```diff
@@ -95,6 +95,7 @@ fn main() -> anyhow::Result<()> {
         .allowlist_var("ERROR")
         .allowlist_var("FATAL")
         .allowlist_var("PANIC")
+        .allowlist_var("PG_VERSION_NUM")
         .allowlist_var("WPEVENT")
         .allowlist_var("WL_LATCH_SET")
         .allowlist_var("WL_SOCKET_READABLE")
```
```diff
@@ -282,7 +282,11 @@ mod tests {
     use std::cell::UnsafeCell;
     use utils::id::TenantTimelineId;
 
-    use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
+    use crate::{
+        api_bindings::Level,
+        bindings::{NeonWALReadResult, PG_VERSION_NUM},
+        walproposer::Wrapper,
+    };
 
     use super::ApiImpl;
```
```diff
@@ -489,41 +493,79 @@ mod tests {
 
         let (sender, receiver) = sync_channel(1);
 
+        // Message definitions are in walproposer.h.
+        // xxx: it would be better to extract them from the safekeeper crate and
+        // use serialization/deserialization here.
+        let greeting_tag = (b'g' as u64).to_ne_bytes();
+        let proto_version = 2_u32.to_ne_bytes();
+        let pg_version: [u8; 4] = PG_VERSION_NUM.to_ne_bytes();
+        let proposer_id = [0; 16];
+        let system_id = 0_u64.to_ne_bytes();
+        let tenant_id = ttid.tenant_id.as_arr();
+        let timeline_id = ttid.timeline_id.as_arr();
+        let pg_tli = 1_u32.to_ne_bytes();
+        let wal_seg_size = 16777216_u32.to_ne_bytes();
+        let proposer_greeting = [
+            greeting_tag.as_slice(),
+            proto_version.as_slice(),
+            pg_version.as_slice(),
+            proposer_id.as_slice(),
+            system_id.as_slice(),
+            tenant_id.as_slice(),
+            timeline_id.as_slice(),
+            pg_tli.as_slice(),
+            wal_seg_size.as_slice(),
+        ]
+        .concat();
+
+        let voting_tag = (b'v' as u64).to_ne_bytes();
+        let vote_request_term = 3_u64.to_ne_bytes();
+        let proposer_id = [0; 16];
+        let vote_request = [
+            voting_tag.as_slice(),
+            vote_request_term.as_slice(),
+            proposer_id.as_slice(),
+        ]
+        .concat();
+
+        let acceptor_greeting_term = 2_u64.to_ne_bytes();
+        let acceptor_greeting_node_id = 1_u64.to_ne_bytes();
+        let acceptor_greeting = [
+            greeting_tag.as_slice(),
+            acceptor_greeting_term.as_slice(),
+            acceptor_greeting_node_id.as_slice(),
+        ]
+        .concat();
+
+        let vote_response_term = 3_u64.to_ne_bytes();
+        let vote_given = 1_u64.to_ne_bytes();
+        let flush_lsn = 0x539_u64.to_ne_bytes();
+        let truncate_lsn = 0x539_u64.to_ne_bytes();
+        let th_len = 1_u32.to_ne_bytes();
+        let th_term = 2_u64.to_ne_bytes();
+        let th_lsn = 0x539_u64.to_ne_bytes();
+        let timeline_start_lsn = 0x539_u64.to_ne_bytes();
+        let vote_response = [
+            voting_tag.as_slice(),
+            vote_response_term.as_slice(),
+            vote_given.as_slice(),
+            flush_lsn.as_slice(),
+            truncate_lsn.as_slice(),
+            th_len.as_slice(),
+            th_term.as_slice(),
+            th_lsn.as_slice(),
+            timeline_start_lsn.as_slice(),
+        ]
+        .concat();
+
         let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
             wait_events: Cell::new(WaitEventsData {
                 sk: std::ptr::null_mut(),
                 event_mask: 0,
             }),
-            expected_messages: vec![
-                // TODO: When updating Postgres versions, this test will cause
-                // problems. Postgres version in message needs updating.
-                //
-                // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160003, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
-                vec![
-                    103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
-                    147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
-                    188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
-                ],
-                // VoteRequest(VoteRequest { term: 3 })
-                vec![
-                    118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
-                    0, 0, 0, 0, 0, 0,
-                ],
-            ],
+            expected_messages: vec![proposer_greeting, vote_request],
             expected_ptr: AtomicUsize::new(0),
-            safekeeper_replies: vec![
-                // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) })
-                vec![
-                    103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
-                ],
-                // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 })
-                vec![
-                    118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57,
-                    5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
-                    0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0,
-                ],
-            ],
+            safekeeper_replies: vec![acceptor_greeting, vote_response],
             replies_ptr: AtomicUsize::new(0),
             sync_channel: sender,
             shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
```
```diff
@@ -10,6 +10,7 @@ use pageserver::{
     page_cache,
     repository::Value,
     task_mgr::TaskKind,
+    tenant::storage_layer::inmemory_layer::SerializedBatch,
     tenant::storage_layer::InMemoryLayer,
     virtual_file,
 };
@@ -67,12 +68,16 @@ async fn ingest(
     let layer =
         InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
 
-    let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
+    let data = Value::Image(Bytes::from(vec![0u8; put_size]));
+    let data_ser_size = data.serialized_size().unwrap() as usize;
     let ctx = RequestContext::new(
         pageserver::task_mgr::TaskKind::WalReceiverConnectionHandler,
         pageserver::context::DownloadBehavior::Download,
     );
 
+    const BATCH_SIZE: usize = 16;
+    let mut batch = Vec::new();
+
     for i in 0..put_count {
         lsn += put_size as u64;
 
@@ -95,7 +100,17 @@ async fn ingest(
             }
         }
 
-        layer.put_value(key.to_compact(), lsn, &data, &ctx).await?;
+        batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
+        if batch.len() >= BATCH_SIZE {
+            let this_batch = std::mem::take(&mut batch);
+            let serialized = SerializedBatch::from_values(this_batch);
+            layer.put_batch(serialized, &ctx).await?;
+        }
     }
+    if !batch.is_empty() {
+        let this_batch = std::mem::take(&mut batch);
+        let serialized = SerializedBatch::from_values(this_batch);
+        layer.put_batch(serialized, &ctx).await?;
+    }
     layer.freeze(lsn + 1).await;
```
```diff
@@ -88,6 +88,8 @@ pub async fn shutdown_pageserver(
 ) {
     use std::time::Duration;
 
+    let started_at = std::time::Instant::now();
+
     // If the orderly shutdown below takes too long, we still want to make
     // sure that all walredo processes are killed and wait()ed on by us, not systemd.
     //
@@ -241,7 +243,10 @@ pub async fn shutdown_pageserver(
     walredo_extraordinary_shutdown_thread.join().unwrap();
     info!("walredo_extraordinary_shutdown_thread done");
 
-    info!("Shut down successfully completed");
+    info!(
+        elapsed_ms = started_at.elapsed().as_millis(),
+        "Shut down successfully completed"
+    );
     std::process::exit(exit_code);
 }
```
```diff
@@ -15,12 +15,11 @@ use crate::{aux_file, repository::*};
 use anyhow::{ensure, Context};
 use bytes::{Buf, Bytes, BytesMut};
 use enum_map::Enum;
-use itertools::Itertools;
 use pageserver_api::key::{
     dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
     relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
     slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
-    AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
+    CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
 };
 use pageserver_api::keyspace::SparseKeySpace;
 use pageserver_api::models::AuxFilePolicy;
@@ -37,7 +36,6 @@ use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, trace, warn};
 use utils::bin_ser::DeserializeError;
 use utils::pausable_failpoint;
-use utils::vec_map::{VecMap, VecMapOrdering};
 use utils::{bin_ser::BeSer, lsn::Lsn};
 
 /// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
@@ -174,6 +172,7 @@ impl Timeline {
             pending_deletions: Vec::new(),
             pending_nblocks: 0,
             pending_directory_entries: Vec::new(),
+            pending_bytes: 0,
             lsn,
         }
     }
@@ -727,7 +726,17 @@ impl Timeline {
     ) -> Result<HashMap<String, Bytes>, PageReconstructError> {
         let current_policy = self.last_aux_file_policy.load();
         match current_policy {
-            Some(AuxFilePolicy::V1) | None => self.list_aux_files_v1(lsn, ctx).await,
+            Some(AuxFilePolicy::V1) => {
+                warn!("this timeline is using deprecated aux file policy V1 (policy=V1)");
+                self.list_aux_files_v1(lsn, ctx).await
+            }
+            None => {
+                let res = self.list_aux_files_v1(lsn, ctx).await?;
+                if !res.is_empty() {
+                    warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
+                }
+                Ok(res)
+            }
             Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
             Some(AuxFilePolicy::CrossValidation) => {
                 let v1_result = self.list_aux_files_v1(lsn, ctx).await;
```
```diff
@@ -1022,21 +1031,33 @@ pub struct DatadirModification<'a> {
     // The put-functions add the modifications here, and they are flushed to the
     // underlying key-value store by the 'finish' function.
     pending_lsns: Vec<Lsn>,
-    pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
+    pending_updates: HashMap<Key, Vec<(Lsn, usize, Value)>>,
     pending_deletions: Vec<(Range<Key>, Lsn)>,
     pending_nblocks: i64,
 
     /// For special "directory" keys that store key-value maps, track the size of the map
     /// if it was updated in this modification.
     pending_directory_entries: Vec<(DirectoryKind, usize)>,
+
+    /// An **approximation** of how large our EphemeralFile write will be when committed.
+    pending_bytes: usize,
 }
 
 impl<'a> DatadirModification<'a> {
+    // When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
+    // contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
+    // additionally specify a limit on how much payload a DatadirModification may contain before it should be committed.
+    pub(crate) const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
+
     /// Get the current lsn
     pub(crate) fn get_lsn(&self) -> Lsn {
         self.lsn
     }
 
+    pub(crate) fn approx_pending_bytes(&self) -> usize {
+        self.pending_bytes
+    }
+
     /// Set the current lsn
     pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
         ensure!(
```
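The constant above is the byte-size half of a dual commit trigger: ingest commits a `DatadirModification` either when enough records have accumulated or when `approx_pending_bytes()` crosses `MAX_PENDING_BYTES` (the walreceiver hunk further down wires this up). A minimal standalone sketch of that policy, with `IngestBatcher` and its record threshold as illustrative names rather than repository code:

```rust
/// Illustrative sketch of the dual-threshold commit policy described above.
/// The byte limit bounds the monolithic serialization done at commit time;
/// the record limit bounds how long records sit uncommitted.
struct IngestBatcher {
    uncommitted_records: usize,
    pending_bytes: usize,
}

impl IngestBatcher {
    // Mirrors DatadirModification::MAX_PENDING_BYTES in the diff above.
    const MAX_PENDING_BYTES: usize = 8 * 1024 * 1024;
    // Hypothetical record-count threshold (the real one is configurable).
    const INGEST_BATCH_SIZE: usize = 100;

    /// Account for one ingested value of `val_ser_size` serialized bytes;
    /// returns true when the caller should commit the batch now.
    fn record(&mut self, val_ser_size: usize) -> bool {
        self.uncommitted_records += 1;
        self.pending_bytes += val_ser_size;
        self.uncommitted_records >= Self::INGEST_BATCH_SIZE
            || self.pending_bytes > Self::MAX_PENDING_BYTES
    }

    /// Reset both counters after a commit.
    fn committed(&mut self) {
        self.uncommitted_records = 0;
        self.pending_bytes = 0;
    }
}
```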
```diff
@@ -1576,6 +1597,7 @@ impl<'a> DatadirModification<'a> {
         if aux_files_key_v1.is_empty() {
             None
         } else {
+            warn!("this timeline is using deprecated aux file policy V1");
             self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
             Some(AuxFilePolicy::V1)
         }
```
```diff
@@ -1769,21 +1791,25 @@ impl<'a> DatadirModification<'a> {
         // Flush relation and SLRU data blocks, keep metadata.
         let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
         for (key, values) in self.pending_updates.drain() {
-            for (lsn, value) in values {
+            let mut write_batch = Vec::new();
+            for (lsn, value_ser_size, value) in values {
                 if key.is_rel_block_key() || key.is_slru_block_key() {
                     // This bails out on first error without modifying pending_updates.
                     // That's Ok, cf this function's doc comment.
-                    writer.put(key, lsn, &value, ctx).await?;
+                    write_batch.push((key.to_compact(), lsn, value_ser_size, value));
                 } else {
-                    retained_pending_updates
-                        .entry(key)
-                        .or_default()
-                        .push((lsn, value));
+                    retained_pending_updates.entry(key).or_default().push((
+                        lsn,
+                        value_ser_size,
+                        value,
+                    ));
                 }
             }
+            writer.put_batch(write_batch, ctx).await?;
         }
 
         self.pending_updates = retained_pending_updates;
+        self.pending_bytes = 0;
 
         if pending_nblocks != 0 {
             writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
```
```diff
@@ -1809,17 +1835,20 @@ impl<'a> DatadirModification<'a> {
         self.pending_nblocks = 0;
 
         if !self.pending_updates.is_empty() {
-            // The put_batch call below expects the inputs to be sorted by Lsn,
-            // so we do that first.
-            let lsn_ordered_batch: VecMap<Lsn, (Key, Value)> = VecMap::from_iter(
-                self.pending_updates
-                    .drain()
-                    .map(|(key, vals)| vals.into_iter().map(move |(lsn, val)| (lsn, (key, val))))
-                    .kmerge_by(|lhs, rhs| lhs.0 < rhs.0),
-                VecMapOrdering::GreaterOrEqual,
-            );
+            // Ordering: the items in this batch do not need to be in any global order, but values for
+            // a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
+            // this to do efficient updates to its index.
+            let batch: Vec<(CompactKey, Lsn, usize, Value)> = self
+                .pending_updates
+                .drain()
+                .flat_map(|(key, values)| {
+                    values.into_iter().map(move |(lsn, val_ser_size, value)| {
+                        (key.to_compact(), lsn, val_ser_size, value)
+                    })
+                })
+                .collect::<Vec<_>>();
 
-            writer.put_batch(lsn_ordered_batch, ctx).await?;
+            writer.put_batch(batch, ctx).await?;
         }
 
         if !self.pending_deletions.is_empty() {
```
```diff
@@ -1844,6 +1873,8 @@ impl<'a> DatadirModification<'a> {
             writer.update_directory_entries_count(kind, count as u64);
         }
 
+        self.pending_bytes = 0;
+
         Ok(())
     }
```
```diff
@@ -1860,7 +1891,7 @@ impl<'a> DatadirModification<'a> {
         // Note: we don't check pending_deletions. It is an error to request a
         // value that has been removed, deletion only avoids leaking storage.
         if let Some(values) = self.pending_updates.get(&key) {
-            if let Some((_, value)) = values.last() {
+            if let Some((_, _, value)) = values.last() {
                 return if let Value::Image(img) = value {
                     Ok(img.clone())
                 } else {
```
```diff
@@ -1888,13 +1919,17 @@ impl<'a> DatadirModification<'a> {
     fn put(&mut self, key: Key, val: Value) {
         let values = self.pending_updates.entry(key).or_default();
         // Replace the previous value if it exists at the same lsn
-        if let Some((last_lsn, last_value)) = values.last_mut() {
+        if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
             if *last_lsn == self.lsn {
+                *last_value_ser_size = val.serialized_size().unwrap() as usize;
                 *last_value = val;
                 return;
             }
         }
-        values.push((self.lsn, val));
+
+        let val_serialized_size = val.serialized_size().unwrap() as usize;
+        self.pending_bytes += val_serialized_size;
+        values.push((self.lsn, val_serialized_size, val));
     }
 
     fn delete(&mut self, key_range: Range<Key>) {
```
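One subtlety in `put` above: on the replace-at-same-LSN path, `pending_bytes` is not adjusted for the overwritten value, which is acceptable because the field is documented as an approximation of the final write size. A self-contained sketch of the same append-or-replace pattern, using illustrative stand-ins for `Lsn` and `Value`:

```rust
/// Append a (lsn, serialized_size, value) tuple, or replace the last entry
/// in place when it carries the same LSN, mirroring the logic above.
/// u64 stands in for Lsn and String for Value; both are illustrative.
fn put(values: &mut Vec<(u64, usize, String)>, pending_bytes: &mut usize, lsn: u64, val: String) {
    let val_ser_size = val.len(); // stand-in for Value::serialized_size()

    // Replace the previous value if it exists at the same lsn.
    if let Some((last_lsn, last_size, last_val)) = values.last_mut() {
        if *last_lsn == lsn {
            // Note: pending_bytes still counts the replaced value's size;
            // the counter is only an approximation.
            *last_size = val_ser_size;
            *last_val = val;
            return;
        }
    }

    *pending_bytes += val_ser_size;
    values.push((lsn, val_ser_size, val));
}

fn main() {
    let (mut values, mut pending) = (Vec::new(), 0usize);
    put(&mut values, &mut pending, 8, "first".into());
    put(&mut values, &mut pending, 8, "replacement".into()); // same LSN: replaced in place
    assert_eq!(values.len(), 1);
    assert_eq!(pending, 5); // still the first value's size
}
```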
```diff
@@ -2024,7 +2059,7 @@ mod tests {
 
         let (tenant, ctx) = harness.load().await;
         let tline = tenant
-            .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
+            .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
             .await?;
         let tline = tline.raw_timeline().unwrap();
```
```diff
@@ -5932,10 +5932,10 @@ mod tests {
             .await
             .unwrap();
 
-        // the default aux file policy to switch is v1 if not set by the admins
+        // the default aux file policy to switch is v2 if not set by the admins
         assert_eq!(
             harness.tenant_conf.switch_aux_file_policy,
-            AuxFilePolicy::V1
+            AuxFilePolicy::default_tenant_config()
         );
         let (tenant, ctx) = harness.load().await;
 
@@ -5979,8 +5979,8 @@ mod tests {
         );
         assert_eq!(
             tline.last_aux_file_policy.load(),
-            Some(AuxFilePolicy::V1),
-            "aux file is written with switch_aux_file_policy unset (which is v1), so we should keep v1"
+            Some(AuxFilePolicy::V2),
+            "aux file is written with switch_aux_file_policy unset (which is v2), so we should use v2 there"
         );
 
         // we can read everything from the storage
@@ -6002,8 +6002,8 @@ mod tests {
 
         assert_eq!(
             tline.last_aux_file_policy.load(),
-            Some(AuxFilePolicy::V1),
-            "keep v1 storage format when new files are written"
+            Some(AuxFilePolicy::V2),
+            "keep v2 storage format when new files are written"
         );
 
         let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
@@ -6019,7 +6019,7 @@ mod tests {
 
         // child copies the last flag even if that is not on remote storage yet
        assert_eq!(child.get_switch_aux_file_policy(), AuxFilePolicy::V2);
-       assert_eq!(child.last_aux_file_policy.load(), Some(AuxFilePolicy::V1));
+       assert_eq!(child.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
 
        let files = child.list_aux_files(lsn, &ctx).await.unwrap();
        assert_eq!(files.get("pg_logical/mappings/test1"), None);
```
```diff
@@ -79,6 +79,8 @@ impl EphemeralFile {
         self.rw.read_blk(blknum, ctx).await
     }
 
+    #[cfg(test)]
+    // This is a test helper: outside of tests, we are always written to via a pre-serialized batch.
     pub(crate) async fn write_blob(
         &mut self,
         srcbuf: &[u8],
@@ -86,17 +88,30 @@ impl EphemeralFile {
     ) -> Result<u64, io::Error> {
         let pos = self.rw.bytes_written();
 
-        // Write the length field
-        if srcbuf.len() < 0x80 {
-            // short one-byte length header
-            let len_buf = [srcbuf.len() as u8];
-
-            self.rw.write_all_borrowed(&len_buf, ctx).await?;
-        } else {
-            let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
-            len_buf[0] |= 0x80;
-            self.rw.write_all_borrowed(&len_buf, ctx).await?;
-        }
+        let mut len_bytes = std::io::Cursor::new(Vec::new());
+        crate::tenant::storage_layer::inmemory_layer::SerializedBatch::write_blob_length(
+            srcbuf.len(),
+            &mut len_bytes,
+        );
+        let len_bytes = len_bytes.into_inner();
+
+        // Write the length field
+        self.rw.write_all_borrowed(&len_bytes, ctx).await?;
 
         // Write the payload
         self.rw.write_all_borrowed(srcbuf, ctx).await?;
 
         Ok(pos)
     }
 
+    /// Returns the offset at which the first byte of the input was written, for use
+    /// in constructing indices over the written value.
+    pub(crate) async fn write_raw(
+        &mut self,
+        srcbuf: &[u8],
+        ctx: &RequestContext,
+    ) -> Result<u64, io::Error> {
+        let pos = self.rw.bytes_written();
+
+        // Write the payload
+        self.rw.write_all_borrowed(srcbuf, ctx).await?;
```
```diff
@@ -2,7 +2,7 @@
 
 pub mod delta_layer;
 pub mod image_layer;
-pub(crate) mod inmemory_layer;
+pub mod inmemory_layer;
 pub(crate) mod layer;
 mod layer_desc;
 mod layer_name;
```
```diff
@@ -33,7 +33,7 @@ use std::fmt::Write;
 use std::ops::Range;
 use std::sync::atomic::Ordering as AtomicOrdering;
 use std::sync::atomic::{AtomicU64, AtomicUsize};
-use tokio::sync::{RwLock, RwLockWriteGuard};
+use tokio::sync::RwLock;
 
 use super::{
     DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
@@ -320,6 +320,82 @@ impl InMemoryLayer {
     }
 }
 
+/// Offset of a particular Value within a serialized batch.
+struct SerializedBatchOffset {
+    key: CompactKey,
+    lsn: Lsn,
+    /// offset in bytes from the start of the batch's buffer to the Value's serialized size header.
+    offset: u64,
+}
+
+pub struct SerializedBatch {
+    /// Blobs serialized in EphemeralFile's native format, ready for passing to [`EphemeralFile::write_raw`].
+    pub(crate) raw: Vec<u8>,
+
+    /// Index of values in [`Self::raw`], using offsets relative to the start of the buffer.
+    offsets: Vec<SerializedBatchOffset>,
+
+    /// The highest LSN of any value in the batch
+    pub(crate) max_lsn: Lsn,
+}
+
+impl SerializedBatch {
+    /// Write a blob length in the internal format of the EphemeralFile
+    pub(crate) fn write_blob_length(len: usize, cursor: &mut std::io::Cursor<Vec<u8>>) {
+        use std::io::Write;
+
+        if len < 0x80 {
+            // short one-byte length header
+            let len_buf = [len as u8];
+
+            cursor
+                .write_all(&len_buf)
+                .expect("Writing to Vec is infallible");
+        } else {
+            let mut len_buf = u32::to_be_bytes(len as u32);
+            len_buf[0] |= 0x80;
+            cursor
+                .write_all(&len_buf)
+                .expect("Writing to Vec is infallible");
+        }
+    }
+
+    pub fn from_values(batch: Vec<(CompactKey, Lsn, usize, Value)>) -> Self {
+        // Pre-allocate a big flat buffer to write into. This should be large but not huge: it is soft-limited in practice by
+        // [`crate::pgdatadir_mapping::DatadirModification::MAX_PENDING_BYTES`]
+        let buffer_size = batch.iter().map(|i| i.2).sum::<usize>() + 4 * batch.len();
+        let mut cursor = std::io::Cursor::new(Vec::<u8>::with_capacity(buffer_size));
+
+        let mut offsets: Vec<SerializedBatchOffset> = Vec::with_capacity(batch.len());
+        let mut max_lsn: Lsn = Lsn(0);
+        for (key, lsn, val_ser_size, val) in batch {
+            let relative_off = cursor.position();
+
+            Self::write_blob_length(val_ser_size, &mut cursor);
+            val.ser_into(&mut cursor)
+                .expect("Writing into in-memory buffer is infallible");
+
+            offsets.push(SerializedBatchOffset {
+                key,
+                lsn,
+                offset: relative_off,
+            });
+            max_lsn = std::cmp::max(max_lsn, lsn);
+        }
+
+        let buffer = cursor.into_inner();
+
+        // Assert that we didn't do any extra allocations while building buffer.
+        debug_assert!(buffer.len() <= buffer_size);
+
+        Self {
+            raw: buffer,
+            offsets,
+            max_lsn,
+        }
+    }
+}
+
 fn inmem_layer_display(mut f: impl Write, start_lsn: Lsn, end_lsn: Lsn) -> std::fmt::Result {
     write!(f, "inmem-{:016X}-{:016X}", start_lsn.0, end_lsn.0)
 }
@@ -380,37 +456,20 @@ impl InMemoryLayer {
         })
     }
 
-    // Write operations
-
-    /// Common subroutine of the public put_wal_record() and put_page_image() functions.
-    /// Adds the page version to the in-memory tree
-    pub async fn put_value(
+    // Write path.
+    pub async fn put_batch(
         &self,
-        key: CompactKey,
-        lsn: Lsn,
-        buf: &[u8],
+        serialized_batch: SerializedBatch,
         ctx: &RequestContext,
     ) -> Result<()> {
         let mut inner = self.inner.write().await;
         self.assert_writable();
-        self.put_value_locked(&mut inner, key, lsn, buf, ctx).await
-    }
-
-    async fn put_value_locked(
-        &self,
-        locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
-        key: CompactKey,
-        lsn: Lsn,
-        buf: &[u8],
-        ctx: &RequestContext,
-    ) -> Result<()> {
-        trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
-
-        let off = {
-            locked_inner
+
+        let base_off = {
+            inner
                 .file
-                .write_blob(
-                    buf,
+                .write_raw(
+                    &serialized_batch.raw,
                     &RequestContextBuilder::extend(ctx)
                         .page_content_kind(PageContentKind::InMemoryLayer)
                         .build(),
@@ -418,15 +477,23 @@ impl InMemoryLayer {
                 .await?
         };
 
-        let vec_map = locked_inner.index.entry(key).or_default();
-        let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
-        if old.is_some() {
-            // We already had an entry for this LSN. That's odd..
-            warn!("Key {} at {} already exists", key, lsn);
+        for SerializedBatchOffset {
+            key,
+            lsn,
+            offset: relative_off,
+        } in serialized_batch.offsets
+        {
+            let off = base_off + relative_off;
+            let vec_map = inner.index.entry(key).or_default();
+            let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
+            if old.is_some() {
+                // We already had an entry for this LSN. That's odd..
+                warn!("Key {} at {} already exists", key, lsn);
+            }
         }
 
-        let size = locked_inner.file.len();
-        locked_inner.resource_units.maybe_publish_size(size);
+        let size = inner.file.len();
+        inner.resource_units.maybe_publish_size(size);
 
         Ok(())
     }
```
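`write_blob_length` defines a two-case length header: payloads shorter than 0x80 bytes get a single length byte, and anything larger gets a 4-byte big-endian length with the high bit of the first byte set as a marker. This is also why `from_values` can pre-allocate with `+ 4 * batch.len()`, the worst-case header size per value. A standalone round-trip sketch of that format (the decoder is an illustrative reconstruction, not code from the diff):

```rust
use std::io::{Cursor, Read, Write};

/// Encode a blob length: one byte if len < 0x80, otherwise four bytes
/// big-endian with the top bit of the first byte set.
fn write_blob_length(len: usize, cursor: &mut Cursor<Vec<u8>>) {
    if len < 0x80 {
        cursor.write_all(&[len as u8]).expect("Vec write is infallible");
    } else {
        let mut buf = u32::to_be_bytes(len as u32);
        buf[0] |= 0x80;
        cursor.write_all(&buf).expect("Vec write is infallible");
    }
}

/// Decode the same header (illustrative inverse of the encoder above).
fn read_blob_length(input: &mut impl Read) -> std::io::Result<usize> {
    let mut first = [0u8; 1];
    input.read_exact(&mut first)?;
    if first[0] & 0x80 == 0 {
        return Ok(first[0] as usize); // short one-byte header
    }
    let mut rest = [0u8; 3];
    input.read_exact(&mut rest)?;
    // Mask off the marker bit and reassemble the big-endian u32.
    Ok(u32::from_be_bytes([first[0] & 0x7f, rest[0], rest[1], rest[2]]) as usize)
}

fn main() {
    for len in [0usize, 0x7f, 0x80, 16_777_216] {
        let mut cursor = Cursor::new(Vec::new());
        write_blob_length(len, &mut cursor);
        let encoded = cursor.into_inner();
        // Lengths below 0x80 take 1 byte; everything else takes 4.
        assert_eq!(encoded.len(), if len < 0x80 { 1 } else { 4 });
        assert_eq!(read_blob_length(&mut encoded.as_slice()).unwrap(), len);
    }
}
```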
```diff
@@ -22,8 +22,8 @@ use handle::ShardTimelineId;
 use once_cell::sync::Lazy;
 use pageserver_api::{
     key::{
-        KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
-        NON_INHERITED_SPARSE_RANGE,
+        CompactKey, KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX,
+        NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE,
     },
     keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
     models::{
@@ -44,10 +44,8 @@ use tokio::{
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::{
-    bin_ser::BeSer,
     fs_ext, pausable_failpoint,
     sync::gate::{Gate, GateGuard},
-    vec_map::VecMap,
 };
 
 use std::pin::pin;
@@ -137,7 +135,10 @@ use self::layer_manager::LayerManager;
 use self::logical_size::LogicalSize;
 use self::walreceiver::{WalReceiver, WalReceiverConf};
 
-use super::{config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized};
+use super::{
+    config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
+    upload_queue::NotInitialized,
+};
 use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
 use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
 use super::{
```
```diff
@@ -2233,6 +2234,11 @@ impl Timeline {
 
             handles: Default::default(),
         };
+
+        if aux_file_policy == Some(AuxFilePolicy::V1) {
+            warn!("this timeline is using deprecated aux file policy V1");
+        }
+
         result.repartition_threshold =
             result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
```
```diff
@@ -3589,34 +3595,6 @@ impl Timeline {
             return Err(FlushLayerError::Cancelled);
         }
 
-        // FIXME(auxfilesv2): support multiple metadata key partitions might need initdb support as well?
-        // This code path will not be hit during regression tests. After #7099 we have a single partition
-        // with two key ranges. If someone wants to fix initdb optimization in the future, this might need
-        // to be fixed.
-
-        // For metadata, always create delta layers.
-        let delta_layer = if !metadata_partition.parts.is_empty() {
-            assert_eq!(
-                metadata_partition.parts.len(),
-                1,
-                "currently sparse keyspace should only contain a single metadata keyspace"
-            );
-            let metadata_keyspace = &metadata_partition.parts[0];
-            self.create_delta_layer(
-                &frozen_layer,
-                Some(
-                    metadata_keyspace.0.ranges.first().unwrap().start
-                        ..metadata_keyspace.0.ranges.last().unwrap().end,
-                ),
-                ctx,
-            )
-            .await
-            .map_err(|e| FlushLayerError::from_anyhow(self, e))?
-        } else {
-            None
-        };
-
         // For image layers, we add them immediately into the layer map.
         let mut layers_to_upload = Vec::new();
         layers_to_upload.extend(
             self.create_image_layers(
@@ -3627,13 +3605,27 @@ impl Timeline {
             )
             .await?,
         );
 
-        if let Some(delta_layer) = delta_layer {
-            layers_to_upload.push(delta_layer.clone());
-            (layers_to_upload, Some(delta_layer))
-        } else {
-            (layers_to_upload, None)
+        if !metadata_partition.parts.is_empty() {
+            assert_eq!(
+                metadata_partition.parts.len(),
+                1,
+                "currently sparse keyspace should only contain a single metadata keyspace"
+            );
+            layers_to_upload.extend(
+                self.create_image_layers(
+                    // Safety: create_image_layers treats sparse keyspaces differently in that it does not scan
+                    // every single key within the keyspace, and therefore it's safe to force converting it
+                    // into a dense keyspace before calling this function.
+                    &metadata_partition.into_dense(),
+                    self.initdb_lsn,
+                    ImageLayerCreationMode::Initial,
+                    ctx,
+                )
+                .await?,
+            );
         }
+
+        (layers_to_upload, None)
     } else {
         // Normal case, write out a L0 delta layer file.
         // `create_delta_layer` will not modify the layer map.
```
```diff
@@ -4043,8 +4035,6 @@ impl Timeline {
         mode: ImageLayerCreationMode,
         start: Key,
     ) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
-        assert!(!matches!(mode, ImageLayerCreationMode::Initial));
-
         // Metadata keys image layer creation.
         let mut reconstruct_state = ValuesReconstructState::default();
         let data = self
```
```diff
@@ -4210,15 +4200,13 @@ impl Timeline {
                     "metadata keys must be partitioned separately"
                 );
             }
-            if mode == ImageLayerCreationMode::Initial {
-                return Err(CreateImageLayersError::Other(anyhow::anyhow!("no image layer should be created for metadata keys when flushing frozen layers")));
-            }
             if mode == ImageLayerCreationMode::Try && !check_for_image_layers {
                 // Skip compaction if there are not enough updates. Metadata compaction will do a scan and
                 // might mess up with evictions.
                 start = img_range.end;
                 continue;
             }
+            // For initial and force modes, we always generate image layers for metadata keys.
         } else if let ImageLayerCreationMode::Try = mode {
             // check_for_image_layers = false -> skip
             // check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate
@@ -4226,7 +4214,8 @@ impl Timeline {
             start = img_range.end;
             continue;
         }
-        } else if let ImageLayerCreationMode::Force = mode {
+        }
+        if let ImageLayerCreationMode::Force = mode {
             // When forced to create image layers, we might try and create them where they already
             // exist. This mode is only used in tests/debug.
             let layers = self.layers.read().await;
@@ -4240,6 +4229,7 @@ impl Timeline {
                 img_range.start,
                 img_range.end
             );
+            start = img_range.end;
             continue;
         }
     }
```
```diff
@@ -5590,44 +5580,6 @@ enum OpenLayerAction {
 }
 
 impl<'a> TimelineWriter<'a> {
-    /// Put a new page version that can be constructed from a WAL record
-    ///
-    /// This will implicitly extend the relation, if the page is beyond the
-    /// current end-of-file.
-    pub(crate) async fn put(
-        &mut self,
-        key: Key,
-        lsn: Lsn,
-        value: &Value,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
-        // Avoid doing allocations for "small" values.
-        // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
-        // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
-        let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
-        value.ser_into(&mut buf)?;
-        let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
-
-        let action = self.get_open_layer_action(lsn, buf_size);
-        let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
-        let res = layer.put_value(key.to_compact(), lsn, &buf, ctx).await;
-
-        if res.is_ok() {
-            // Update the current size only when the entire write was ok.
-            // In case of failures, we may have had partial writes which
-            // render the size tracking out of sync. That's ok because
-            // the checkpoint distance should be significantly smaller
-            // than the S3 single shot upload limit of 5GiB.
-            let state = self.write_guard.as_mut().unwrap();
-
-            state.current_size += buf_size;
-            state.prev_lsn = Some(lsn);
-            state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
-        }
-
-        res
-    }
-
     async fn handle_open_layer_action(
         &mut self,
         at: Lsn,
@@ -5733,18 +5685,58 @@ impl<'a> TimelineWriter<'a> {
     }
 
     /// Put a batch of keys at the specified Lsns.
-    ///
-    /// The batch is sorted by Lsn (enforced by usage of [`utils::vec_map::VecMap`].
     pub(crate) async fn put_batch(
         &mut self,
-        batch: VecMap<Lsn, (Key, Value)>,
+        batch: Vec<(CompactKey, Lsn, usize, Value)>,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
-        for (lsn, (key, val)) in batch {
-            self.put(key, lsn, &val, ctx).await?
+        if batch.is_empty() {
+            return Ok(());
         }
 
-        Ok(())
+        let serialized_batch = inmemory_layer::SerializedBatch::from_values(batch);
+        let batch_max_lsn = serialized_batch.max_lsn;
+        let buf_size: u64 = serialized_batch.raw.len() as u64;
+
+        let action = self.get_open_layer_action(batch_max_lsn, buf_size);
+        let layer = self
+            .handle_open_layer_action(batch_max_lsn, action, ctx)
+            .await?;
+
+        let res = layer.put_batch(serialized_batch, ctx).await;
+
+        if res.is_ok() {
+            // Update the current size only when the entire write was ok.
+            // In case of failures, we may have had partial writes which
+            // render the size tracking out of sync. That's ok because
+            // the checkpoint distance should be significantly smaller
+            // than the S3 single shot upload limit of 5GiB.
+            let state = self.write_guard.as_mut().unwrap();
+
+            state.current_size += buf_size;
+            state.prev_lsn = Some(batch_max_lsn);
+            state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
+        }
+
+        res
+    }
+
+    #[cfg(test)]
+    /// Test helper, for tests that would like to poke individual values without composing a batch
+    pub(crate) async fn put(
+        &mut self,
+        key: Key,
+        lsn: Lsn,
+        value: &Value,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<()> {
+        use utils::bin_ser::BeSer;
+        let val_ser_size = value.serialized_size().unwrap() as usize;
+        self.put_batch(
+            vec![(key.to_compact(), lsn, val_ser_size, value.clone())],
+            ctx,
+        )
+        .await
+    }
 
     pub(crate) async fn delete_batch(
```
```diff
@@ -27,8 +27,8 @@ use super::TaskStateUpdate;
 use crate::{
     context::RequestContext,
     metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
-    task_mgr::TaskKind,
-    task_mgr::WALRECEIVER_RUNTIME,
+    pgdatadir_mapping::DatadirModification,
+    task_mgr::{TaskKind, WALRECEIVER_RUNTIME},
     tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
     walingest::WalIngest,
     walrecord::DecodedWALRecord,
@@ -345,7 +345,10 @@ pub(super) async fn handle_walreceiver_connection(
                     // Commit every ingest_batch_size records. Even if we filtered out
                     // all records, we still need to call commit to advance the LSN.
                     uncommitted_records += 1;
-                    if uncommitted_records >= ingest_batch_size {
+                    if uncommitted_records >= ingest_batch_size
+                        || modification.approx_pending_bytes()
+                            > DatadirModification::MAX_PENDING_BYTES
+                    {
                         WAL_INGEST
                             .records_committed
                             .inc_by(uncommitted_records - filtered_records);
```
```diff
@@ -284,6 +284,9 @@ extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum,
 extern void neon_write(SMgrRelation reln, ForkNumber forknum,
                        BlockNumber blocknum, const void *buffer, bool skipFsync);
 #endif
+
+extern PGDLLEXPORT void neon_dump_relsize_cache(void);
+
 extern void neon_writeback(SMgrRelation reln, ForkNumber forknum,
                            BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum);
```
```diff
@@ -110,7 +110,8 @@ get_cached_relsize(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber *size)
 
 	tag.rinfo = rinfo;
 	tag.forknum = forknum;
-	LWLockAcquire(relsize_lock, LW_SHARED);
+	/* We need exclusive lock here because of LRU list manipulation */
+	LWLockAcquire(relsize_lock, LW_EXCLUSIVE);
 	entry = hash_search(relsize_hash, &tag, HASH_FIND, NULL);
 	if (entry != NULL)
 	{
@@ -276,3 +277,62 @@ relsize_shmem_request(void)
 	RequestNamedLWLockTranche("neon_relsize", 1);
 }
 #endif
+
+
+/*
+ * A debugging function, to print the contents of the relsize cache as NOTICE
+ * messages. This is exposed in the neon_test_utils extension.
+ */
+void
+neon_dump_relsize_cache(void)
+{
+	HASH_SEQ_STATUS status;
+	RelSizeEntry *entry;
+	dlist_iter	iter;
+	int			cnt;
+
+	if (relsize_hash_size == 0)
+	{
+		elog(NOTICE, "relsize cache is disabled");
+		return;
+	}
+
+	LWLockAcquire(relsize_lock, LW_EXCLUSIVE);
+
+	elog(NOTICE, "stats: size %lu hits: " UINT64_FORMAT " misses " UINT64_FORMAT " writes " UINT64_FORMAT,
+		 (unsigned long) relsize_ctl->size, relsize_ctl->hits, relsize_ctl->misses, relsize_ctl->writes);
+
+	elog(NOTICE, "hash:");
+	cnt = 0;
+	hash_seq_init(&status, relsize_hash);
+	while ((entry = hash_seq_search(&status)) != NULL)
+	{
+		cnt++;
+		elog(NOTICE, "hash entry %d: rel %u/%u/%u.%u size %u",
+			 cnt,
+			 RelFileInfoFmt(entry->tag.rinfo),
+			 entry->tag.forknum,
+			 entry->size);
+	}
+
+	elog(NOTICE, "LRU:");
+	cnt = 0;
+	dlist_foreach(iter, &relsize_ctl->lru)
+	{
+		entry = dlist_container(RelSizeEntry, lru_node, iter.cur);
+		cnt++;
+		elog(NOTICE, "LRU entry %d: rel %u/%u/%u.%u size %u",
+			 cnt,
+			 RelFileInfoFmt(entry->tag.rinfo),
+			 entry->tag.forknum,
+			 entry->size);
+
+		if (cnt > relsize_hash_size * 2)
+		{
+			elog(NOTICE, "broken LRU chain??");
+			break;
+		}
+	}
+
+	LWLockRelease(relsize_lock);
+}
```
```diff
@@ -7,7 +7,7 @@ OBJS = \
 	neontest.o
 
 EXTENSION = neon_test_utils
-DATA = neon_test_utils--1.3.sql
+DATA = neon_test_utils--1.4.sql
 PGFILEDESC = "neon_test_utils - helpers for neon testing and debugging"
 
 PG_CONFIG = pg_config
```
```diff
@@ -69,3 +69,8 @@ BEGIN
   PERFORM trigger_segfault();
 END;
 $$;
+
+CREATE FUNCTION dump_relsize_cache()
+RETURNS VOID
+AS 'MODULE_PATHNAME', 'dump_relsize_cache'
+LANGUAGE C PARALLEL UNSAFE;
```
```diff
@@ -1,6 +1,6 @@
 # neon_test_utils extension
 comment = 'helpers for neon testing and debugging'
-default_version = '1.3'
+default_version = '1.4'
 module_pathname = '$libdir/neon_test_utils'
 relocatable = true
 trusted = true
```
```diff
@@ -45,6 +45,7 @@ PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
 PG_FUNCTION_INFO_V1(neon_xlogflush);
 PG_FUNCTION_INFO_V1(trigger_panic);
 PG_FUNCTION_INFO_V1(trigger_segfault);
+PG_FUNCTION_INFO_V1(dump_relsize_cache);
 
 /*
  * Linkage to functions in neon module.
@@ -60,6 +61,10 @@ typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, B
 
 static neon_read_at_lsn_type neon_read_at_lsn_ptr;
 
+typedef void (*neon_dump_relsize_cache_type) (void);
+
+static neon_dump_relsize_cache_type neon_dump_relsize_cache_ptr;
+
 /*
  * Module initialize function: fetch function pointers for cross-module calls.
  */
@@ -68,12 +73,18 @@ _PG_init(void)
 {
 	/* Asserts verify that typedefs above match original declarations */
 	AssertVariableIsOfType(&neon_read_at_lsn, neon_read_at_lsn_type);
+	AssertVariableIsOfType(&neon_dump_relsize_cache, neon_dump_relsize_cache_type);
 	neon_read_at_lsn_ptr = (neon_read_at_lsn_type)
 		load_external_function("$libdir/neon", "neon_read_at_lsn",
 							   true, NULL);
+
+	neon_dump_relsize_cache_ptr = (neon_dump_relsize_cache_type)
+		load_external_function("$libdir/neon", "neon_dump_relsize_cache",
+							   true, NULL);
 }
 
 #define neon_read_at_lsn neon_read_at_lsn_ptr
+#define neon_dump_relsize_cache neon_dump_relsize_cache_ptr
 
 /*
  * test_consume_oids(int4), for rapidly consuming OIDs, to test wraparound.
@@ -528,3 +539,11 @@ trigger_segfault(PG_FUNCTION_ARGS)
 	*ptr = 42;
 	PG_RETURN_VOID();
 }
+
+
+Datum
+dump_relsize_cache(PG_FUNCTION_ARGS)
+{
+	neon_dump_relsize_cache();
+	PG_RETURN_VOID();
+}
```
```diff
@@ -2,6 +2,7 @@
 
 import argparse
 import enum
+import os
 import subprocess
 import sys
 from typing import List
@@ -93,7 +94,7 @@ if __name__ == "__main__":
         "--no-color",
         action="store_true",
         help="disable colored output",
-        default=not sys.stdout.isatty(),
+        default=not sys.stdout.isatty() or os.getenv("TERM") == "dumb",
     )
     args = parser.parse_args()
```
@@ -1,4 +1,92 @@
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
// rustc lints/lint groups
|
||||
// https://doc.rust-lang.org/rustc/lints/groups.html
|
||||
#![deny(
|
||||
deprecated,
|
||||
future_incompatible,
|
||||
// TODO: consider let_underscore
|
||||
nonstandard_style,
|
||||
rust_2024_compatibility
|
||||
)]
|
||||
#![warn(clippy::all, clippy::pedantic, clippy::cargo)]
|
||||
// List of denied lints from the clippy::restriction group.
|
||||
// https://rust-lang.github.io/rust-clippy/master/index.html#?groups=restriction
|
||||
#![warn(
|
||||
clippy::undocumented_unsafe_blocks,
|
||||
clippy::dbg_macro,
|
||||
clippy::empty_enum_variants_with_brackets,
|
||||
clippy::exit,
|
||||
clippy::float_cmp_const,
|
||||
clippy::lossy_float_literal,
|
||||
clippy::macro_use_imports,
|
||||
clippy::manual_ok_or,
|
||||
// TODO: consider clippy::map_err_ignore
|
||||
// TODO: consider clippy::mem_forget
|
||||
clippy::rc_mutex,
|
||||
clippy::rest_pat_in_fully_bound_structs,
|
||||
clippy::string_add,
|
||||
clippy::string_to_string,
|
||||
clippy::todo,
|
||||
// TODO: consider clippy::unimplemented
|
||||
// TODO: consider clippy::unwrap_used
|
||||
)]
|
||||
// List of permanently allowed lints.
|
||||
#![allow(
|
||||
// It's ok to cast u8 to bool, etc.
|
||||
clippy::cast_lossless,
|
||||
)]
|
||||
// List of temporarily allowed lints.
|
||||
// TODO: Switch to except() once stable with 1.81.
|
||||
// TODO: fix code and reduce list or move to permanent list above.
|
||||
#![allow(
|
||||
clippy::cargo_common_metadata,
|
||||
clippy::cast_possible_truncation,
|
||||
clippy::cast_possible_wrap,
|
||||
clippy::cast_precision_loss,
|
||||
clippy::cast_sign_loss,
|
||||
clippy::default_trait_access,
|
||||
clippy::doc_markdown,
|
||||
clippy::explicit_iter_loop,
|
||||
clippy::float_cmp,
|
||||
clippy::if_not_else,
|
||||
clippy::ignored_unit_patterns,
|
||||
clippy::implicit_hasher,
|
||||
clippy::inconsistent_struct_constructor,
|
||||
clippy::inline_always,
|
||||
clippy::items_after_statements,
|
||||
clippy::manual_assert,
|
||||
clippy::manual_let_else,
|
||||
clippy::manual_string_new,
|
||||
clippy::match_bool,
|
||||
clippy::match_same_arms,
|
||||
clippy::match_wild_err_arm,
|
||||
clippy::missing_errors_doc,
|
||||
clippy::missing_panics_doc,
|
||||
clippy::module_name_repetitions,
|
||||
clippy::multiple_crate_versions,
|
||||
clippy::must_use_candidate,
|
||||
clippy::needless_for_each,
|
||||
clippy::needless_pass_by_value,
|
||||
clippy::needless_raw_string_hashes,
|
||||
clippy::option_as_ref_cloned,
|
||||
clippy::redundant_closure_for_method_calls,
|
||||
clippy::redundant_else,
|
||||
clippy::return_self_not_must_use,
|
||||
clippy::similar_names,
|
||||
clippy::single_char_pattern,
|
||||
clippy::single_match_else,
|
||||
clippy::struct_excessive_bools,
|
||||
clippy::struct_field_names,
|
||||
clippy::too_many_lines,
|
||||
clippy::uninlined_format_args,
|
||||
clippy::unnested_or_patterns,
|
||||
clippy::unreadable_literal,
|
||||
clippy::unused_async,
|
||||
clippy::unused_self,
|
||||
clippy::used_underscore_binding,
|
||||
clippy::wildcard_imports
|
||||
)]
|
||||
// List of temporarily allowed lints to unblock beta/nightly.
|
||||
#![allow(unknown_lints, clippy::manual_inspect)]
|
||||
|
||||
use std::convert::Infallible;
|
||||
|
||||
|
||||
@@ -114,6 +114,16 @@ fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Res
    })
}

/// List all (not deleted) timelines.
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;
    let res: Vec<TenantTimelineId> = GlobalTimelines::get_all()
        .iter()
        .map(|tli| tli.ttid)
        .collect();
    json_response(StatusCode::OK, res)
}

/// Report info about timeline.
async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
@@ -562,6 +572,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
        .post("/v1/tenant/timeline", |r| {
            request_span(r, timeline_create_handler)
        })
        .get("/v1/tenant/timeline", |r| {
            request_span(r, timeline_list_handler)
        })
        .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            request_span(r, timeline_status_handler)
        })

@@ -18,6 +18,7 @@ import psycopg2
from psycopg2.extras import execute_values

CREATE_TABLE = """
CREATE TYPE arch AS ENUM ('ARM64', 'X64', 'UNKNOWN');
CREATE TABLE IF NOT EXISTS results (
    id BIGSERIAL PRIMARY KEY,
    parent_suite TEXT NOT NULL,
@@ -28,6 +29,7 @@ CREATE TABLE IF NOT EXISTS results (
    stopped_at TIMESTAMPTZ NOT NULL,
    duration INT NOT NULL,
    flaky BOOLEAN NOT NULL,
    arch arch DEFAULT 'X64',
    build_type TEXT NOT NULL,
    pg_version INT NOT NULL,
    run_id BIGINT NOT NULL,
@@ -35,7 +37,7 @@ CREATE TABLE IF NOT EXISTS results (
    reference TEXT NOT NULL,
    revision CHAR(40) NOT NULL,
    raw JSONB COMPRESSION lz4 NOT NULL,
    UNIQUE (parent_suite, suite, name, build_type, pg_version, started_at, stopped_at, run_id)
    UNIQUE (parent_suite, suite, name, arch, build_type, pg_version, started_at, stopped_at, run_id)
);
"""

@@ -50,6 +52,7 @@ class Row:
    stopped_at: datetime
    duration: int
    flaky: bool
    arch: str
    build_type: str
    pg_version: int
    run_id: int
@@ -121,6 +124,14 @@ def ingest_test_result(
    raw.pop("labels")
    raw.pop("extra")

    # All allure parameters are prefixed with "__", see test_runner/fixtures/parametrize.py
    parameters = {
        p["name"].removeprefix("__"): p["value"]
        for p in test["parameters"]
        if p["name"].startswith("__")
    }
    arch = parameters.get("arch", "UNKNOWN").strip("'")

    build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
    labels = {label["name"]: label["value"] for label in test["labels"]}
    row = Row(
@@ -132,6 +143,7 @@ def ingest_test_result(
        stopped_at=datetime.fromtimestamp(test["time"]["stop"] / 1000, tz=timezone.utc),
        duration=test["time"]["duration"],
        flaky=test["flaky"] or test["retriesStatusChange"],
        arch=arch,
        build_type=build_type,
        pg_version=pg_version,
        run_id=run_id,

@@ -1,7 +1,7 @@
import random
from dataclasses import dataclass
from functools import total_ordering
from typing import Any, Type, TypeVar, Union
from typing import Any, Dict, Type, TypeVar, Union

T = TypeVar("T", bound="Id")

@@ -147,6 +147,19 @@ class TimelineId(Id):
        return self.id.hex()


@dataclass
class TenantTimelineId:
    tenant_id: TenantId
    timeline_id: TimelineId

    @classmethod
    def from_json(cls, d: Dict[str, Any]) -> "TenantTimelineId":
        return TenantTimelineId(
            tenant_id=TenantId(d["tenant_id"]),
            timeline_id=TimelineId(d["timeline_id"]),
        )


# Workaround for compat with python 3.9, which does not have `typing.Self`
TTenantShardId = TypeVar("TTenantShardId", bound="TenantShardId")

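A quick usage sketch of the new helper (the hex ids below are made-up example values; the JSON shape mirrors what the safekeeper's new GET /v1/tenant/timeline endpoint returns, as parsed by the fixture client further down):

payload = [
    {
        "tenant_id": "9e4b3cfdf0a5a06ed4a3ad6ad3e98a4d",  # example value
        "timeline_id": "5c4cf53e9c7164734c1a73ba7ae0f2a3",  # example value
    }
]
ttids = [TenantTimelineId.from_json(d) for d in payload]
assert isinstance(ttids[0].tenant_id, TenantId)
assert isinstance(ttids[0].timeline_id, TimelineId)
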
@@ -61,8 +61,6 @@ from fixtures.pageserver.common_types import IndexPartDump, LayerName, parse_lay
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
    wait_for_last_record_lsn,
    wait_for_upload,
    wait_for_upload_queue_empty,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
@@ -5347,9 +5345,7 @@ def last_flush_lsn_upload(
    for tenant_shard_id, pageserver in shards:
        ps_http = pageserver.http_client(auth_token=auth_token)
        wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
        # force a checkpoint to trigger upload
        ps_http.timeline_checkpoint(tenant_shard_id, timeline_id)
        wait_for_upload(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
        ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True)
    return last_flush_lsn

@@ -5434,9 +5430,5 @@ def generate_uploads_and_deletions(
    # ensures that the pageserver is in a fully idle state: there will be no more
    # background ingest, no more uploads pending, and therefore no non-determinism
    # in subsequent actions like pageserver restarts.
    final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
    ps_http.timeline_checkpoint(tenant_id, timeline_id)
    # Finish uploads
    wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
    # Finish all remote writes (including deletions)
    wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
    flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
    ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)

@@ -1,6 +1,7 @@
import os
from typing import Any, Dict, Optional

import allure
import pytest
import toml
from _pytest.python import Metafunc
@@ -91,3 +92,23 @@ def pytest_generate_tests(metafunc: Metafunc):
        and (platform := os.getenv("PLATFORM")) is not None
    ):
        metafunc.parametrize("platform", [platform.lower()])


@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(*args, **kwargs):
    # Add test parameters to the Allure report to distinguish the same tests with different parameters.
    # Names have a `__` prefix to avoid conflicts with `pytest.mark.parametrize` parameters.

    # A mapping between `uname -m` and `RUNNER_ARCH` values.
    # The `RUNNER_ARCH` environment variable is set on GitHub Runners;
    # possible values are X86, X64, ARM, or ARM64.
    # See https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
    uname_m = {
        "aarch64": "ARM64",
        "arm64": "ARM64",
        "x86_64": "X64",
    }.get(os.uname().machine, "UNKNOWN")
    arch = os.getenv("RUNNER_ARCH", uname_m)
    allure.dynamic.parameter("__arch", arch)

    yield

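Together with the ingest_test_result() hunk above, this forms a round trip: the hook reports `__arch` to Allure, and ingestion strips the `__` prefix plus the quotes around string parameter values (the quoting detail is an inference from the `.strip("'")` call, not something this diff states). A minimal sketch:

# Hypothetical round-trip of one Allure parameter; "reported" stands in for
# what ends up in the Allure JSON after allure.dynamic.parameter("__arch", "X64").
reported = [{"name": "__arch", "value": "'X64'"}]  # note the added quotes

parameters = {
    p["name"].removeprefix("__"): p["value"]
    for p in reported
    if p["name"].startswith("__")
}
assert parameters == {"arch": "'X64'"}
assert parameters.get("arch", "UNKNOWN").strip("'") == "X64"
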
@@ -5,7 +5,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union
import pytest
import requests

from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics

@@ -144,6 +144,12 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
        assert isinstance(res_json, dict)
        return res_json

    def timeline_list(self) -> List[TenantTimelineId]:
        res = self.get(f"http://localhost:{self.port}/v1/tenant/timeline")
        res.raise_for_status()
        resj = res.json()
        return [TenantTimelineId.from_json(ttidj) for ttidj in resj]

    def timeline_create(
        self,
        tenant_id: TenantId,

@@ -10,7 +10,7 @@ from fixtures.neon_fixtures import (
    tenant_get_shards,
    wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pageserver.utils import wait_for_last_record_lsn

# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.
@@ -174,8 +174,9 @@ class Workload:

        if upload:
            # Wait for written data to be uploaded to S3 (force a checkpoint to trigger upload)
            ps_http.timeline_checkpoint(tenant_shard_id, self.timeline_id)
            wait_for_upload(ps_http, tenant_shard_id, self.timeline_id, last_flush_lsn)
            ps_http.timeline_checkpoint(
                tenant_shard_id, self.timeline_id, wait_until_uploaded=True
            )
            log.info(f"Churn: waiting for remote LSN {last_flush_lsn}")
        else:
            log.info(f"Churn: not waiting for upload, disk LSN {last_flush_lsn}")

@@ -5,8 +5,12 @@ from typing import Any, Dict, Tuple
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.neon_fixtures import (
    NeonEnv,
    NeonEnvBuilder,
    PgBin,
    flush_ep_to_pageserver,
)
from fixtures.remote_storage import s3_storage
from fixtures.utils import humantime_to_ms

@@ -62,9 +66,6 @@ def test_download_churn(

    run_benchmark(env, pg_bin, record, io_engine, concurrency_per_target, duration)

    # see https://github.com/neondatabase/neon/issues/8712
    env.stop(immediate=True)


def setup_env(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    remote_storage_kind = s3_storage()
@@ -98,9 +99,9 @@ def setup_env(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
        f"INSERT INTO data SELECT lpad(i::text, {bytes_per_row}, '0') FROM generate_series(1, {int(nrows)}) as i",
        options="-c statement_timeout=0",
    )
    wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
    # TODO: this is a bit imprecise, there could be frozen layers being written out that we don't observe here
    wait_for_upload_queue_empty(client, tenant_id, timeline_id)
    flush_ep_to_pageserver(env, ep, tenant_id, timeline_id)

    client.timeline_checkpoint(tenant_id, timeline_id, compact=False, wait_until_uploaded=True)

    return env

@@ -1,20 +1,21 @@
import time

from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver


#
# Benchmark searching the layer map, when there are a lot of small layer files.
#
def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
    env = neon_env_builder.init_start()
    """Benchmark searching the layer map, when there are a lot of small layer files."""

    env = neon_env_builder.init_configs()
    n_iters = 10
    n_records = 100000

    env.start()

    # We want to have a lot of layer files to exercise the layer map. Disable
    # GC, and make checkpoint_distance very small, so that we get a lot of small layer
    # files.
    tenant, _ = env.neon_cli.create_tenant(
    tenant, timeline = env.neon_cli.create_tenant(
        conf={
            "gc_period": "0s",
            "checkpoint_distance": "16384",
@@ -24,8 +25,7 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
        }
    )

    env.neon_cli.create_timeline("test_layer_map", tenant_id=tenant)
    endpoint = env.endpoints.create_start("test_layer_map", tenant_id=tenant)
    endpoint = env.endpoints.create_start("main", tenant_id=tenant)
    cur = endpoint.connect().cursor()
    cur.execute("create table t(x integer)")
    for _ in range(n_iters):
@@ -33,9 +33,12 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
        time.sleep(1)

    cur.execute("vacuum t")

    with zenbenchmark.record_duration("test_query"):
        cur.execute("SELECT count(*) from t")
        assert cur.fetchone() == (n_iters * n_records,)

    # see https://github.com/neondatabase/neon/issues/8712
    env.stop(immediate=True)
    flush_ep_to_pageserver(env, endpoint, tenant, timeline)
    env.pageserver.http_client().timeline_checkpoint(
        tenant, timeline, compact=False, wait_until_uploaded=True
    )

@@ -1,4 +1,4 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver


def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
@@ -34,7 +34,7 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):

    # Clear the cache, so that we exercise reconstructing the pages
    # from WAL
    cur.execute("SELECT clear_buffer_cache()")
    endpoint.clear_shared_buffers()

    # Check that the cursor opened earlier still works. If the
    # combocids are not restored correctly, it won't.
@@ -43,6 +43,10 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
    assert len(rows) == 500

    cur.execute("rollback")
    flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
    env.pageserver.http_client().timeline_checkpoint(
        env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
    )


def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
@@ -92,7 +96,7 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
    cur.execute("delete from t")
    # Clear the cache, so that we exercise reconstructing the pages
    # from WAL
    cur.execute("SELECT clear_buffer_cache()")
    endpoint.clear_shared_buffers()

    # Check that the cursor opened earlier still works. If the
    # combocids are not restored correctly, it won't.
@@ -102,6 +106,11 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):

    cur.execute("rollback")

    flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
    env.pageserver.http_client().timeline_checkpoint(
        env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
    )


def test_combocid(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
@@ -137,3 +146,8 @@ def test_combocid(neon_env_builder: NeonEnvBuilder):
    assert cur.rowcount == n_records

    cur.execute("rollback")

    flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
    env.pageserver.http_client().timeline_checkpoint(
        env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
    )

@@ -9,14 +9,17 @@ from typing import List, Optional

import pytest
import toml
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin
from fixtures.neon_fixtures import (
    NeonEnv,
    NeonEnvBuilder,
    PgBin,
    flush_ep_to_pageserver,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
    timeline_delete_wait_completed,
    wait_for_last_record_lsn,
    wait_for_upload,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, S3Storage, s3_storage
@@ -122,11 +125,9 @@ def test_create_snapshot(
    timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]

    pageserver_http = env.pageserver.http_client()
    lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])

    wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
    pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
    wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
    flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
    pageserver_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)

    env.endpoints.stop_all()
    for sk in env.safekeepers:
@@ -300,7 +301,7 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
    pg_version = env.pg_version

    # Stop endpoint while we recreate timeline
    ep.stop()
    flush_ep_to_pageserver(env, ep, tenant_id, timeline_id)

    try:
        pageserver_http.timeline_preserve_initdb_archive(tenant_id, timeline_id)
@@ -348,6 +349,11 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
    assert not dump_from_wal_differs, "dump from WAL differs"
    assert not initial_dump_differs, "initial dump differs"

    flush_ep_to_pageserver(env, ep, tenant_id, timeline_id)
    pageserver_http.timeline_checkpoint(
        tenant_id, timeline_id, compact=False, wait_until_uploaded=True
    )


def dump_differs(
    first: Path, second: Path, output: Path, allowed_diffs: Optional[List[str]] = None

@@ -18,7 +18,6 @@ from fixtures.neon_fixtures import (
from fixtures.pageserver.utils import (
    timeline_delete_wait_completed,
    wait_for_last_record_lsn,
    wait_for_upload,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import assert_pageserver_backups_equal, subprocess_capture
@@ -144,7 +143,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build

    # Wait for data to land in s3
    wait_for_last_record_lsn(client, tenant, timeline, Lsn(end_lsn))
    wait_for_upload(client, tenant, timeline, Lsn(end_lsn))
    client.timeline_checkpoint(tenant, timeline, compact=False, wait_until_uploaded=True)

    # Check it worked
    endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant)
@@ -290,7 +289,7 @@ def _import(

    # Wait for data to land in s3
    wait_for_last_record_lsn(client, tenant, timeline, lsn)
    wait_for_upload(client, tenant, timeline, lsn)
    client.timeline_checkpoint(tenant, timeline, compact=False, wait_until_uploaded=True)

    # Check it worked
    endpoint = env.endpoints.create_start(branch_name, tenant_id=tenant, lsn=lsn)

@@ -1,27 +1,31 @@
import os
import time

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnv,
    NeonEnvBuilder,
    logical_replication_sync,
    wait_for_last_flush_lsn,
)
from fixtures.pg_version import PgVersion


def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
    env = neon_simple_env

    if env.pg_version != PgVersion.V16:
def test_layer_bloating(neon_env_builder: NeonEnvBuilder, vanilla_pg):
    if neon_env_builder.pg_version != PgVersion.V16:
        pytest.skip("pg_log_standby_snapshot() function is available only in PG16")

    timeline = env.neon_cli.create_branch("test_logical_replication", "empty")
    endpoint = env.endpoints.create_start(
        "test_logical_replication", config_lines=["log_statement=all"]
    env = neon_env_builder.init_start(
        initial_tenant_conf={
            "gc_period": "0s",
            "compaction_period": "0s",
            "compaction_threshold": 99999,
            "image_creation_threshold": 99999,
        }
    )

    timeline = env.initial_timeline
    endpoint = env.endpoints.create_start("main", config_lines=["log_statement=all"])

    pg_conn = endpoint.connect()
    cur = pg_conn.cursor()

@@ -54,7 +58,7 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
    # Wait for logical replication to sync
    logical_replication_sync(vanilla_pg, endpoint)
    wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, timeline)
    time.sleep(10)
    env.pageserver.http_client().timeline_checkpoint(env.initial_tenant, timeline, compact=False)

    # Check layer file sizes
    timeline_path = f"{env.pageserver.workdir}/tenants/{env.initial_tenant}/timelines/{timeline}/"
@@ -63,3 +67,5 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
        if filename.startswith("00000"):
            log.info(f"layer {filename} size is {os.path.getsize(timeline_path + filename)}")
            assert os.path.getsize(timeline_path + filename) < 512_000_000

    env.stop(immediate=True)

@@ -22,7 +22,7 @@ def random_string(n: int):


@pytest.mark.parametrize(
    "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.V2, AuxFileStore.CrossValidation]
    "pageserver_aux_file_policy", [AuxFileStore.V2, AuxFileStore.CrossValidation]
)
def test_aux_file_v2_flag(neon_simple_env: NeonEnv, pageserver_aux_file_policy: AuxFileStore):
    env = neon_simple_env
@@ -31,9 +31,7 @@ def test_aux_file_v2_flag(neon_simple_env: NeonEnv, pageserver_aux_file_policy:
    assert pageserver_aux_file_policy == tenant_config["switch_aux_file_policy"]


@pytest.mark.parametrize(
    "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
    env = neon_simple_env

@@ -175,9 +173,7 @@ COMMIT;


# Test that neon.logical_replication_max_snap_files works
@pytest.mark.parametrize(
    "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
    def slot_removed(ep):
        assert (
@@ -355,9 +351,7 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of
#
# Most pages start with a contrecord, so we don't do anything special
# to ensure that.
@pytest.mark.parametrize(
    "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg):
    env = neon_simple_env

@@ -402,9 +396,7 @@ def test_restart_endpoint(neon_simple_env: NeonEnv, vanilla_pg):
# logical replication bug as such, but without logical replication,
# records passed to the WAL redo process are never large enough to hit
# the bug.
@pytest.mark.parametrize(
    "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
def test_large_records(neon_simple_env: NeonEnv, vanilla_pg):
    env = neon_simple_env

@@ -476,9 +468,7 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
    ws_cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")


@pytest.mark.parametrize(
    "pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.CrossValidation])
def test_replication_shutdown(neon_simple_env: NeonEnv):
    # Ensure Postgres can exit without getting stuck when a replication job is active + neon extension installed
    env = neon_simple_env

test_runner/regress/test_relsize_cache.py (new file, 187 lines)
@@ -0,0 +1,187 @@
import concurrent.futures
import random
import time
from contextlib import closing

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar


def test_relsize_cache(neon_simple_env: NeonEnv):
    """Stress tests the relsize cache in compute.

    The test runs a few different workloads in parallel on the same
    table:
    * INSERTs
    * SELECT with seqscan
    * VACUUM

    The table is created with 100 indexes, to exercise the relation
    extension codepath as much as possible.

    At the same time, we run yet another thread which creates a new
    target table, and switches 'tblname', a global variable, so that
    all the other threads start to use that too. Sometimes (with 50%
    probability), it also TRUNCATEs the old table after switching, so
    that the relsize "forget" function also gets exercised.

    This test was written to test a bug in locking of the relsize
    cache's LRU list, which led to a corrupted LRU list, causing the
    effective size of the relsize cache to shrink to just a few
    entries over time as old entries were missing from the LRU list
    and thus "leaked", with the right workload. This is probably more
    complicated than necessary to reproduce that particular bug, but
    it gives a nice variety of concurrent activities on the relsize
    cache.
    """
    env = neon_simple_env
    env.neon_cli.create_branch("test_relsize_cache", "empty")

    endpoint = env.endpoints.create_start(
        "test_relsize_cache",
        config_lines=[
            # Make the relsize cache small, so that the LRU-based
            # eviction gets exercised
            "neon.relsize_hash_size=100",
            # Use a large shared buffers and LFC, so that it's not
            # slowed down by getpage requests to storage. They are not
            # interesting for this test, and we want as much
            # contention on the relsize cache as possible.
            "shared_buffers='1000 MB'",
            "neon.file_cache_path='file.cache'",
            "neon.max_file_cache_size=512MB",
            "neon.file_cache_size_limit=512MB",
        ],
    )

    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("CREATE EXTENSION amcheck")

    # Function to create the target table
    def create_tbl(wcur, new_tblname: str):
        wcur.execute(f"CREATE TABLE {new_tblname} (x bigint, y bigint, z bigint)")
        for i in range(0, 100):
            wcur.execute(f"CREATE INDEX relsize_test_idx_{new_tblname}_{i} ON {new_tblname} (x, y, z)")

    # create initial table
    tblname = "tbl_initial"
    create_tbl(cur, tblname)

    inserters_running = 0
    total_inserts = 0

    # Worker thread: INSERT rows into the current target table
    def insert_thread(id: int):
        nonlocal tblname, inserters_running, total_inserts
        log.info(f"i{id}: inserter thread started")
        with closing(endpoint.connect()) as wconn:
            with wconn.cursor() as wcur:
                wcur.execute("set synchronous_commit=off")

                for i in range(0, 100):
                    this_tblname = tblname
                    wcur.execute(
                        f"INSERT INTO {this_tblname} SELECT 1000000000*random(), g, g FROM generate_series(1, 100) g"
                    )
                    total_inserts += 100
                    log.info(f"i{id}: inserted to {this_tblname}")

        inserters_running -= 1
        log.info(f"inserter thread {id} finished!")

    # This thread periodically creates a new target table
    def switcher_thread():
        nonlocal tblname, inserters_running, total_inserts
        log.info("switcher thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()

        tblcounter = 0
        while inserters_running > 0:
            time.sleep(0.01)
            old_tblname = tblname

            # Create a new target table and change the global 'tblname' variable to
            # switch to it
            tblcounter += 1
            new_tblname = f"tbl{tblcounter}"
            create_tbl(wcur, new_tblname)
            tblname = new_tblname

            # With 50% probability, also truncate the old table, to exercise the
            # relsize "forget" codepath too
            if random.random() < 0.5:
                wcur.execute(f"TRUNCATE {old_tblname}")

            # print a "progress report"
            log.info(f"switched to {new_tblname} ({total_inserts} inserts done)")

    # Continuously run vacuum on the target table.
    #
    # Vacuum has the effect of invalidating the cached relation size in relcache
    def vacuum_thread():
        nonlocal tblname, inserters_running
        log.info("vacuum thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()

        while inserters_running > 0:
            wcur.execute(f"vacuum {tblname}")

    # Continuously query the current target table
    #
    # This actually queries not just the latest target table, but a
    # few latest ones. This is implemented by only updating the target
    # table with 10% probability on each iteration. This gives a bit
    # more variability on the relsize entries that are requested from
    # the cache.
    def query_thread(id: int):
        nonlocal tblname, inserters_running
        log.info(f"q{id}: query thread started")
        wconn = endpoint.connect()
        wcur = wconn.cursor()
        wcur.execute("set max_parallel_workers_per_gather=0")

        this_tblname = tblname
        while inserters_running > 0:
            if random.random() < 0.1:
                this_tblname = tblname
            wcur.execute(f"select count(*) from {this_tblname}")

        log.info(f"q{id}: query thread finished!")

    # With 'with', this waits for all the threads to finish
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        futures = []

        # Launch all the threads
        f = executor.submit(switcher_thread)
        futures.append(f)
        f = executor.submit(vacuum_thread)
        futures.append(f)

        # 5 inserter threads
        for i in range(0, 5):
            f = executor.submit(insert_thread, i)
            futures.append(f)
            inserters_running += 1

        # 20 query threads
        for i in range(0, 20):
            f = executor.submit(query_thread, i)
            futures.append(f)

        for f in concurrent.futures.as_completed(futures):
            ex = f.exception()
            if ex:
                log.info(f"exception from thread, stopping: {ex}")
                inserters_running = 0  # abort the other threads
            f.result()

    # Finally, run amcheck on all the indexes. Most relsize cache bugs
    # would result in runtime ERRORs, but doesn't hurt to do more sanity
    # checking.
    cur.execute("select bt_index_check(oid, true) from pg_class where relname like 'relsize_test_idx%'")
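The leak described in the docstring can be illustrated with a toy model (a hypothetical Python sketch, not the actual C implementation): the cache is a hash table plus an LRU list that must be mutated under the same lock; if an update unlinks an entry from the LRU list and a race prevents it from being relinked, the entry survives in the hash table but becomes invisible to eviction, so the set of entries the LRU actually cycles through keeps shrinking:

from collections import deque


class ToyRelsizeCache:
    """Toy model of a hash table + LRU list that must stay in sync."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.sizes = {}     # the "hash table": rel -> size
        self.lru = deque()  # eviction order, oldest entries on the left

    def update(self, rel, size, lost_race=False):
        if rel in self.sizes:
            if rel in self.lru:
                self.lru.remove(rel)  # unlink to move the entry to the front
                if lost_race:
                    # The locking bug: the entry is never relinked. It stays
                    # in the hash table but the LRU list has forgotten it.
                    self.sizes[rel] = size
                    return
                self.lru.append(rel)
            self.sizes[rel] = size
            return
        if len(self.lru) >= self.capacity:
            self.sizes.pop(self.lru.popleft())  # evict the oldest entry
        self.sizes[rel] = size
        self.lru.append(rel)


cache = ToyRelsizeCache(capacity=100)
for i in range(1000):
    cache.update(f"rel{i % 50}", i, lost_race=(i % 7 == 0))

# Leaked entries are in the hash table but invisible to the LRU list, so the
# effective (evictable) part of the cache is smaller than the hash table suggests.
assert len(cache.sizes) > len(cache.lru)
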
@@ -152,6 +152,9 @@ def test_scrubber_physical_gc(neon_env_builder: NeonEnvBuilder, shard_count: Opt
    # This write includes remote upload, will generate an index in this generation
    workload.write_rows(1)

    # We will use a min_age_secs=1 threshold for deletion, let it pass
    time.sleep(2)

    # With a high min_age, the scrubber should decline to delete anything
    gc_summary = env.storage_scrubber.pageserver_physical_gc(min_age_secs=3600)
    assert gc_summary["remote_storage_errors"] == 0

@@ -37,9 +37,7 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
    scur.execute("CREATE TABLE t (pk integer primary key, sk integer)")
    # scur.execute("CREATE INDEX on t(sk)")  # slows down applying WAL at the replica
    pub_conn = f"host=localhost port={pub.pg_port} dbname=postgres user=cloud_admin"
    # synchronous_commit=on to test a hypothesis for why this test has been flaky.
    # XXX: Add link to the issue
    query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub with (synchronous_commit=on)"
    query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
    scur.execute(query)
    time.sleep(2)  # let initial table sync complete

@@ -757,6 +757,9 @@ def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path,

    assert_size_approx_equal_for_lease_test(lease_res, ro_branch_res)

    # we are writing a lot, and flushing all of that to disk is not important for this test
    env.stop(immediate=True)


def insert_with_action(
    env: NeonEnv,

@@ -254,6 +254,10 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder):
    assert max(init_m[2].flush_lsns) <= min(final_m[2].flush_lsns) < middle_lsn
    assert max(init_m[2].commit_lsns) <= min(final_m[2].commit_lsns) < middle_lsn

    # Test timeline_list endpoint.
    http_cli = env.safekeepers[0].http_client()
    assert len(http_cli.timeline_list()) == 3


# Check that dead minority doesn't prevent the commits: execute insert n_inserts
# times, with fault_probability chance of getting a wal acceptor down or up
@@ -1296,6 +1300,8 @@ def test_lagging_sk(neon_env_builder: NeonEnvBuilder):
    # Check that WALs are the same.
    cmp_sk_wal([sk1, sk2, sk3], tenant_id, timeline_id)

    env.stop(immediate=True)


# Smaller version of test_one_sk_down testing peer recovery in isolation: that
# it works without compute at all.

Submodule vendor/postgres-v14 updated: 3fd7a45f8a...b6910406e2
Submodule vendor/postgres-v15 updated: 46b4b235f3...76063bff63
Submodule vendor/postgres-v16 updated: 47a9122a5a...8efa089aa7

vendor/revisions.json:
@@ -1,14 +1,14 @@
{
  "v16": [
    "16.3",
    "47a9122a5a150a3217fafd3f3d4fe8e020ea718a"
    "16.4",
    "8efa089aa7786381543a4f9efc69b92d43eab8c0"
  ],
  "v15": [
    "15.7",
    "46b4b235f38413ab5974bb22c022f9b829257674"
    "15.8",
    "76063bff638ccce7afa99fc9037ac51338b9823d"
  ],
  "v14": [
    "14.12",
    "3fd7a45f8aae85c080df6329e3c85887b7f3a737"
    "14.13",
    "b6910406e2d05a2c94baa2e530ec882733047759"
  ]
}

@@ -259,7 +259,7 @@ files:
        from
        (values ('5m'),('15m'),('1h')) as t (x);

  - metric_name: current_lsn
  - metric_name: compute_current_lsn
    type: gauge
    help: 'Current LSN of the database'
    key_labels:
@@ -272,6 +272,19 @@ files:
          else (pg_current_wal_lsn() - '0/0')::FLOAT8
        end as lsn;

  - metric_name: compute_receive_lsn
    type: gauge
    help: 'Returns the last write-ahead log location that has been received and synced to disk by streaming replication'
    key_labels:
    values: [lsn]
    query: |
      SELECT
        CASE
          WHEN pg_catalog.pg_is_in_recovery()
          THEN (pg_last_wal_receive_lsn() - '0/0')::FLOAT8
          ELSE 0
        END AS lsn;

  - metric_name: replication_delay_bytes
    type: gauge
    help: 'Bytes between received and replayed LSN'
@@ -312,6 +325,22 @@ files:
    query: |
      SELECT checkpoints_timed FROM pg_stat_bgwriter;

  - metric_name: compute_logical_snapshot_files
    type: gauge
    help: 'Number of snapshot files in pg_logical/snapshot'
    key_labels:
      - tenant_id
      - timeline_id
    values: [num_logical_snapshot_files]
    query: |
      SELECT
        (SELECT setting FROM pg_settings WHERE name = 'neon.tenant_id') AS tenant_id,
        (SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
        -- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp. These
        -- temporary snapshot files are renamed to the actual snapshot files after they are
        -- completely built. We only WAL-log the completely built snapshot files.
        (SELECT COUNT(*) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS num_logical_snapshot_files;

# In all the below metrics, we cast LSNs to floats because Prometheus only supports floats.
# It's probably fine because float64 can store integers from -2^53 to +2^53 exactly.

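The float64 claim in the comment above is easy to spot-check (a standalone sketch, not part of the exporter config):

# Integers up to 2**53 survive the int -> float64 -> int round trip exactly;
# 2**53 + 1 is the first integer that does not.
for lsn in (0, 1, 2**32, 2**53):
    assert int(float(lsn)) == lsn
assert int(float(2**53 + 1)) == 2**53  # rounded; no longer exact
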
@@ -80,8 +80,6 @@ time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }
tonic = { version = "0.9", features = ["tls-roots"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
@@ -124,7 +122,6 @@ serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }