Merge pull request #2203 from neondatabase/release-uuid-ossp

Deploy new storage and compute version to production

Release 2022-08-02
This commit is contained in:
Heikki Linnakangas
2022-08-02 15:08:14 +03:00
committed by GitHub
54 changed files with 1632 additions and 1517 deletions

View File

@@ -38,7 +38,7 @@ runs:
path: ./neon-artifact/
- name: Extract Neon artifact
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/
@@ -59,7 +59,7 @@ runs:
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: Run pytest
@@ -70,7 +70,7 @@ runs:
# this variable will be embedded in perf test report
# and is needed to distinguish different environments
PLATFORM: github-actions-selfhosted
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: |
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
rm -rf $PERF_REPORT_DIR
@@ -123,7 +123,7 @@ runs:
fi
- name: Delete all data but logs
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
if: always()
run: |
du -sh /tmp/test_output/*

View File

@@ -5,7 +5,7 @@ runs:
using: "composite"
steps:
- name: Merge coverage data
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
- name: Upload coverage data

View File

@@ -60,7 +60,7 @@ jobs:
- name: Setup cluster
env:
BENCHMARK_CONNSTR: "${{ secrets.BENCHMARK_STAGING_CONNSTR }}"
shell: bash
shell: bash -euxo pipefail {0}
run: |
set -e

View File

@@ -9,7 +9,7 @@ on:
defaults:
run:
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
@@ -517,7 +517,7 @@ jobs:
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
STAGING='{"env_name": "staging", "proxy_job": "neon-proxy", "proxy_config": "staging.proxy", "kubeconfig_secret": "STAGING_KUBECONFIG_DATA"}'
NEON_STRESS='{"env_name": "neon-stress", "proxy_job": "neon-stress-proxy", "proxy_config": "neon-stress.proxy", "kubeconfig_secret": "NEON_STRESS_KUBECONFIG_DATA"}'
echo "::set-output name=include::[$STAGING]"
echo "::set-output name=include::[$STAGING, $NEON_STRESS]"
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
PRODUCTION='{"env_name": "production", "proxy_job": "neon-proxy", "proxy_config": "production.proxy", "kubeconfig_secret": "PRODUCTION_KUBECONFIG_DATA"}'
echo "::set-output name=include::[$PRODUCTION]"

View File

@@ -8,7 +8,7 @@ on:
defaults:
run:
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
@@ -27,7 +27,7 @@ jobs:
# Rust toolchains (e.g. nightly or 1.37.0), add them here.
rust_toolchain: [1.58]
os: [ubuntu-latest, macos-latest]
timeout-minutes: 50
timeout-minutes: 60
name: run regression test suite
runs-on: ${{ matrix.os }}

View File

@@ -40,7 +40,7 @@ jobs:
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: ./scripts/pysync
- name: Run pytest
@@ -49,7 +49,7 @@ jobs:
BENCHMARK_CONNSTR: "${{ secrets.BENCHMARK_STAGING_CONNSTR }}"
TEST_OUTPUT: /tmp/test_output
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
shell: bash -ex {0}
shell: bash -euxo pipefail {0}
run: |
# Test framework expects we have psql binary;
# but since we don't really need it in this test, let's mock it

View File

@@ -29,9 +29,11 @@ else
endif
# macOS with brew-installed openssl requires explicit paths
# It can be configured with OPENSSL_PREFIX variable
UNAME_S := $(shell uname -s)
ifeq ($(UNAME_S),Darwin)
PG_CONFIGURE_OPTS += --with-includes=$(HOMEBREW_PREFIX)/opt/openssl/include --with-libraries=$(HOMEBREW_PREFIX)/opt/openssl/lib
OPENSSL_PREFIX ?= $(shell brew --prefix openssl@3)
PG_CONFIGURE_OPTS += --with-includes=$(OPENSSL_PREFIX)/include --with-libraries=$(OPENSSL_PREFIX)/lib
endif
# Choose whether we should be silent or verbose

View File

@@ -1,6 +1,6 @@
# Neon
Neon is a serverless open source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes PostgreSQL storage layer by redistributing data across a cluster of nodes.
Neon is a serverless open-source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
The project used to be called "Zenith". Many of the commands and code comments
still refer to "zenith", but we are in the process of renaming things.
@@ -12,32 +12,31 @@ Alternatively, compile and run the project [locally](#running-local-installation
## Architecture overview
A Neon installation consists of compute nodes and Neon storage engine.
A Neon installation consists of compute nodes and a Neon storage engine.
Compute nodes are stateless PostgreSQL nodes, backed by Neon storage engine.
Compute nodes are stateless PostgreSQL nodes backed by the Neon storage engine.
Neon storage engine consists of two major components:
- Pageserver. Scalable storage backend for compute nodes.
- WAL service. The service that receives WAL from compute node and ensures that it is stored durably.
The Neon storage engine consists of two major components:
- Pageserver. Scalable storage backend for the compute nodes.
- WAL service. The service receives WAL from the compute node and ensures that it is stored durably.
Pageserver consists of:
- Repository - Neon storage implementation.
- WAL receiver - service that receives WAL from WAL service and stores it in the repository.
- Page service - service that communicates with compute nodes and responds with pages from the repository.
- WAL redo - service that builds pages from base images and WAL records on Page service request.
- WAL redo - service that builds pages from base images and WAL records on Page service request.
## Running local installation
#### Installing dependencies on Linux
1. Install build dependencies and other useful packages
1. Install build dependencies and other applicable packages
* On Ubuntu or Debian this set of packages should be sufficient to build the code:
* On Ubuntu or Debian, this set of packages should be sufficient to build the code:
```bash
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client
```
* On Fedora these packages are needed:
* On Fedora, these packages are needed:
```bash
dnf install flex bison readline-devel zlib-devel openssl-devel \
libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib
@@ -69,7 +68,7 @@ brew install libpq
brew link --force libpq
```
#### Building on Linux and OSX
#### Building on Linux
1. Build neon and patched postgres
```
@@ -80,19 +79,35 @@ cd neon
# The preferred and default is to make a debug build. This will create a
# demonstrably slower build than a release build. If you want to use a release
# build, utilize "`BUILD_TYPE=release make -j`nproc``"
# build, utilize "BUILD_TYPE=release make -j`nproc`"
make -j`nproc`
```
#### dependency installation notes
#### Building on OSX
1. Build neon and patched postgres
```
# Note: The path to the neon sources can not contain a space.
git clone --recursive https://github.com/neondatabase/neon.git
cd neon
# The preferred and default is to make a debug build. This will create a
# demonstrably slower build than a release build. If you want to use a release
# build, utilize "BUILD_TYPE=release make -j`sysctl -n hw.logicalcpu`"
make -j`sysctl -n hw.logicalcpu`
```
#### Dependency installation notes
To run the `psql` client, install the `postgresql-client` package or modify `PATH` and `LD_LIBRARY_PATH` to include `tmp_install/bin` and `tmp_install/lib`, respectively.
To run the integration tests or Python scripts (not required to use the code), install
Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (requires poetry) in the project directory.
Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (requires [poetry](https://python-poetry.org/)) in the project directory.
#### running neon database
#### Running neon database
1. Start pageserver and postgres on top of it (should be called from repo root):
```sh
# Create repository in .neon with proper paths to binaries and data
@@ -123,7 +138,7 @@ Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=pos
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
```
2. Now it is possible to connect to postgres and run some queries:
2. Now, it is possible to connect to postgres and run some queries:
```text
> psql -p55432 -h 127.0.0.1 -U cloud_admin postgres
postgres=# CREATE TABLE t(key int primary key, value text);
@@ -181,14 +196,16 @@ postgres=# select * from t;
(1 row)
```
4. If you want to run tests afterwards (see below), you have to stop all the running the pageserver, safekeeper and postgres instances
you have just started. You can stop them all with one command:
4. If you want to run tests afterward (see below), you must stop all the running pageserver, safekeeper, and postgres instances
you have just started. You can terminate them all with one command:
```sh
> ./target/debug/neon_local stop
```
## Running tests
Ensure your dependencies are installed as described [here](https://github.com/neondatabase/neon#dependency-installation-notes).
```sh
git clone --recursive https://github.com/neondatabase/neon.git
make # builds also postgres and installs it to ./tmp_install
@@ -205,8 +222,8 @@ To view your `rustdoc` documentation in a browser, try running `cargo doc --no-d
### Postgres-specific terms
Due to Neon's very close relation with PostgreSQL internals, there are numerous specific terms used.
Same applies to certain spelling: i.e. we use MB to denote 1024 * 1024 bytes, while MiB would be technically more correct, it's inconsistent with what PostgreSQL code and its documentation use.
Due to Neon's very close relation with PostgreSQL internals, numerous specific terms are used.
The same applies to certain spellings: e.g., we use MB to denote 1024 * 1024 bytes. While MiB would be technically more correct, it is inconsistent with what the PostgreSQL code and its documentation use.
To get more familiar with this aspect, refer to:

View File

@@ -157,7 +157,7 @@ fn main() -> Result<()> {
exit(code)
}
Err(error) => {
error!("could not start the compute node: {}", error);
error!("could not start the compute node: {:?}", error);
let mut state = compute.state.write().unwrap();
state.error = Some(format!("{:?}", error));

View File

@@ -12,9 +12,9 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest};
use pageserver::tenant_mgr::TenantInfo;
use pageserver::timelines::TimelineInfo;
use pageserver::http::models::{
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
};
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};

View File

@@ -9,6 +9,7 @@ use pageserver::config::defaults::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
};
use pageserver::http::models::TimelineInfo;
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
@@ -25,8 +26,6 @@ use utils::{
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use pageserver::timelines::TimelineInfo;
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);

View File

@@ -7,6 +7,10 @@ use utils::{
zid::{NodeId, ZTenantId, ZTimelineId},
};
// These enums are used in the API response fields.
use crate::repository::LocalTimelineState;
use crate::tenant_mgr::TenantState;
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
@@ -97,14 +101,59 @@ impl TenantConfigRequest {
}
}
/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`.
/// We keep one WAL receiver active per timeline.
/// Tenant summary that is serialized into management API responses.
///
/// `id` is written and parsed through its string form (`DisplayFromStr`).
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: ZTenantId,
// NOTE(review): optional; which callers leave this as `None` is not visible here.
pub state: Option<TenantState>,
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
// NOTE(review): presumably reflects the remote index's view of pending downloads — confirm at the call sites.
pub has_in_progress_downloads: Option<bool>,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct WalReceiverEntry {
pub wal_producer_connstr: Option<String>,
pub struct LocalTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<ZTimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub last_record_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<usize>,
pub current_physical_size_non_incremental: Option<u64>,
pub timeline_state: LocalTimelineState,
pub wal_source_connstr: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
}
/// Remote part of a timeline's description; embedded as the optional
/// `remote` field of `TimelineInfo`.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RemoteTimelineInfo {
// Serialized through the LSN's string form (`DisplayFromStr`).
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
// NOTE(review): presumably set while the timeline's data still has to be downloaded — confirm against the producer.
pub awaits_download: bool,
}
/// Description of a single timeline: its identifiers plus the optional
/// local and remote parts.
///
/// Both ids are serialized through their string forms (`DisplayFromStr`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: ZTenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: ZTimelineId,
// `None` when no local information is attached.
pub local: Option<LocalTimelineInfo>,
// `None` when no remote information is attached.
pub remote: Option<RemoteTimelineInfo>,
}

View File

@@ -207,54 +207,6 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
get:
description: Get wal receiver's data attached to the timeline
responses:
"200":
description: WalReceiverEntry
content:
application/json:
schema:
$ref: "#/components/schemas/WalReceiverEntry"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"404":
description: Error when no wal receiver is running or found
content:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/attach:
parameters:
- name: tenant_id
@@ -587,6 +539,8 @@ components:
type: string
state:
type: string
current_physical_size:
type: integer
has_in_progress_downloads:
type: boolean
TenantCreateInfo:
@@ -687,16 +641,7 @@ components:
type: integer
current_physical_size_non_incremental:
type: integer
WalReceiverEntry:
type: object
required:
- thread_id
- wal_producer_connstr
properties:
thread_id:
type: integer
wal_producer_connstr:
wal_source_connstr:
type: string
last_received_msg_lsn:
type: string

View File

@@ -6,16 +6,19 @@ use hyper::{Body, Request, Response, Uri};
use remote_storage::GenericRemoteStorage;
use tracing::*;
use super::models::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse,
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest,
};
use crate::repository::Repository;
use crate::layered_repository::metadata::TimelineMetadata;
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::{LocalTimelineState, RepositoryTimeline};
use crate::repository::{Repository, Timeline};
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant_config::TenantConfOpt;
use crate::tenant_mgr::TenantInfo;
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
use crate::TimelineImpl;
use crate::{config::PageServerConf, tenant_mgr, timelines};
use utils::{
auth::JwtAuth,
@@ -26,6 +29,7 @@ use utils::{
request::parse_request_param,
RequestExt, RouterBuilder,
},
lsn::Lsn,
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
};
@@ -79,6 +83,123 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
get_state(request).conf
}
// Helper functions to construct a LocalTimelineInfo struct for a timeline
/// Builds the `LocalTimelineInfo` reported for a timeline that is loaded in memory.
///
/// The non-incremental logical and physical sizes are computed only when the
/// corresponding `include_*` flag is set; an error from either computation is
/// propagated to the caller.
fn local_timeline_info_from_loaded_timeline(
    timeline: &TimelineImpl,
    include_non_incremental_logical_size: bool,
    include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
    let last_record_lsn = timeline.get_last_record_lsn();

    // Copy out whatever the timeline has stored in `last_received_wal`;
    // all three values stay `None` when nothing has been stored yet.
    let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) =
        match timeline.last_received_wal.lock().unwrap().as_ref() {
            Some(wal_info) => (
                Some(wal_info.wal_source_connstr.clone()),
                Some(wal_info.last_received_msg_lsn),
                Some(wal_info.last_received_msg_ts),
            ),
            None => (None, None, None),
        };

    // Field expressions are kept in their original order so that getters run
    // (and any temporaries they return are held) exactly as before.
    let local_info = LocalTimelineInfo {
        ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
        // A zero ancestor LSN is reported as `None`.
        ancestor_lsn: match timeline.get_ancestor_lsn() {
            Lsn(0) => None,
            nonzero_lsn => Some(nonzero_lsn),
        },
        disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
        last_record_lsn,
        prev_record_lsn: Some(timeline.get_prev_record_lsn()),
        latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
        timeline_state: LocalTimelineState::Loaded,
        current_logical_size: Some(timeline.get_current_logical_size()),
        current_physical_size: Some(timeline.get_physical_size()),
        current_logical_size_non_incremental: include_non_incremental_logical_size
            .then(|| timeline.get_current_logical_size_non_incremental(last_record_lsn))
            .transpose()?,
        current_physical_size_non_incremental: include_non_incremental_physical_size
            .then(|| timeline.get_physical_size_non_incremental())
            .transpose()?,
        wal_source_connstr,
        last_received_msg_lsn,
        last_received_msg_ts,
    };
    Ok(local_info)
}
/// Builds the `LocalTimelineInfo` for a timeline that is not loaded, using only
/// its on-disk metadata.
///
/// Sizes and WAL-receiver fields are left as `None`, and `last_record_lsn` is
/// filled with the metadata's `disk_consistent_lsn`.
fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> LocalTimelineInfo {
    let ancestor_timeline_id = metadata.ancestor_timeline();
    // A zero ancestor LSN is reported as `None`.
    let ancestor_lsn = match metadata.ancestor_lsn() {
        Lsn(0) => None,
        nonzero_lsn => Some(nonzero_lsn),
    };

    LocalTimelineInfo {
        ancestor_timeline_id,
        ancestor_lsn,
        disk_consistent_lsn: metadata.disk_consistent_lsn(),
        last_record_lsn: metadata.disk_consistent_lsn(),
        prev_record_lsn: metadata.prev_record_lsn(),
        latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
        timeline_state: LocalTimelineState::Unloaded,
        current_logical_size: None,
        current_physical_size: None,
        current_logical_size_non_incremental: None,
        current_physical_size_non_incremental: None,
        wal_source_connstr: None,
        last_received_msg_lsn: None,
        last_received_msg_ts: None,
    }
}
/// Builds a `LocalTimelineInfo` from a repository timeline entry, dispatching
/// on whether the timeline is loaded or only its metadata is available.
///
/// The `include_*` flags are only used for a loaded timeline.
fn local_timeline_info_from_repo_timeline(
    repo_timeline: &RepositoryTimeline<TimelineImpl>,
    include_non_incremental_logical_size: bool,
    include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
    let metadata = match repo_timeline {
        RepositoryTimeline::Unloaded { metadata } => metadata,
        RepositoryTimeline::Loaded(timeline) => {
            return local_timeline_info_from_loaded_timeline(
                &*timeline,
                include_non_incremental_logical_size,
                include_non_incremental_physical_size,
            );
        }
    };
    Ok(local_timeline_info_from_unloaded_timeline(metadata))
}
/// Collects `(timeline id, LocalTimelineInfo)` pairs for every timeline the
/// tenant's repository lists.
///
/// Fails if the repository for `tenant_id` cannot be obtained, or on the first
/// timeline whose info cannot be built.
fn list_local_timelines(
    tenant_id: ZTenantId,
    include_non_incremental_logical_size: bool,
    include_non_incremental_physical_size: bool,
) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
    let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
        .with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;

    // Collecting into a `Result` stops at the first error, like the `?` in a loop would.
    repo.list_timelines()
        .into_iter()
        .map(|(timeline_id, repository_timeline)| {
            local_timeline_info_from_repo_timeline(
                &repository_timeline,
                include_non_incremental_logical_size,
                include_non_incremental_physical_size,
            )
            .map(|local_info| (timeline_id, local_info))
        })
        .collect()
}
// healthcheck handler
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let config = get_config(&request);
@@ -93,16 +214,30 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let new_timeline_info = tokio::task::spawn_blocking(move || {
let _enter = info_span!("/timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, lsn=?request_data.ancestor_start_lsn).entered();
timelines::create_timeline(
match timelines::create_timeline(
get_config(&request),
tenant_id,
request_data.new_timeline_id.map(ZTimelineId::from),
request_data.ancestor_timeline_id.map(ZTimelineId::from),
request_data.ancestor_start_lsn,
)
) {
Ok(Some((new_timeline_id, new_timeline))) => {
// Created. Construct a TimelineInfo for it.
let local_info = local_timeline_info_from_loaded_timeline(new_timeline.as_ref(), false, false)?;
Ok(Some(TimelineInfo {
tenant_id,
timeline_id: new_timeline_id,
local: Some(local_info),
remote: None,
}))
}
Ok(None) => Ok(None), // timeline already exists
Err(err) => Err(err),
}
})
.await
.map_err(ApiError::from_err)??;
.map_err(ApiError::from_err)??;
Ok(match new_timeline_info {
Some(info) => json_response(StatusCode::CREATED, info)?,
@@ -119,7 +254,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
query_param_present(&request, "include-non-incremental-physical-size");
let local_timeline_infos = tokio::task::spawn_blocking(move || {
let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
crate::timelines::get_local_timelines(
list_local_timelines(
tenant_id,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
@@ -184,9 +319,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
repo.get_timeline(timeline_id)
.as_ref()
.map(|timeline| {
LocalTimelineInfo::from_repo_timeline(
tenant_id,
timeline_id,
local_timeline_info_from_repo_timeline(
timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
@@ -234,23 +367,6 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, timeline_info)
}
/// HTTP handler that returns the WAL receiver entry for the tenant/timeline
/// named in the request path.
///
/// Responds with `ApiError::NotFound` when no entry is available for the pair.
async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
    check_permission(&request, Some(tenant_id))?;
    let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;

    let lookup = crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id);
    let span = info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id);

    match lookup.instrument(span).await {
        Some(wal_receiver_entry) => json_response(StatusCode::OK, &wal_receiver_entry),
        None => Err(ApiError::NotFound(format!(
            "WAL receiver data not found for tenant {tenant_id} and timeline {timeline_id}"
        ))),
    }
}
// TODO: it makes sense to provide the tenant config right away, the same way it is handled in tenant_create
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
@@ -438,14 +554,36 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
let index_accessor = remote_index.read().await;
let has_in_progress_downloads = index_accessor
.tenant_entry(&tenant_id)
.ok_or_else(|| ApiError::NotFound("Tenant not found in remote index".to_string()))?
.has_in_progress_downloads();
.map(|t| t.has_in_progress_downloads())
.unwrap_or_else(|| {
info!("Tenant {tenant_id} not found in remote index");
false
});
let current_physical_size =
match tokio::task::spawn_blocking(move || list_local_timelines(tenant_id, false, false))
.await
.map_err(ApiError::from_err)?
{
Err(err) => {
// Getting local timelines can fail when no local repo is on disk (e.g, when tenant data is being downloaded).
// In that case, put a warning message into log and operate normally.
warn!("Failed to get local timelines for tenant {tenant_id}: {err}");
None
}
Ok(local_timeline_infos) => Some(
local_timeline_infos
.into_iter()
.fold(0, |acc, x| acc + x.1.current_physical_size.unwrap()),
),
};
json_response(
StatusCode::OK,
TenantInfo {
id: tenant_id,
state: tenant_state,
current_physical_size,
has_in_progress_downloads: Some(has_in_progress_downloads),
},
)
@@ -615,9 +753,5 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/detach",
timeline_delete_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver",
wal_receiver_get_handler,
)
.any(handler_404))
}

View File

@@ -67,6 +67,9 @@ pub use crate::layered_repository::ephemeral_file::writeback as writeback_epheme
// re-export for use in storage_sync.rs
pub use crate::layered_repository::timeline::save_metadata;
// re-export for use in walreceiver
pub use crate::layered_repository::timeline::WalReceiverInfo;
/// Parts of the `.neon/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";

View File

@@ -290,6 +290,17 @@ pub struct LayeredTimeline {
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: AtomicIsize,
/// Information about the last processed message by the WAL receiver,
/// or None if WAL receiver has not received anything for this timeline
/// yet.
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
}
/// Information about the last message processed by the WAL receiver for a
/// timeline; kept in `LayeredTimeline::last_received_wal`.
pub struct WalReceiverInfo {
// Connection string of the WAL source the message came from.
pub wal_source_connstr: String,
// LSN associated with the last received message.
pub last_received_msg_lsn: Lsn,
// Timestamp of the last received message.
// NOTE(review): the HTTP model documents this value as microseconds — confirm the unit where it is written.
pub last_received_msg_ts: u128,
}
/// Inherit all the functions from DatadirTimeline, to provide the
@@ -605,6 +616,8 @@ impl LayeredTimeline {
current_logical_size: AtomicIsize::new(0),
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
last_received_wal: Mutex::new(None),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result

View File

@@ -277,15 +277,6 @@ pub enum LocalTimelineState {
Unloaded,
}
/// Maps a repository timeline entry onto the matching `LocalTimelineState`,
/// ignoring the entry's payload.
impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
    fn from(local_timeline_entry: &'a RepositoryTimeline<T>) -> Self {
        match local_timeline_entry {
            RepositoryTimeline::Unloaded { .. } => Self::Unloaded,
            RepositoryTimeline::Loaded(_) => Self::Loaded,
        }
    }
}
///
/// Result of performing GC
///

View File

@@ -1120,7 +1120,7 @@ where
.instrument(info_span!("download_timeline_data")),
);
if let Some(delete_data) = batch.delete {
if let Some(mut delete_data) = batch.delete {
if upload_result.is_some() {
match validate_task_retries(delete_data, max_sync_errors)
.instrument(info_span!("retries_validation"))
@@ -1153,6 +1153,7 @@ where
}
}
} else {
delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
warn!("Skipping delete task due to failed upload tasks, reenqueuing");
}

View File

@@ -2,6 +2,7 @@
//! page server.
use crate::config::PageServerConf;
use crate::http::models::TenantInfo;
use crate::layered_repository::{load_metadata, LayeredRepository};
use crate::repository::Repository;
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
@@ -14,7 +15,6 @@ use crate::{thread_mgr, timelines, walreceiver};
use crate::{RepositoryImpl, TimelineImpl};
use anyhow::Context;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt;
@@ -502,15 +502,9 @@ fn load_local_timeline(
Ok(inmem_timeline)
}
/// Tenant summary produced for the management API (see `list_tenants`).
///
/// `id` is written and parsed through its string form (`DisplayFromStr`).
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: ZTenantId,
// NOTE(review): optional; which callers leave this as `None` is not visible here.
pub state: Option<TenantState>,
// NOTE(review): presumably reflects the remote index's view of pending downloads — confirm at the call sites.
pub has_in_progress_downloads: Option<bool>,
}
///
/// Get list of tenants, for the mgmt API
///
pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
tenants_state::read_tenants()
.iter()
@@ -526,6 +520,7 @@ pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
TenantInfo {
id: *id,
state: Some(tenant.state),
current_physical_size: None,
has_in_progress_downloads,
}
})

View File

@@ -4,8 +4,6 @@
use anyhow::{bail, ensure, Context, Result};
use postgres_ffi::ControlFileData;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::{
fs,
path::Path,
@@ -20,138 +18,15 @@ use utils::{
zid::{ZTenantId, ZTimelineId},
};
use crate::tenant_mgr;
use crate::{
config::PageServerConf,
layered_repository::metadata::TimelineMetadata,
repository::{LocalTimelineState, Repository},
storage_sync::index::RemoteIndex,
tenant_config::TenantConfOpt,
DatadirTimeline, RepositoryImpl, TimelineImpl,
config::PageServerConf, repository::Repository, storage_sync::index::RemoteIndex,
tenant_config::TenantConfOpt, RepositoryImpl, TimelineImpl,
};
use crate::{import_datadir, LOG_FILE_NAME};
use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager};
use crate::{repository::RepositoryTimeline, tenant_mgr};
use crate::{repository::Timeline, CheckpointConfig};
/// Description of a timeline as known locally, built either from a loaded
/// timeline or from the metadata of an unloaded one.
///
/// LSN and timeline-id fields are serialized through their string forms
/// (`DisplayFromStr`).
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LocalTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<ZTimelineId>,
// `None` when the ancestor LSN is zero.
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
// For an unloaded timeline this is filled with `disk_consistent_lsn`.
#[serde_as(as = "DisplayFromStr")]
pub last_record_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
// The two non-incremental sizes are filled only when explicitly requested
// for a loaded timeline; otherwise they are `None`.
pub current_logical_size_non_incremental: Option<usize>,
pub current_physical_size_non_incremental: Option<u64>,
pub timeline_state: LocalTimelineState,
}
impl LocalTimelineInfo {
    /// Builds the info for a timeline that is loaded in memory.
    ///
    /// The non-incremental sizes are computed only when the corresponding
    /// `include_*` flag is set; an error from either computation is propagated.
    pub fn from_loaded_timeline(
        timeline: &TimelineImpl,
        include_non_incremental_logical_size: bool,
        include_non_incremental_physical_size: bool,
    ) -> anyhow::Result<Self> {
        let last_record_lsn = timeline.get_last_record_lsn();

        // Field expressions are kept in their original order so that getters
        // run (and any temporaries they return are held) exactly as before.
        let loaded_info = Self {
            ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
            // A zero ancestor LSN is reported as `None`.
            ancestor_lsn: match timeline.get_ancestor_lsn() {
                Lsn(0) => None,
                nonzero_lsn => Some(nonzero_lsn),
            },
            disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
            last_record_lsn,
            prev_record_lsn: Some(timeline.get_prev_record_lsn()),
            latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
            timeline_state: LocalTimelineState::Loaded,
            current_physical_size: Some(timeline.get_physical_size()),
            current_logical_size: Some(timeline.get_current_logical_size()),
            current_logical_size_non_incremental: include_non_incremental_logical_size
                .then(|| timeline.get_current_logical_size_non_incremental(last_record_lsn))
                .transpose()?,
            current_physical_size_non_incremental: include_non_incremental_physical_size
                .then(|| timeline.get_physical_size_non_incremental())
                .transpose()?,
        };
        Ok(loaded_info)
    }

    /// Builds the info for a timeline that is not loaded, from its metadata only.
    ///
    /// All size fields are `None`, and `last_record_lsn` is filled with the
    /// metadata's `disk_consistent_lsn`.
    pub fn from_unloaded_timeline(metadata: &TimelineMetadata) -> Self {
        let ancestor_timeline_id = metadata.ancestor_timeline();
        // A zero ancestor LSN is reported as `None`.
        let ancestor_lsn = match metadata.ancestor_lsn() {
            Lsn(0) => None,
            nonzero_lsn => Some(nonzero_lsn),
        };

        Self {
            ancestor_timeline_id,
            ancestor_lsn,
            disk_consistent_lsn: metadata.disk_consistent_lsn(),
            last_record_lsn: metadata.disk_consistent_lsn(),
            prev_record_lsn: metadata.prev_record_lsn(),
            latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
            timeline_state: LocalTimelineState::Unloaded,
            current_logical_size: None,
            current_physical_size: None,
            current_logical_size_non_incremental: None,
            current_physical_size_non_incremental: None,
        }
    }

    /// Builds the info from a repository timeline entry.
    ///
    /// For a loaded entry the timeline is fetched again through
    /// `tenant_mgr::get_local_timeline_with_load` rather than taken from the
    /// entry itself; for an unloaded entry only the metadata is used.
    pub fn from_repo_timeline<T>(
        tenant_id: ZTenantId,
        timeline_id: ZTimelineId,
        repo_timeline: &RepositoryTimeline<T>,
        include_non_incremental_logical_size: bool,
        include_non_incremental_physical_size: bool,
    ) -> anyhow::Result<Self> {
        let metadata = match repo_timeline {
            RepositoryTimeline::Unloaded { metadata } => metadata,
            RepositoryTimeline::Loaded(_) => {
                let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
                return Self::from_loaded_timeline(
                    &*timeline,
                    include_non_incremental_logical_size,
                    include_non_incremental_physical_size,
                );
            }
        };
        Ok(Self::from_unloaded_timeline(metadata))
    }
}
/// Serializable description of the remote side of a timeline; used as the `remote`
/// field of `TimelineInfo`.
// NOTE(review): presumably "remote" means remote storage — confirm against the producer of this struct.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RemoteTimelineInfo {
/// (De)serialized through the `Display`/`FromStr` representation of `Lsn`.
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
// NOTE(review): looks like a flag for a pending download of the timeline — confirm.
pub awaits_download: bool,
}
/// Serializable description of one timeline of a tenant, combining an optional local
/// part and an optional remote part.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
/// (De)serialized through the `Display`/`FromStr` representation of the ID.
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: ZTenantId,
/// (De)serialized through the `Display`/`FromStr` representation of the ID.
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: ZTimelineId,
/// Local information about the timeline, if any is available.
pub local: Option<LocalTimelineInfo>,
/// Remote information about the timeline, if any is available.
pub remote: Option<RemoteTimelineInfo>,
}
#[derive(Debug, Clone, Copy)]
pub struct PointInTime {
pub timeline_id: ZTimelineId,
@@ -333,38 +208,22 @@ fn bootstrap_timeline<R: Repository>(
Ok(())
}
/// Collects `LocalTimelineInfo` for every timeline listed by the tenant's repository.
///
/// The two `include_non_incremental_*` flags are forwarded unchanged to
/// `LocalTimelineInfo::from_repo_timeline` for each timeline.
///
/// Fails if the repository for `tenant_id` cannot be obtained, or on the first timeline
/// whose info cannot be built.
pub(crate) fn get_local_timelines(
    tenant_id: ZTenantId,
    include_non_incremental_logical_size: bool,
    include_non_incremental_physical_size: bool,
) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
    let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
        .with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;

    let timelines = repo.list_timelines();
    let mut infos = Vec::with_capacity(timelines.len());
    for (timeline_id, repo_timeline) in timelines {
        let info = LocalTimelineInfo::from_repo_timeline(
            tenant_id,
            timeline_id,
            &repo_timeline,
            include_non_incremental_logical_size,
            include_non_incremental_physical_size,
        )?;
        infos.push((timeline_id, info));
    }
    Ok(infos)
}
///
/// Create a new timeline.
///
/// Returns the new timeline ID and reference to its Timeline object.
///
/// If the caller specified the timeline ID to use (`new_timeline_id`), and a timeline with
/// the same timeline ID already exists, returns None. If `new_timeline_id` is not given,
/// a new unique ID is generated.
///
pub(crate) fn create_timeline(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
new_timeline_id: Option<ZTimelineId>,
ancestor_timeline_id: Option<ZTimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
) -> Result<Option<TimelineInfo>> {
) -> Result<Option<(ZTimelineId, Arc<TimelineImpl>)>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate);
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
@@ -373,7 +232,7 @@ pub(crate) fn create_timeline(
return Ok(None);
}
let new_timeline_info = match ancestor_timeline_id {
let _new_timeline = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = repo
.get_timeline_load(ancestor_timeline_id)
@@ -401,26 +260,13 @@ pub(crate) fn create_timeline(
}
}
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?;
// load the timeline into memory
let loaded_timeline =
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
LocalTimelineInfo::from_loaded_timeline(&*loaded_timeline, false, false)
.context("cannot fill timeline info")?
}
None => {
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?;
// load the timeline into memory
let new_timeline =
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
LocalTimelineInfo::from_loaded_timeline(&*new_timeline, false, false)
.context("cannot fill timeline info")?
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
}
None => bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?,
};
Ok(Some(TimelineInfo {
tenant_id,
timeline_id: new_timeline_id,
local: Some(new_timeline_info),
remote: None,
}))
// load the timeline into memory
let loaded_timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
Ok(Some((new_timeline_id, loaded_timeline)))
}

View File

@@ -26,7 +26,6 @@ mod walreceiver_connection;
use anyhow::{ensure, Context};
use etcd_broker::Client;
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::cell::Cell;
use std::collections::{hash_map, HashMap, HashSet};
use std::future::Future;
@@ -36,14 +35,13 @@ use std::thread_local;
use std::time::Duration;
use tokio::{
select,
sync::{mpsc, watch, RwLock},
sync::{mpsc, watch},
task::JoinHandle,
};
use tracing::*;
use url::Url;
use crate::config::PageServerConf;
use crate::http::models::WalReceiverEntry;
use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState};
use crate::thread_mgr::{self, ThreadKind};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -55,23 +53,6 @@ thread_local! {
pub(crate) static IS_WAL_RECEIVER: Cell<bool> = Cell::new(false);
}
/// WAL receiver state for sharing with the outside world.
/// Only entries for timelines currently available in pageserver are stored.
static WAL_RECEIVER_ENTRIES: Lazy<RwLock<HashMap<ZTenantTimelineId, WalReceiverEntry>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
/// Gets the public WAL streaming entry for a certain timeline.
pub async fn get_wal_receiver_entry(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> Option<WalReceiverEntry> {
WAL_RECEIVER_ENTRIES
.read()
.await
.get(&ZTenantTimelineId::new(tenant_id, timeline_id))
.cloned()
}
/// Sets up the main WAL receiver thread that manages the rest of the subtasks inside of it, per timeline.
/// See comments in [`wal_receiver_main_thread_loop_step`] for more details on per timeline activities.
pub fn init_wal_receiver_main_thread(
@@ -281,13 +262,10 @@ async fn wal_receiver_main_thread_loop_step<'a>(
}
None => warn!("Timeline {id} does not have a tenant entry in wal receiver main thread"),
};
{
WAL_RECEIVER_ENTRIES.write().await.remove(&id);
if let Err(e) = join_confirmation_sender.send(()) {
warn!("cannot send wal_receiver shutdown confirmation {e}")
} else {
info!("confirm walreceiver shutdown for {id}");
}
if let Err(e) = join_confirmation_sender.send(()) {
warn!("cannot send wal_receiver shutdown confirmation {e}")
} else {
info!("confirm walreceiver shutdown for {id}");
}
}
// Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly.
@@ -322,17 +300,6 @@ async fn wal_receiver_main_thread_loop_step<'a>(
}
};
{
WAL_RECEIVER_ENTRIES.write().await.insert(
id,
WalReceiverEntry {
wal_producer_connstr: None,
last_received_msg_lsn: None,
last_received_msg_ts: None,
},
);
}
vacant_connection_manager_entry.insert(
connection_manager::spawn_connection_manager_task(
id,

View File

@@ -168,7 +168,7 @@ async fn connection_manager_loop_step(
walreceiver_state
.change_connection(
new_candidate.safekeeper_id,
new_candidate.wal_producer_connstr,
new_candidate.wal_source_connstr,
)
.await
}
@@ -302,7 +302,7 @@ impl WalreceiverState {
}
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_producer_connstr: String) {
async fn change_connection(&mut self, new_sk_id: NodeId, new_wal_source_connstr: String) {
if let Some(old_connection) = self.wal_connection.take() {
old_connection.connection_task.shutdown().await
}
@@ -324,7 +324,7 @@ impl WalreceiverState {
.await;
super::walreceiver_connection::handle_walreceiver_connection(
id,
&new_wal_producer_connstr,
&new_wal_source_connstr,
events_sender.as_ref(),
cancellation,
connect_timeout,
@@ -387,7 +387,7 @@ impl WalreceiverState {
Some(existing_wal_connection) => {
let connected_sk_node = existing_wal_connection.sk_id;
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) =
let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connstr) =
self.select_connection_candidate(Some(connected_sk_node))?;
let now = Utc::now().naive_utc();
@@ -397,7 +397,7 @@ impl WalreceiverState {
if latest_interaciton > self.lagging_wal_timeout {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_producer_connstr: new_wal_producer_connstr,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::NoWalTimeout {
last_wal_interaction: Some(
existing_wal_connection.latest_connection_update,
@@ -423,7 +423,7 @@ impl WalreceiverState {
return Some(
NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_producer_connstr: new_wal_producer_connstr,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::LaggingWal { current_lsn, new_lsn, threshold: self.max_lsn_wal_lag },
});
}
@@ -434,18 +434,18 @@ impl WalreceiverState {
None => {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_producer_connstr: new_wal_producer_connstr,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::NoEtcdDataForExistingConnection,
})
}
}
}
None => {
let (new_sk_id, _, new_wal_producer_connstr) =
let (new_sk_id, _, new_wal_source_connstr) =
self.select_connection_candidate(None)?;
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_producer_connstr: new_wal_producer_connstr,
wal_source_connstr: new_wal_source_connstr,
reason: ReconnectReason::NoExistingConnection,
});
}
@@ -546,7 +546,7 @@ impl WalreceiverState {
#[derive(Debug, PartialEq, Eq)]
struct NewWalConnectionCandidate {
safekeeper_id: NodeId,
wal_producer_connstr: String,
wal_source_connstr: String,
reason: ReconnectReason,
}
@@ -803,7 +803,7 @@ mod tests {
"Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
);
assert!(only_candidate
.wal_producer_connstr
.wal_source_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
let selected_lsn = 100_000;
@@ -868,7 +868,7 @@ mod tests {
"Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
);
assert!(biggest_wal_candidate
.wal_producer_connstr
.wal_source_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())
@@ -985,7 +985,7 @@ mod tests {
"Should select new safekeeper due to missing etcd data, even if there's an existing connection with this safekeeper"
);
assert!(only_candidate
.wal_producer_connstr
.wal_source_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())
@@ -1067,7 +1067,7 @@ mod tests {
"Should select bigger WAL safekeeper if it starts to lag enough"
);
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.wal_source_connstr
.contains("advanced by Lsn safekeeper"));
Ok(())
@@ -1134,7 +1134,7 @@ mod tests {
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.wal_source_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())
@@ -1190,7 +1190,7 @@ mod tests {
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
assert!(over_threshcurrent_candidate
.wal_producer_connstr
.wal_source_connstr
.contains(DUMMY_SAFEKEEPER_CONNSTR));
Ok(())

View File

@@ -19,7 +19,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use super::TaskEvent;
use crate::{
http::models::WalReceiverEntry,
layered_repository::WalReceiverInfo,
pgdatadir_mapping::DatadirTimeline,
repository::{Repository, Timeline},
tenant_mgr,
@@ -29,18 +29,18 @@ use crate::{
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
/// Opens a connection to the given wal producer and streams the WAL, sending progress messages during streaming.
/// Open a connection to the given safekeeper and receive WAL, sending back progress
/// messages as we go.
pub async fn handle_walreceiver_connection(
id: ZTenantTimelineId,
wal_producer_connstr: &str,
wal_source_connstr: &str,
events_sender: &watch::Sender<TaskEvent<ReplicationFeedback>>,
mut cancellation: watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
// Connect to the database in replication mode.
info!("connecting to {wal_producer_connstr}");
let connect_cfg =
format!("{wal_producer_connstr} application_name=pageserver replication=true");
info!("connecting to {wal_source_connstr}");
let connect_cfg = format!("{wal_source_connstr} application_name=pageserver replication=true");
let (mut replication_client, connection) = time::timeout(
connect_timeout,
@@ -232,21 +232,16 @@ pub async fn handle_walreceiver_connection(
let apply_lsn = u64::from(timeline_remote_consistent_lsn);
let ts = SystemTime::now();
// Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVER_ENTRIES`
{
super::WAL_RECEIVER_ENTRIES.write().await.insert(
id,
WalReceiverEntry {
wal_producer_connstr: Some(wal_producer_connstr.to_owned()),
last_received_msg_lsn: Some(last_lsn),
last_received_msg_ts: Some(
ts.duration_since(SystemTime::UNIX_EPOCH)
.expect("Received message time should be before UNIX EPOCH!")
.as_micros(),
),
},
);
}
// Update the status about what we just received. This is shown in the mgmt API.
let last_received_wal = WalReceiverInfo {
wal_source_connstr: wal_source_connstr.to_owned(),
last_received_msg_lsn: last_lsn,
last_received_msg_ts: ts
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Received message time should be before UNIX EPOCH!")
.as_micros(),
};
*timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
// Send zenith feedback message.
// Regular standby_status_update fields are put into this message.

1565
poetry.lock generated

File diff suppressed because one or more lines are too long

View File

@@ -8,7 +8,7 @@ authors = []
python = "^3.9"
pytest = "^6.2.5"
psycopg2-binary = "^2.9.1"
typing-extensions = "^3.10.0"
typing-extensions = "^4.1.0"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.26.0"
pytest-xdist = "^2.3.0"
@@ -16,20 +16,21 @@ asyncpg = "^0.24.0"
aiopg = "^1.3.1"
cached-property = "^1.5.2"
Jinja2 = "^3.0.2"
types-requests = "^2.27.7"
types-psycopg2 = "^2.9.6"
types-requests = "^2.28.5"
types-psycopg2 = "^2.9.18"
boto3 = "^1.20.40"
boto3-stubs = "^1.20.40"
boto3-stubs = {version = "^1.23.38", extras = ["s3"]}
moto = {version = "^3.0.0", extras = ["server"]}
backoff = "^1.11.1"
pytest-lazy-fixture = "^0.6.3"
prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "2.1.2"
[tool.poetry.dev-dependencies]
yapf = "==0.31.0"
flake8 = "^3.9.2"
mypy = "==0.910"
mypy = "==0.971"
[build-system]
requires = ["poetry-core>=1.0.0"]

View File

@@ -1,6 +1,7 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException
from fixtures.utils import query_scalar
#
@@ -25,13 +26,11 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
pg_branch0 = env.postgres.create_start('main', tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
branch0_cur.execute("SHOW neon.timeline_id")
branch0_timeline = branch0_cur.fetchone()[0]
branch0_timeline = query_scalar(branch0_cur, "SHOW neon.timeline_id")
log.info(f"b0 timeline {branch0_timeline}")
# Create table, and insert 100k rows.
branch0_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch0_lsn = branch0_cur.fetchone()[0]
branch0_lsn = query_scalar(branch0_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f"b0 at lsn {branch0_lsn}")
branch0_cur.execute('CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)')
@@ -40,8 +39,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch0:' || g
FROM generate_series(1, 100000) g
''')
branch0_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_100 = branch0_cur.fetchone()[0]
lsn_100 = query_scalar(branch0_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 100k rows: {lsn_100}')
# Create branch1.
@@ -50,12 +48,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
log.info("postgres is running on 'branch1' branch")
branch1_cur = pg_branch1.connect().cursor()
branch1_cur.execute("SHOW neon.timeline_id")
branch1_timeline = branch1_cur.fetchone()[0]
branch1_timeline = query_scalar(branch1_cur, "SHOW neon.timeline_id")
log.info(f"b1 timeline {branch1_timeline}")
branch1_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch1_lsn = branch1_cur.fetchone()[0]
branch1_lsn = query_scalar(branch1_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f"b1 at lsn {branch1_lsn}")
# Insert 100k rows.
@@ -64,8 +60,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch1:' || g
FROM generate_series(1, 100000) g
''')
branch1_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_200 = branch1_cur.fetchone()[0]
lsn_200 = query_scalar(branch1_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 200k rows: {lsn_200}')
# Create branch2.
@@ -74,12 +69,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
log.info("postgres is running on 'branch2' branch")
branch2_cur = pg_branch2.connect().cursor()
branch2_cur.execute("SHOW neon.timeline_id")
branch2_timeline = branch2_cur.fetchone()[0]
branch2_timeline = query_scalar(branch2_cur, "SHOW neon.timeline_id")
log.info(f"b2 timeline {branch2_timeline}")
branch2_cur.execute('SELECT pg_current_wal_insert_lsn()')
branch2_lsn = branch2_cur.fetchone()[0]
branch2_lsn = query_scalar(branch2_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f"b2 at lsn {branch2_lsn}")
# Insert 100k rows.
@@ -88,20 +81,16 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
SELECT '00112233445566778899AABBCCDDEEFF' || ':branch2:' || g
FROM generate_series(1, 100000) g
''')
branch2_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_300 = branch2_cur.fetchone()[0]
lsn_300 = query_scalar(branch2_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 300k rows: {lsn_300}')
# Run compaction on branch1.
psconn = env.pageserver.connect()
log.info(f'compact {tenant.hex} {branch1_timeline} {lsn_200}')
psconn.cursor().execute(f'''compact {tenant.hex} {branch1_timeline} {lsn_200}''')
compact = f'compact {tenant.hex} {branch1_timeline} {lsn_200}'
log.info(compact)
env.pageserver.safe_psql(compact)
branch0_cur.execute('SELECT count(*) FROM foo')
assert branch0_cur.fetchone() == (100000, )
assert query_scalar(branch0_cur, 'SELECT count(*) FROM foo') == 100000
branch1_cur.execute('SELECT count(*) FROM foo')
assert branch1_cur.fetchone() == (200000, )
assert query_scalar(branch1_cur, 'SELECT count(*) FROM foo') == 200000
branch2_cur.execute('SELECT count(*) FROM foo')
assert branch2_cur.fetchone() == (300000, )
assert query_scalar(branch2_cur, 'SELECT count(*) FROM foo') == 300000

View File

@@ -3,7 +3,7 @@ import pytest
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import lsn_from_hex
from fixtures.utils import lsn_from_hex, query_scalar
# Test the GC implementation when running with branching.
@@ -76,20 +76,17 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
)
main_cur.execute('INSERT INTO foo SELECT FROM generate_series(1, 100000)')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn1 = main_cur.fetchone()[0]
lsn1 = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN1: {lsn1}')
main_cur.execute('INSERT INTO foo SELECT FROM generate_series(1, 100000)')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn2 = main_cur.fetchone()[0]
lsn2 = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN2: {lsn2}')
# Set the GC horizon so that lsn1 is inside the horizon, which means
# we can create a new branch starting from lsn1.
env.pageserver.safe_psql(
f'''do_gc {tenant.hex} {timeline_main.hex} {lsn_from_hex(lsn2) - lsn_from_hex(lsn1) + 1024}'''
)
f'do_gc {tenant.hex} {timeline_main.hex} {lsn_from_hex(lsn2) - lsn_from_hex(lsn1) + 1024}')
env.neon_cli.create_branch('test_branch',
'test_main',
@@ -100,8 +97,7 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
branch_cur = pg_branch.connect().cursor()
branch_cur.execute('INSERT INTO foo SELECT FROM generate_series(1, 100000)')
branch_cur.execute('SELECT count(*) FROM foo')
assert branch_cur.fetchone() == (200000, )
assert query_scalar(branch_cur, 'SELECT count(*) FROM foo') == 200000
# This test simulates a race condition happening when branch creation and GC are performed concurrently.

View File

@@ -1,9 +1,7 @@
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.utils import print_gc_result, query_scalar
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -27,26 +25,22 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
pgmain = env.postgres.create_start('test_branch_behind')
log.info("postgres is running on 'test_branch_behind' branch")
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
main_cur = pgmain.connect().cursor()
main_cur.execute("SHOW neon.timeline_id")
timeline = main_cur.fetchone()[0]
timeline = query_scalar(main_cur, "SHOW neon.timeline_id")
# Create table, and insert the first 100 rows
main_cur.execute('CREATE TABLE foo (t text)')
# keep some early lsn to test branch creation on out of date lsn
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
gced_lsn = main_cur.fetchone()[0]
gced_lsn = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
main_cur.execute('''
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_a = main_cur.fetchone()[0]
lsn_a = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 100 rows: {lsn_a}')
# Insert some more rows. (This generates enough WAL to fill a few segments.)
@@ -55,8 +49,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_b = main_cur.fetchone()[0]
lsn_b = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f'LSN after 200100 rows: {lsn_b}')
# Branch at the point where only 100 rows were inserted
@@ -70,10 +63,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_c = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_c = main_cur.fetchone()[0]
log.info(f'LSN after 400100 rows: {lsn_c}')
# Branch at the point where only 200100 rows were inserted
@@ -85,20 +76,15 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
pg_more = env.postgres.create_start('test_branch_behind_more')
# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
hundred_cur = hundred_pg_conn.cursor()
hundred_cur.execute('SELECT count(*) FROM foo')
assert hundred_cur.fetchone() == (100, )
hundred_cur = pg_hundred.connect().cursor()
assert query_scalar(hundred_cur, 'SELECT count(*) FROM foo') == 100
# On the 'more' branch, we should see 200100 rows
more_pg_conn = pg_more.connect()
more_cur = more_pg_conn.cursor()
more_cur.execute('SELECT count(*) FROM foo')
assert more_cur.fetchone() == (200100, )
more_cur = pg_more.connect().cursor()
assert query_scalar(more_cur, 'SELECT count(*) FROM foo') == 200100
# All the rows are visible on the main branch
main_cur.execute('SELECT count(*) FROM foo')
assert main_cur.fetchone() == (400100, )
assert query_scalar(main_cur, 'SELECT count(*) FROM foo') == 400100
# Check bad lsn's for branching
@@ -107,9 +93,7 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
'test_branch_behind',
ancestor_start_lsn="0/3000000")
pg = env.postgres.create_start('test_branch_segment_boundary')
cur = pg.connect().cursor()
cur.execute('SELECT 1')
assert cur.fetchone() == (1, )
assert pg.safe_psql('SELECT 1')[0][0] == 1
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn"):
@@ -122,12 +106,11 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
ancestor_start_lsn="0/42")
# check that we cannot create branch based on garbage collected data
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# call gc to advance latest_gc_cutoff_lsn
pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
with env.pageserver.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
# call gc to advance latest_gc_cutoff_lsn
pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0")
row = pscur.fetchone()
print_gc_result(row)
with pytest.raises(Exception, match="invalid branch start lsn"):
# this gced_lsn is pretty random, so if gc is disabled this wouldn't fail
@@ -136,11 +119,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
ancestor_start_lsn=gced_lsn)
# check that after gc everything is still there
hundred_cur.execute('SELECT count(*) FROM foo')
assert hundred_cur.fetchone() == (100, )
assert query_scalar(hundred_cur, 'SELECT count(*) FROM foo') == 100
more_cur.execute('SELECT count(*) FROM foo')
assert more_cur.fetchone() == (200100, )
assert query_scalar(more_cur, 'SELECT count(*) FROM foo') == 200100
main_cur.execute('SELECT count(*) FROM foo')
assert main_cur.fetchone() == (400100, )
assert query_scalar(main_cur, 'SELECT count(*) FROM foo') == 400100

View File

@@ -1,10 +1,14 @@
from typing import List, Tuple
from uuid import UUID
import pytest
import concurrent.futures
from contextlib import closing
from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv
from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres
from fixtures.log_helper import log
import os
from fixtures.utils import query_scalar
# Test restarting page server, while safekeeper and compute node keep
# running.
@@ -13,7 +17,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_timelines = []
tenant_timelines: List[Tuple[str, str, Postgres]] = []
for n in range(4):
tenant_id_uuid, timeline_id_uuid = env.neon_cli.create_tenant()
@@ -21,13 +25,11 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
timeline_id = timeline_id_uuid.hex
pg = env.postgres.create_start(f'main', tenant_id=tenant_id_uuid)
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
with pg.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
cur.execute("SHOW neon.timeline_id")
timeline_id = cur.fetchone()[0]
timeline_id = query_scalar(cur, "SHOW neon.timeline_id")
pg.stop()
tenant_timelines.append((tenant_id, timeline_id, pg))
@@ -68,10 +70,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# Tenant 0 should still work
pg0.start()
with closing(pg0.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM t")
assert cur.fetchone()[0] == 100
assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
# But all others are broken
for n in range(1, 4):

View File

@@ -5,6 +5,7 @@ from contextlib import closing
from fixtures.neon_fixtures import NeonEnv
from fixtures.log_helper import log
from fixtures.utils import query_scalar
#
@@ -32,17 +33,16 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
pg.safe_psql('CREATE EXTENSION neon_test_utils')
# Consume many xids to advance clog
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('select test_consume_xids(1000*1000*10);')
log.info('xids consumed')
with pg.cursor() as cur:
cur.execute('select test_consume_xids(1000*1000*10);')
log.info('xids consumed')
# call a checkpoint to trigger TruncateSubtrans
cur.execute('CHECKPOINT;')
# call a checkpoint to trigger TruncateSubtrans
cur.execute('CHECKPOINT;')
# ensure WAL flush
cur.execute('select txid_current()')
log.info(cur.fetchone())
# ensure WAL flush
cur.execute('select txid_current()')
log.info(cur.fetchone())
# wait for autovacuum to truncate the pg_xact
# XXX Is it worth to add a timeout here?
@@ -54,11 +54,9 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
time.sleep(5)
# checkpoint to advance latest lsn
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CHECKPOINT;')
cur.execute('select pg_current_wal_insert_lsn()')
lsn_after_truncation = cur.fetchone()[0]
with pg.cursor() as cur:
cur.execute('CHECKPOINT;')
lsn_after_truncation = query_scalar(cur, 'select pg_current_wal_insert_lsn()')
# create new branch after clog truncation and start a compute node on it
log.info(f'create branch at lsn_after_truncation {lsn_after_truncation}')

View File

@@ -4,6 +4,7 @@ import pathlib
from contextlib import closing
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.log_helper import log
from fixtures.utils import query_scalar
#
@@ -16,15 +17,13 @@ def test_createdb(neon_simple_env: NeonEnv):
pg = env.postgres.create_start('test_createdb')
log.info("postgres is running on 'test_createdb' branch")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('VACUUM FULL pg_class')
with pg.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('VACUUM FULL pg_class')
cur.execute('CREATE DATABASE foodb')
cur.execute('CREATE DATABASE foodb')
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
lsn = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
# Create a branch
env.neon_cli.create_branch('test_createdb2', 'test_createdb', ancestor_start_lsn=lsn)
@@ -32,21 +31,21 @@ def test_createdb(neon_simple_env: NeonEnv):
# Test that you can connect to the new database on both branches
for db in (pg, pg2):
with closing(db.connect(dbname='foodb')) as conn:
with conn.cursor() as cur:
# Check database size in both branches
cur.execute("""
select pg_size_pretty(pg_database_size('foodb')),
pg_size_pretty(
sum(pg_relation_size(oid, 'main'))
+sum(pg_relation_size(oid, 'vm'))
+sum(pg_relation_size(oid, 'fsm'))
) FROM pg_class where relisshared is false
""")
res = cur.fetchone()
# check that dbsize equals sum of all relation sizes, excluding shared ones
# This is how we define dbsize in neon for now
assert res[0] == res[1]
with db.cursor(dbname='foodb') as cur:
# Check database size in both branches
cur.execute("""
select pg_size_pretty(pg_database_size('foodb')),
pg_size_pretty(
sum(pg_relation_size(oid, 'main'))
+sum(pg_relation_size(oid, 'vm'))
+sum(pg_relation_size(oid, 'fsm'))
) FROM pg_class where relisshared is false
""")
res = cur.fetchone()
assert res is not None
# check that dbsize equals sum of all relation sizes, excluding shared ones
# This is how we define dbsize in neon for now
assert res[0] == res[1]
#
@@ -58,24 +57,19 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
pg = env.postgres.create_start('test_dropdb')
log.info("postgres is running on 'test_dropdb' branch")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CREATE DATABASE foodb')
with pg.cursor() as cur:
cur.execute('CREATE DATABASE foodb')
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_before_drop = cur.fetchone()[0]
lsn_before_drop = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
cur.execute("SELECT oid FROM pg_database WHERE datname='foodb';")
dboid = cur.fetchone()[0]
dboid = query_scalar(cur, "SELECT oid FROM pg_database WHERE datname='foodb';")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute('DROP DATABASE foodb')
with pg.cursor() as cur:
cur.execute('DROP DATABASE foodb')
cur.execute('CHECKPOINT')
cur.execute('CHECKPOINT')
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_after_drop = cur.fetchone()[0]
lsn_after_drop = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
# Create two branches before and after database drop.
env.neon_cli.create_branch('test_before_dropdb',

View File

@@ -1,7 +1,6 @@
from contextlib import closing
from fixtures.neon_fixtures import NeonEnv
from fixtures.log_helper import log
from fixtures.utils import query_scalar
#
@@ -13,15 +12,13 @@ def test_createuser(neon_simple_env: NeonEnv):
pg = env.postgres.create_start('test_createuser')
log.info("postgres is running on 'test_createuser' branch")
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('CREATE USER testuser with password %s', ('testpwd', ))
with pg.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute('CREATE USER testuser with password %s', ('testpwd', ))
cur.execute('CHECKPOINT')
cur.execute('CHECKPOINT')
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
lsn = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
# Create a branch
env.neon_cli.create_branch('test_createuser2', 'test_createuser', ancestor_start_lsn=lsn)

View File

@@ -1,10 +1,8 @@
from contextlib import closing
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.neon_fixtures import pg_distrib_dir
import os
from fixtures.utils import subprocess_capture
from fixtures.utils import query_scalar, subprocess_capture
num_rows = 1000
@@ -21,19 +19,17 @@ def test_fullbackup(neon_env_builder: NeonEnvBuilder,
pgmain = env.postgres.create_start('test_fullbackup')
log.info("postgres is running on 'test_fullbackup' branch")
timeline = pgmain.safe_psql("SHOW neon.timeline_id")[0][0]
with pgmain.cursor() as cur:
timeline = query_scalar(cur, "SHOW neon.timeline_id")
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
cur.execute(f'''CREATE TABLE tbl AS SELECT 'long string to consume some space' || g
from generate_series(1,{num_rows}) g''')
cur.execute("CHECKPOINT")
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
log.info(f"start_backup_lsn = {lsn}")
lsn = query_scalar(cur, 'SELECT pg_current_wal_insert_lsn()')
log.info(f"start_backup_lsn = {lsn}")
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.

View File

@@ -3,6 +3,7 @@ import random
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.log_helper import log
from fixtures.utils import query_scalar
# Test configuration
#
@@ -59,22 +60,21 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
pg = env.postgres.create_start('test_gc_aggressive')
log.info('postgres is running on test_gc_aggressive branch')
conn = pg.connect()
cur = conn.cursor()
with pg.cursor() as cur:
timeline = query_scalar(cur, "SHOW neon.timeline_id")
cur.execute("SHOW neon.timeline_id")
timeline = cur.fetchone()[0]
# Create table, and insert the first 100 rows
cur.execute('CREATE TABLE foo (id int, counter int, t text)')
cur.execute(f'''
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {num_rows}) g
''')
cur.execute('CREATE INDEX ON foo(id)')
# Create table, and insert the first 100 rows
cur.execute('CREATE TABLE foo (id int, counter int, t text)')
cur.execute(f'''
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {num_rows}) g
''')
cur.execute('CREATE INDEX ON foo(id)')
asyncio.run(update_and_gc(env, pg, timeline))
asyncio.run(update_and_gc(env, pg, timeline))
cur.execute('SELECT COUNT(*), SUM(counter) FROM foo')
assert cur.fetchone() == (num_rows, updates_to_perform)
cur.execute('SELECT COUNT(*), SUM(counter) FROM foo')
r = cur.fetchone()
assert r is not None
assert r == (num_rows, updates_to_perform)

View File

@@ -8,6 +8,8 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.log_helper import log
import time
from fixtures.utils import query_scalar
#
# Test pageserver get_lsn_by_timestamp API
@@ -20,11 +22,8 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
pgmain = env.postgres.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch")
ps_conn = env.pageserver.connect()
ps_cur = ps_conn.cursor()
conn = pgmain.connect()
cur = conn.cursor()
ps_cur = env.pageserver.connect().cursor()
cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
#
@@ -35,9 +34,8 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
tbl = []
for i in range(1000):
cur.execute(f"INSERT INTO foo VALUES({i})")
cur.execute(f'SELECT clock_timestamp()')
# Get the timestamp at UTC
after_timestamp = cur.fetchone()[0].replace(tzinfo=None)
after_timestamp = query_scalar(cur, 'SELECT clock_timestamp()').replace(tzinfo=None)
tbl.append([i, after_timestamp])
# Execute one more transaction with synchronous_commit enabled, to flush
@@ -47,18 +45,18 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
ps_cur.execute(
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'"
)
result = ps_cur.fetchone()[0]
assert result == 'future'
# timestamp too far in the past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
ps_cur.execute(
result = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'"
)
result = ps_cur.fetchone()[0]
assert result == 'past'
# Probe a bunch of timestamps in the valid range
@@ -66,19 +64,16 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
probe_timestamp = tbl[i][1]
# Call get_lsn_by_timestamp to get the LSN
ps_cur.execute(
lsn = query_scalar(
ps_cur,
f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'"
)
lsn = ps_cur.fetchone()[0]
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(branch_name='test_lsn_mapping',
node_name='test_lsn_mapping_read',
lsn=lsn)
with closing(pg_here.connect()) as conn_here:
with conn_here.cursor() as cur_here:
cur_here.execute("SELECT max(x) FROM foo")
assert cur_here.fetchone()[0] == i
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
pg_here.stop_and_destroy()

View File

@@ -1,5 +1,6 @@
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.log_helper import log
from fixtures.utils import query_scalar
#
@@ -14,16 +15,14 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
pg = env.postgres.create_start('test_multixact')
log.info("postgres is running on 'test_multixact' branch")
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur = pg.connect().cursor()
cur.execute('''
CREATE TABLE t1(i int primary key);
INSERT INTO t1 select * from generate_series(1, 100);
''')
cur.execute('SELECT next_multixact_id FROM pg_control_checkpoint()')
next_multixact_id_old = cur.fetchone()[0]
next_multixact_id_old = query_scalar(cur,
'SELECT next_multixact_id FROM pg_control_checkpoint()')
# Lock entries using parallel connections in a round-robin fashion.
nclients = 20
@@ -53,6 +52,7 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
cur.execute(
'SELECT next_multixact_id, pg_current_wal_insert_lsn() FROM pg_control_checkpoint()')
res = cur.fetchone()
assert res is not None
next_multixact_id = res[0]
lsn = res[1]
@@ -64,11 +64,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
pg_new = env.postgres.create_start('test_multixact_new')
log.info("postgres is running on 'test_multixact_new' branch")
pg_new_conn = pg_new.connect()
cur_new = pg_new_conn.cursor()
cur_new.execute('SELECT next_multixact_id FROM pg_control_checkpoint()')
next_multixact_id_new = cur_new.fetchone()[0]
next_multixact_id_new = pg_new.safe_psql(
'SELECT next_multixact_id FROM pg_control_checkpoint()')[0][0]
# Check that we restored pg_controlfile correctly
assert next_multixact_id_new == next_multixact_id

View File

@@ -1,6 +1,6 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.utils import print_gc_result, query_scalar
import psycopg2.extras
@@ -26,8 +26,7 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
cur = pg_conn.cursor()
# Get the timeline ID of our branch. We need it for the 'do_gc' command
cur.execute("SHOW neon.timeline_id")
timeline = cur.fetchone()[0]
timeline = query_scalar(cur, "SHOW neon.timeline_id")
psconn = env.pageserver.connect()
pscur = psconn.cursor(cursor_factory=psycopg2.extras.DictCursor)
@@ -48,6 +47,7 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
from pg_settings where name = 'shared_buffers'
''')
row = cur.fetchone()
assert row is not None
log.info(f'shared_buffers is {row[0]}, table size {row[1]}')
assert int(row[0]) < int(row[1])

View File

@@ -47,7 +47,8 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID):
for timeline in timelines:
timeline_id_str = str(timeline['timeline_id'])
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=UUID(timeline_id_str))
timeline_id=UUID(timeline_id_str),
include_non_incremental_logical_size=True)
assert timeline_details['tenant_id'] == tenant_id.hex
assert timeline_details['timeline_id'] == timeline_id_str
@@ -63,13 +64,19 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
tenant_id, timeline_id = env.neon_cli.create_tenant()
empty_response = client.wal_receiver_get(tenant_id, timeline_id)
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=timeline_id,
include_non_incremental_logical_size=True)
assert empty_response.get('wal_producer_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
# Test the WAL-receiver related fields in the response to `timeline_details` API call
#
# These fields used to be returned by a separate API call, but they're part of
# `timeline_details` now.
def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
env = neon_simple_env
client = env.pageserver.http_client()
@@ -78,18 +85,17 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
def expect_updated_msg_lsn(prev_msg_lsn: Optional[int]) -> int:
res = client.wal_receiver_get(tenant_id, timeline_id)
timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id)
# a successful `wal_receiver_get` response must contain the below fields
assert list(res.keys()) == [
"wal_producer_connstr",
"last_received_msg_lsn",
"last_received_msg_ts",
]
# a successful `timeline_details` response must contain the below fields
local_timeline_details = timeline_details['local']
assert "wal_source_connstr" in local_timeline_details.keys()
assert "last_received_msg_lsn" in local_timeline_details.keys()
assert "last_received_msg_ts" in local_timeline_details.keys()
assert res["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
last_msg_lsn = lsn_from_hex(res["last_received_msg_lsn"])
last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"])
assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \
f"the last received message's LSN {last_msg_lsn} hasn't been updated \
compared to the previous message's LSN {prev_msg_lsn}"
@@ -98,7 +104,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
# Wait to make sure that we get the latest WAL receiver data.
# We need to wait here because it's possible that we don't have access to
# the latest WAL during the time the `wal_receiver_get` API is called.
# the latest WAL yet, when the `timeline_detail` API is first called.
# See: https://github.com/neondatabase/neon/issues/1768.
lsn = wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(None))

View File

@@ -30,6 +30,7 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
from pg_settings where name = 'shared_buffers'
''')
row = cur.fetchone()
assert row is not None
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
assert int(row[0]) < int(row[1])

View File

@@ -1,10 +1,8 @@
import subprocess
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.utils import print_gc_result
from fixtures.utils import print_gc_result, query_scalar
from fixtures.neon_fixtures import NeonEnvBuilder
@@ -24,9 +22,7 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
main_cur.execute("SHOW neon.timeline_id")
timeline = main_cur.fetchone()[0]
timeline = query_scalar(main_cur, "SHOW neon.timeline_id")
# Create table
main_cur.execute('CREATE TABLE foo (t text)')
@@ -41,12 +37,15 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
# keep some early lsn to test branch creation after GC
main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
res = main_cur.fetchone()
assert res is not None
lsn_a = res[0]
xid_a = res[1]
log.info(f'LSN after 100 rows: {lsn_a} xid {xid_a}')
main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()')
res = main_cur.fetchone()
assert res is not None
debug_lsn = res[0]
debug_xid = res[1]
log.info(f'LSN after 10000 rows: {debug_lsn} xid {debug_xid}')

View File

@@ -6,6 +6,8 @@ from fixtures.log_helper import log
from psycopg2.errors import UndefinedTable
from psycopg2.errors import IoError
from fixtures.utils import query_scalar
pytest_plugins = ("fixtures.neon_fixtures")
extensions = ["pageinspect", "neon_test_utils", "pg_buffercache"]
@@ -32,9 +34,9 @@ def test_read_validation(neon_simple_env: NeonEnv):
c.execute("select lsn, lower, upper from page_header(get_raw_page('foo', 'main', 0));")
first = c.fetchone()
assert first is not None
c.execute("select relfilenode from pg_class where relname = 'foo'")
relfilenode = c.fetchone()[0]
relfilenode = query_scalar(c, "select relfilenode from pg_class where relname = 'foo'")
c.execute("insert into foo values (2);")
c.execute("select lsn, lower, upper from page_header(get_raw_page('foo', 'main', 0));")
@@ -44,22 +46,25 @@ def test_read_validation(neon_simple_env: NeonEnv):
log.info("Test table is populated, validating buffer cache")
c.execute(
cache_entries = query_scalar(
c,
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] > 0, "No buffers cached for the test relation"
assert cache_entries > 0, "No buffers cached for the test relation"
c.execute(
"select reltablespace, reldatabase, relfilenode from pg_buffercache where relfilenode = {}"
.format(relfilenode))
reln = c.fetchone()
assert reln is not None
log.info("Clear buffer cache to ensure no stale pages are brought into the cache")
c.execute("select clear_buffer_cache()")
c.execute(
cache_entries = query_scalar(
c,
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "Failed to clear buffer cache"
assert cache_entries == 0, "Failed to clear buffer cache"
log.info("Cache is clear, reading stale page version")
@@ -69,9 +74,10 @@ def test_read_validation(neon_simple_env: NeonEnv):
direct_first = c.fetchone()
assert first == direct_first, "Failed fetch page at historic lsn"
c.execute(
cache_entries = query_scalar(
c,
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "relation buffers detected after invalidation"
assert cache_entries == 0, "relation buffers detected after invalidation"
log.info("Cache is clear, reading latest page version without cache")
@@ -81,9 +87,10 @@ def test_read_validation(neon_simple_env: NeonEnv):
direct_latest = c.fetchone()
assert second == direct_latest, "Failed fetch page at latest lsn"
c.execute(
cache_entries = query_scalar(
c,
"select count(*) from pg_buffercache where relfilenode = {}".format(relfilenode))
assert c.fetchone()[0] == 0, "relation buffers detected after invalidation"
assert cache_entries == 0, "relation buffers detected after invalidation"
log.info(
"Cache is clear, reading stale page version without cache using relation identifiers"

View File

@@ -1,6 +1,7 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar
#
@@ -27,7 +28,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
FROM generate_series(1, 100) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_a = main_cur.fetchone()[0]
lsn_a = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info('LSN after 100 rows: ' + lsn_a)
# Insert some more rows. (This generates enough WAL to fill a few segments.)
@@ -36,8 +37,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
SELECT 'long string to consume some space' || g
FROM generate_series(1, 200000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_b = main_cur.fetchone()[0]
lsn_b = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info('LSN after 200100 rows: ' + lsn_b)
# Insert many more rows. This generates enough WAL to fill a few segments.
@@ -47,8 +47,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
FROM generate_series(1, 200000) g
''')
main_cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn_c = main_cur.fetchone()[0]
lsn_c = query_scalar(main_cur, 'SELECT pg_current_wal_insert_lsn()')
log.info('LSN after 400100 rows: ' + lsn_c)
# Create first read-only node at the point where only 100 rows were inserted

View File

@@ -8,7 +8,7 @@ import time
from uuid import UUID
from fixtures.neon_fixtures import NeonEnvBuilder, assert_timeline_local, wait_until, wait_for_last_record_lsn, wait_for_upload
from fixtures.log_helper import log
from fixtures.utils import lsn_from_hex, lsn_to_hex
from fixtures.utils import lsn_from_hex, query_scalar
import pytest
@@ -57,14 +57,12 @@ def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, sto
checkpoint_numbers = range(1, 3)
for checkpoint_number in checkpoint_numbers:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute(f'''
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
''')
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
with pg.cursor() as cur:
cur.execute(f'''
CREATE TABLE t{checkpoint_number}(id int primary key, secret text);
INSERT INTO t{checkpoint_number} VALUES ({data_id}, '{data_secret}|{checkpoint_number}');
''')
current_lsn = lsn_from_hex(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# wait until pageserver receives that data
wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
@@ -123,8 +121,8 @@ def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, sto
assert not detail['remote']['awaits_download']
pg = env.postgres.create_start('main')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
for checkpoint_number in checkpoint_numbers:
cur.execute(f'SELECT secret FROM t{checkpoint_number} WHERE id = {data_id};')
assert cur.fetchone() == (f'{data_secret}|{checkpoint_number}', )
with pg.cursor() as cur:
for checkpoint_number in checkpoint_numbers:
assert query_scalar(cur,
f'SELECT secret FROM t{checkpoint_number} WHERE id = {data_id};'
) == f'{data_secret}|{checkpoint_number}'

View File

@@ -8,6 +8,7 @@
import asyncio
from contextlib import closing
from typing import List, Tuple
from uuid import UUID
import pytest
@@ -59,7 +60,7 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, storage_type: str):
env = neon_env_builder.init_start()
tenants_pgs = []
tenants_pgs: List[Tuple[UUID, Postgres]] = []
for i in range(1, 5):
# Use a tiny checkpoint distance, to create a lot of layers quickly
@@ -80,14 +81,11 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, storage_type: str):
# Wait for the remote storage uploads to finish
pageserver_http = env.pageserver.http_client()
for tenant, pg in tenants_pgs:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("show neon.tenant_id")
tenant_id = cur.fetchone()[0]
cur.execute("show neon.timeline_id")
timeline_id = cur.fetchone()[0]
cur.execute("SELECT pg_current_wal_flush_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0])
res = pg.safe_psql_many(
["SHOW neon.tenant_id", "SHOW neon.timeline_id", "SELECT pg_current_wal_flush_lsn()"])
tenant_id = res[0][0][0]
timeline_id = res[1][0][0]
current_lsn = lsn_from_hex(res[2][0][0])
# wait until pageserver receives all the data
wait_for_last_record_lsn(pageserver_http, UUID(tenant_id), UUID(timeline_id), current_lsn)

View File

@@ -1,5 +1,5 @@
from contextlib import closing
import pathlib
import random
from uuid import UUID
import re
import psycopg2.extras
@@ -102,17 +102,14 @@ def wait_for_pageserver_catchup(pgmain: Postgres, polling_interval=1, timeout=60
raise RuntimeError(
f"timed out waiting for pageserver to reach pg_current_wal_flush_lsn()")
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
cur.execute('''
select pg_size_pretty(pg_cluster_size()),
pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn) as received_lsn_lag
FROM backpressure_lsns();
''')
res = cur.fetchone()
log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
received_lsn_lag = res[1]
res = pgmain.safe_psql('''
SELECT
pg_size_pretty(pg_cluster_size()),
pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
FROM backpressure_lsns();
''')[0]
log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
received_lsn_lag = res[1]
time.sleep(polling_interval)
@@ -298,6 +295,40 @@ def test_timeline_physical_size_metric(neon_simple_env: NeonEnv):
assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)
def test_tenant_physical_size(neon_simple_env: NeonEnv):
    """Tenant-level physical size must equal the sum over its timelines.

    Starts from the tenant's initial timeline, then creates ten branches.
    Each branch gets a table filled with a random (but seeded, hence
    reproducible) number of rows and is checkpointed on the pageserver
    before its physical size is read via the timeline detail API and
    added to the running total. Finally the total is compared with the
    ``current_physical_size`` reported by the tenant status API.
    """
    random.seed(100)

    env = neon_simple_env
    client = env.pageserver.http_client()

    tenant, timeline = env.neon_cli.create_tenant()

    def get_timeline_physical_size(timeline: UUID):
        # Physical size as reported in the 'local' section of timeline detail.
        detail = client.timeline_detail(tenant, timeline)
        return detail['local']['current_physical_size_non_incremental']

    expected_total = get_timeline_physical_size(timeline)

    for branch_no in range(10):
        # Draw the row count first so the random sequence stays tied to the seed.
        n_rows = random.randint(100, 1000)
        branch_name = f"test_tenant_physical_size_{branch_no}"

        branch_timeline = env.neon_cli.create_branch(branch_name, tenant_id=tenant)
        pg = env.postgres.create_start(branch_name, tenant_id=tenant)

        statements = [
            "CREATE TABLE foo (t text)",
            f"INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, {n_rows}) g",
        ]
        pg.safe_psql_many(statements)

        # Ask the pageserver to checkpoint this timeline before measuring it.
        env.pageserver.safe_psql(f"checkpoint {tenant.hex} {branch_timeline.hex}")

        expected_total += get_timeline_physical_size(branch_timeline)

        pg.stop()

    tenant_status = client.tenant_status(tenant_id=tenant)
    reported_total = int(tenant_status['current_physical_size'])
    assert reported_total == expected_total
def assert_physical_size(env: NeonEnv, tenant_id: UUID, timeline_id: UUID):
"""Check the current physical size returned from timeline API
matches the total physical size of the timeline on disk"""

View File

@@ -14,13 +14,43 @@ from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.neon_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex
from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex, query_scalar
from fixtures.log_helper import log
from typing import List, Optional, Any
from uuid import uuid4
def wait_lsn_force_checkpoint(tenant_id: str,
                              timeline_id: str,
                              pg: Postgres,
                              ps: NeonPageserver,
                              pageserver_conn_options: Optional[dict] = None):
    """Wait for the pageserver to catch up with ``pg``, then force a checkpoint.

    Steps:
      1. read the compute node's current WAL flush LSN,
      2. wait until the pageserver's last record LSN reaches it,
      3. issue a ``checkpoint`` command over a pageserver connection,
      4. wait until the upload for that LSN is reported as done.

    Args:
        tenant_id: tenant id as a hex string.
        timeline_id: timeline id as a hex string.
        pg: compute node whose flush LSN is used as the target.
        ps: pageserver to wait on and to checkpoint.
        pageserver_conn_options: extra keyword arguments for ``ps.connect()``.
            If it contains a ``'password'`` entry, that value is also used as
            the auth token for the pageserver HTTP client. ``None`` (the
            default) means no extra options.
    """
    # Use a None sentinel instead of a shared mutable `{}` default.
    conn_options = {} if pageserver_conn_options is None else pageserver_conn_options

    lsn = lsn_from_hex(pg.safe_psql('SELECT pg_current_wal_flush_lsn()')[0][0])
    log.info(f"pg_current_wal_flush_lsn is {lsn_to_hex(lsn)}, waiting for it on pageserver")

    # The connection password doubles as the HTTP auth token, when present.
    auth_token = conn_options.get('password')

    tenant_uuid = uuid.UUID(hex=tenant_id)
    timeline_uuid = uuid.UUID(hex=timeline_id)

    # wait for the pageserver to catch up
    wait_for_last_record_lsn(ps.http_client(auth_token=auth_token),
                             tenant_uuid,
                             timeline_uuid,
                             lsn)

    # force checkpoint to advance remote_consistent_lsn
    with closing(ps.connect(**conn_options)) as psconn:
        with psconn.cursor() as pscur:
            pscur.execute(f"checkpoint {tenant_id} {timeline_id}")

    # ensure that remote_consistent_lsn is advanced
    wait_for_upload(ps.http_client(auth_token=auth_token),
                    tenant_uuid,
                    timeline_uuid,
                    lsn)
@dataclass
class TimelineMetrics:
timeline_id: str
@@ -199,60 +229,7 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
else:
failed_node.start()
failed_node = None
cur.execute('SELECT sum(key) FROM t')
assert cur.fetchone() == (500500, )
# shut down random subset of acceptors, sleep, wake them up, rinse, repeat
def xmas_garland(acceptors, stop):
    """Repeatedly stop and restart a random subset of ``acceptors``.

    Each round picks every acceptor with probability one half, stops the
    picked ones, sleeps one second, starts them again and sleeps another
    second. The loop ends once ``stop.value`` is truthy; the flag is only
    checked at the start of a round.
    """
    while not stop.value:
        # One random draw per acceptor, in list order.
        victims = [wa for wa in acceptors if random.random() >= 0.5]

        for victim in victims:
            victim.stop()
        time.sleep(1)

        for victim in victims:
            victim.start()
        time.sleep(1)
# value which gets set to 1 on exit
@pytest.fixture
def stop_value():
    """Provide a shared integer flag that starts at 0 and becomes 1 on teardown.

    The flag is a ``Value('i', 0)``; code holding a reference to it can
    observe the switch to 1 once the test using this fixture has finished.
    """
    flag = Value('i', 0)
    yield flag
    # Teardown: raise the flag for anyone still watching it.
    flag.value = 1
# do inserts while concurrently getting up/down subsets of acceptors
def test_race_conditions(neon_env_builder: NeonEnvBuilder, stop_value):
    """Insert rows while a background process keeps bouncing safekeepers.

    With three safekeepers running, a separate process runs ``xmas_garland``
    over them while this test inserts keys 1..1000 one statement at a time.
    The sum of the keys must come out as 500500, after which the background
    process is told to stop and is joined.
    """
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()

    branch = 'test_safekeepers_race_conditions'
    env.neon_cli.create_branch(branch)
    pg = env.postgres.create_start(branch)

    # we rely upon autocommit after each statement
    # as waiting for acceptors happens there
    cur = pg.connect().cursor()

    cur.execute('CREATE TABLE t(key int primary key, value text)')

    garland_proc = Process(target=xmas_garland, args=(env.safekeepers, stop_value))
    garland_proc.start()

    for key in range(1, 1001):
        cur.execute("INSERT INTO t values (%s, 'payload');", (key, ))

    cur.execute('SELECT sum(key) FROM t')
    total_row = cur.fetchone()
    assert total_row == (500500, )

    # Signal the background process to leave its loop, then wait for it.
    stop_value.value = 1
    garland_proc.join()
assert query_scalar(cur, 'SELECT sum(key) FROM t') == 500500
# Test that safekeepers push their info to the broker and learn peer status from it
@@ -275,10 +252,10 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
log.info(f"statuses is {stat_before}")
pg.safe_psql("INSERT INTO t SELECT generate_series(1,100), 'payload'")
# force checkpoint to advance remote_consistent_lsn
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
# force checkpoint in pageserver to advance remote_consistent_lsn
wait_lsn_force_checkpoint(tenant_id, timeline_id, pg, env.pageserver)
# and wait till remote_consistent_lsn propagates to all safekeepers
started_at = time.time()
while True:
@@ -308,12 +285,10 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
env.neon_cli.create_branch('test_safekeepers_wal_removal')
pg = env.postgres.create_start('test_safekeepers_wal_removal')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute('CREATE TABLE t(key int primary key, value text)')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
pg.safe_psql_many([
'CREATE TABLE t(key int primary key, value text)',
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
])
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
@@ -322,9 +297,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
pageserver_conn_options = {}
if auth_enabled:
pageserver_conn_options['password'] = env.auth_keys.generate_tenant_token(tenant_id)
with closing(env.pageserver.connect(**pageserver_conn_options)) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
wait_lsn_force_checkpoint(tenant_id, timeline_id, pg, env.pageserver, pageserver_conn_options)
# We will wait for first segment removal. Make sure they exist for starter.
first_segments = [
@@ -493,8 +466,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
cur.execute("insert into t select generate_series(1,500000), 'payload'")
expected_sum += 500000 * 500001 // 2
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum
assert query_scalar(cur, "select sum(key) from t") == expected_sum
for sk in env.safekeepers:
wait_segment_offload(tenant_id, timeline_id, sk, seg_end)
@@ -508,8 +480,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
# require WAL to be trimmed, so no more than one segment is left on disk
wait_wal_trim(tenant_id, timeline_id, sk, 16 * 1.5)
cur.execute('SELECT pg_current_wal_flush_lsn()')
last_lsn = cur.fetchone()[0]
last_lsn = query_scalar(cur, 'SELECT pg_current_wal_flush_lsn()')
pageserver_lsn = env.pageserver.http_client().timeline_detail(
uuid.UUID(tenant_id), uuid.UUID((timeline_id)))["local"]["last_record_lsn"]
@@ -556,10 +527,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
# verify data
pg.create_start('test_s3_wal_replay')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum
assert pg.safe_psql("select sum(key) from t")[0][0] == expected_sum
class ProposerPostgres(PgProtocol):
@@ -884,12 +852,10 @@ def test_replace_safekeeper(neon_env_builder: NeonEnvBuilder):
# as waiting for acceptors happens there
cur.execute('CREATE TABLE IF NOT EXISTS t(key int, value text)')
cur.execute("INSERT INTO t VALUES (0, 'something')")
cur.execute('SELECT SUM(key) FROM t')
sum_before = cur.fetchone()[0]
sum_before = query_scalar(cur, 'SELECT SUM(key) FROM t')
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute('SELECT SUM(key) FROM t')
sum_after = cur.fetchone()[0]
sum_after = query_scalar(cur, 'SELECT SUM(key) FROM t')
assert sum_after == sum_before + 5000050000
def show_statuses(safekeepers: List[Safekeeper], tenant_id: str, timeline_id: str):
@@ -974,8 +940,7 @@ def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder):
assert pg.pgdata_dir is not None
log.info('executing INSERT to generate WAL')
cur.execute("select pg_current_wal_lsn()")
current_lsn = lsn_from_hex(cur.fetchone()[0]) / 1024 / 1024
current_lsn = lsn_from_hex(query_scalar(cur, "select pg_current_wal_lsn()")) / 1024 / 1024
pg_wal_size = get_dir_size(os.path.join(pg.pgdata_dir, 'pg_wal')) / 1024 / 1024
if enable_logs:
log.info(f"LSN delta: {current_lsn - last_lsn} MB, current WAL size: {pg_wal_size} MB")

View File

@@ -9,6 +9,7 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres, Safekeeper
from fixtures.log_helper import getLogger
from fixtures.utils import lsn_from_hex, lsn_to_hex
from typing import List, Optional
from dataclasses import dataclass
log = getLogger('root.safekeeper_async')
@@ -455,3 +456,67 @@ def test_unavailability(neon_env_builder: NeonEnvBuilder):
pg = env.postgres.create_start('test_safekeepers_unavailability')
asyncio.run(run_unavailability(env, pg))
@dataclass
class RaceConditionTest:
    """
    Mutable state shared between the coroutine doing inserts and the
    background coroutine that stops and starts safekeepers.
    """
    # number of stop/start rounds the background coroutine has begun so far
    iteration: int
    # set to True to make the background coroutine leave its loop
    is_stopped: bool
# shut down a random subset of safekeepers, sleep, wake them up, rinse, repeat
async def xmas_garland(safekeepers: List[Safekeeper], data: RaceConditionTest):
    """
    Keep stopping and restarting random subsets of safekeepers.

    Every round bumps data.iteration, picks each safekeeper when its
    random.random() draw is >= 0.5, stops the picked ones, sleeps for a
    second, starts them again and sleeps for another second. The loop ends
    once data.is_stopped is found set at the start of a round.
    """
    while not data.is_stopped:
        data.iteration += 1

        # one random draw per safekeeper, taken in list order
        victims = [sk for sk in safekeepers if random.random() >= 0.5]
        victim_ids = [sk.id for sk in victims]
        log.info(f'Iteration {data.iteration}: stopping {victim_ids} safekeepers')

        for sk in victims:
            sk.stop()
        await asyncio.sleep(1)

        for sk in victims:
            sk.start()
        log.info(f'Iteration {data.iteration} finished')
        await asyncio.sleep(1)
async def run_race_conditions(env: NeonEnv, pg: Postgres):
    """
    Insert rows one by one while safekeepers are being restarted in the
    background, then check that the sum of the inserted keys is intact.

    The background task is always asked to stop and is awaited, and the
    connection is always closed, even if an insert or the final check fails.
    An exception raised inside the background task is re-raised from here.
    """
    conn = await pg.connect_async()
    data = RaceConditionTest(0, False)
    bg_xmas = None
    try:
        await conn.execute('CREATE TABLE t(key int primary key, value text)')

        bg_xmas = asyncio.create_task(xmas_garland(env.safekeepers, data))

        n_iterations = 5
        expected_sum = 0
        i = 1

        # If the background task has died, data.iteration will never advance
        # again, so stop inserting instead of spinning forever.
        while data.iteration <= n_iterations and not bg_xmas.done():
            await asyncio.sleep(0.005)
            await conn.execute(f"INSERT INTO t values ({i}, 'payload')")
            expected_sum += i
            i += 1

        log.info(f'Executed {i-1} queries')

        res = await conn.fetchval('SELECT sum(key) FROM t')
        assert res == expected_sum
    finally:
        # Let the background task finish its current round, so that every
        # safekeeper it stopped gets started again.
        data.is_stopped = True
        try:
            if bg_xmas is not None:
                await bg_xmas
        finally:
            await conn.close()
# do inserts while concurrently getting up/down subsets of acceptors
def test_race_conditions(neon_env_builder: NeonEnvBuilder):
    """
    Start an environment with three safekeepers, create a branch with a
    compute node on it and run the race-condition scenario against it.
    """
    branch_name = 'test_safekeepers_race_conditions'

    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()

    env.neon_cli.create_branch(branch_name)
    pg = env.postgres.create_start(branch_name)

    asyncio.run(run_race_conditions(env, pg))

View File

@@ -50,7 +50,6 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: pathlib.Path, pg_
# checkpoint one more time to ensure that the lsn we get is the latest one
pg.safe_psql('CHECKPOINT')
pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -70,6 +70,7 @@ class PgCompare(ABC):
for pg_stat in pg_stats:
cur.execute(pg_stat.query)
row = cur.fetchone()
assert row is not None
assert len(row) == len(pg_stat.columns)
for col, val in zip(pg_stat.columns, row):

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import field
from contextlib import contextmanager
from enum import Flag, auto
import textwrap
from cached_property import cached_property
@@ -306,6 +307,15 @@ class PgProtocol:
conn.autocommit = autocommit
return conn
@contextmanager
def cursor(self, autocommit=True, **kwargs):
    """
    Shorthand for pg.connect().cursor().

    autocommit and any extra keyword arguments are passed on to connect().
    The cursor and connection are closed when the context is exited: first
    the cursor, then the connection.
    """
    # Enter the cursor as a context manager too, so that it really is closed
    # on exit rather than only the connection.
    with closing(self.connect(autocommit=autocommit, **kwargs)) as conn, conn.cursor() as cur:
        yield cur
async def connect_async(self, **kwargs) -> asyncpg.Connection:
"""
Connect to the node from async python.
@@ -354,7 +364,7 @@ class PgProtocol:
if cur.description is None:
result.append([]) # query didn't return data
else:
result.append(cast(List[Any], cur.fetchall()))
result.append(cur.fetchall())
return result
@@ -865,10 +875,24 @@ class NeonPageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def timeline_detail(self,
                    tenant_id: uuid.UUID,
                    timeline_id: uuid.UUID,
                    include_non_incremental_logical_size: bool = False,
                    include_non_incremental_physical_size: bool = False) -> Dict[Any, Any]:
    """
    GET the detail of one timeline and return the decoded JSON object.

    The two include_non_incremental_* flags are sent as the query parameters
    include-non-incremental-logical-size and
    include-non-incremental-physical-size, encoded as "1" when set and "0"
    otherwise.
    """
    logical_size_flag = "1" if include_non_incremental_logical_size else "0"
    physical_size_flag = "1" if include_non_incremental_physical_size else "0"

    # Every piece that interpolates a value must be an f-string of its own:
    # the f prefix does not carry over across '+' concatenation.
    res = self.get(
        f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}" +
        f"?include-non-incremental-logical-size={logical_size_flag}" +
        f"&include-non-incremental-physical-size={physical_size_flag}")
    self.verbose_error(res)
    res_json = res.json()
    assert isinstance(res_json, dict)
    return res_json
@@ -882,15 +906,6 @@ class NeonPageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def wal_receiver_get(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
    """
    GET the wal_receiver endpoint of the given timeline and return the
    decoded JSON object.
    """
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/wal_receiver"
    response = self.get(url)
    self.verbose_error(response)

    payload = response.json()
    assert isinstance(payload, dict)
    return payload
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
@@ -2137,12 +2152,8 @@ def list_files_to_compare(pgdata_dir: pathlib.Path):
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Postgres):
# Get the timeline ID. We need it for the 'basebackup' command
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SHOW neon.timeline_id")
timeline = cur.fetchone()[0]
timeline = pg.safe_psql("SHOW neon.timeline_id")[0][0]
# stop postgres to ensure that files won't change
pg.stop()

View File

@@ -6,6 +6,8 @@ import subprocess
from pathlib import Path
from typing import Any, List, Tuple
from psycopg2.extensions import cursor
from fixtures.log_helper import log
@@ -79,6 +81,20 @@ def etcd_path() -> Path:
return Path(path_output)
def query_scalar(cur: cursor, query: str) -> Any:
    """
    Execute `query` on `cur` and return the first column of the first row.

    Saves callers from spelling out cur.execute() followed by
    cur.fetchone()[0]. It is also mypy friendly: without the None check,
    mypy complains that an Optional is not indexable.
    """
    cur.execute(query)
    first_row = cur.fetchone()
    assert first_row is not None
    return first_row[0]
# Traverse directory to get total size.
def get_dir_size(path: str) -> int:
"""Return size in bytes."""

View File

@@ -9,6 +9,8 @@ import psycopg2.extras
import random
import time
from fixtures.utils import query_scalar
# This is a clear-box test that demonstrates the worst case scenario for the
# "1 segment per layer" implementation of the pageserver. It writes to random
@@ -59,9 +61,7 @@ def test_random_writes(neon_with_baseline: PgCompare):
rows_inserted += rows_to_insert
# Get table size (can't be predicted because padding and alignment)
cur.execute("SELECT pg_relation_size('Big');")
row = cur.fetchone()
table_size = row[0]
table_size = query_scalar(cur, "SELECT pg_relation_size('Big')")
env.zenbenchmark.record("table_size", table_size, 'bytes', MetricReport.TEST_PARAM)
# Decide how much to write, based on knowledge of pageserver implementation.

View File

@@ -34,6 +34,7 @@ def test_seqscans(neon_with_baseline: PgCompare, rows: int, iters: int, workers:
from pg_settings where name = 'shared_buffers'
''')
row = cur.fetchone()
assert row is not None
shared_buffers = row[0]
table_size = row[1]
log.info(f"shared_buffers is {shared_buffers}, table size {table_size}")