Compare commits

..

1 Commits

Author SHA1 Message Date
Thang Pham
2a15415442 use parking_lot::RwLock in for page caches 2022-06-29 12:10:52 -04:00
26 changed files with 235 additions and 384 deletions

View File

@@ -4,11 +4,7 @@ localhost_warning = False
host_key_checking = False
timeout = 30
allow_world_readable_tmpfiles=true
[ssh_connection]
ssh_args = -F ./ansible.ssh.cfg
# teleport doesn't support sftp yet https://github.com/gravitational/teleport/issues/7127
# and scp neither worked for me
transfer_method = piped
scp_if_ssh = True
pipelining = True

View File

@@ -1,6 +1,3 @@
# Remove this once https://github.com/gravitational/teleport/issues/10918 is fixed
PubkeyAcceptedAlgorithms +ssh-rsa-cert-v01@openssh.com
Host tele.zenith.tech
User admin
Port 3023

View File

@@ -1,34 +0,0 @@
- name: cleanup timelines
hosts: safekeepers
gather_facts: False
remote_user: admin
tasks:
- name: install pip
ansible.builtin.apt:
name:
- python3-pip
- acl
become: True
- name: install psycopg2
ansible.builtin.pip:
name: psycopg2
become_user: safekeeper
become: True
- name: upload script
ansible.builtin.copy:
src: /home/ars/zenith/cloud/ops/ad_hoc/clean_timelines.py
dest: clean_timelines.py
tags:
- safekeeper
- name: run script
ansible.builtin.command: python3 clean_timelines.py --really-delete
environment:
CONSOLE_DB: "{{ CONSOLE_DB }}"
become_user: safekeeper
become: True
register: hello
- debug: var=hello

View File

@@ -1,46 +0,0 @@
import os
import shutil
import sys
import time
import json
from pathlib import Path
# Dry run is the default, i.e. not do anything
act = len(sys.argv) == 2 and sys.argv[1] == '--act'
print('act=', act)
move_to_deleted = True
# result of GET 'https://console.neon.tech/api/v1/admin/projects?order=asc&sort=id&show_deleted=true'
with open('projects.json') as f:
projects = json.load(f)
projects = projects['data']
deleted_projects = [p for p in projects if p["deleted"]]
active_projects = [p for p in projects if not p["deleted"]]
print(f"deleted {len(deleted_projects)}, active {len(active_projects)}, total {len(projects)}")
# print(deleted_projects)
for project_to_delete in deleted_projects:
tenant_id = project_to_delete['tenant']
tenant_dir = Path(f"/storage/safekeeper/data/{tenant_id}")
timeline_dir = tenant_dir / project_to_delete['timeline_id']
if not os.path.exists(timeline_dir):
continue
if move_to_deleted:
# move to deleted/
tenant_dir_deleted = (Path(f"/storage/safekeeper/data/deleted/") / tenant_id)
tenant_dir_deleted.mkdir(parents=True, exist_ok=True)
print(f"moving {timeline_dir} to {tenant_dir_deleted}")
if act:
shutil.move(timeline_dir, tenant_dir_deleted)
else:
print(f"removing {timeline_dir}")
if act:
shutil.rmtree(timeline_dir)
if len(os.listdir(tenant_dir)) == 0:
print(f"removing empty tenant directory {tenant_dir}")
shutil.rmtree(tenant_dir)

View File

@@ -1,36 +0,0 @@
- name: remove timelines listed in projects.json
hosts: safekeepers
gather_facts: False
remote_user: admin
tasks:
# - name: install acl
# ansible.builtin.apt:
# name:
# - python3-pip
# - acl
# become: True
- name: upload projects.json
ansible.builtin.copy:
src: /home/ars/zenith/projects.json
dest: projects.json
- name: upload script
ansible.builtin.copy:
src: clean_timelines_json.py
dest: clean_timelines_json.py
- name: run script
ansible.builtin.command: python3 clean_timelines_json.py --act
become_user: safekeeper
become: True
register: hello
- debug: var=hello
- name: remove projects.json
ansible.builtin.file:
path: projects.json
state: absent
tags:
- rm_file

View File

@@ -100,8 +100,10 @@ jobs:
name: Rust build << parameters.build_type >>
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS="--release --features profiling"
fi
@@ -110,7 +112,7 @@ jobs:
export RUSTC_WRAPPER=cachepot
export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
cachepot -s
- save_cache:
@@ -126,24 +128,32 @@ jobs:
name: cargo test
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
CARGO_FLAGS=--release
fi
cargo test $CARGO_FLAGS
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
# Install the rust binaries, for use by test jobs
- run:
name: Install rust binaries
command: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
binaries=$(
cargo metadata --format-version=1 --no-deps |
"${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
test_exe_paths=$(
cargo test --message-format=json --no-run |
"${cov_prefix[@]}" cargo test --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
@@ -156,15 +166,34 @@ jobs:
SRC=target/$BUILD_TYPE/$bin
DST=/tmp/zenith/bin/$bin
cp $SRC $DST
echo $DST >> /tmp/zenith/etc/binaries.list
done
# Install test executables (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
for bin in $test_exe_paths; do
SRC=$bin
DST=/tmp/zenith/test_bin/$(basename $bin)
cp $SRC $DST
echo $DST >> /tmp/zenith/etc/binaries.list
done
fi
# Install the postgres binaries, for use by test jobs
- run:
name: Install postgres binaries
command: |
cp -a tmp_install /tmp/zenith/pg_install
# Save rust binaries for other jobs in the workflow
- run:
name: Merge coverage data
command: |
# This will speed up workspace uploads
if [[ $BUILD_TYPE == "debug" ]]; then
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
fi
# Save the rust binaries and coverage data for other jobs in this workflow.
- persist_to_workspace:
root: /tmp/zenith
paths:
@@ -285,6 +314,12 @@ jobs:
export GITHUB_SHA=$CIRCLE_SHA1
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
# Run the tests.
#
# The junit.xml file allows CircleCI to display more fine-grained test information
@@ -295,7 +330,7 @@ jobs:
# -n4 uses four processes to run tests via pytest-xdist
# -s is not used to prevent pytest from capturing output, because tests are running
# in parallel and logs are mixed between different tests
./scripts/pytest \
"${cov_prefix[@]}" ./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \
--verbose \
@@ -324,12 +359,67 @@ jobs:
# The store_test_results step tells CircleCI where to find the junit.xml file.
- store_test_results:
path: /tmp/test_output
# Save data (if any)
- run:
name: Merge coverage data
command: |
# This will speed up workspace uploads
if [[ $BUILD_TYPE == "debug" ]]; then
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
fi
# Save coverage data (if any)
- persist_to_workspace:
root: /tmp/zenith
paths:
- "*"
coverage-report:
executor: neon-xlarge-executor
steps:
- attach_workspace:
at: /tmp/zenith
- checkout
- restore_cache:
name: Restore rust cache
keys:
# Require an exact match. While an out of date cache might speed up the build,
# there's no way to clean out old packages, so the cache grows every time something
# changes.
- v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }}
- run:
name: Build coverage report
command: |
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
scripts/coverage \
--dir=/tmp/zenith/coverage report \
--input-objects=/tmp/zenith/etc/binaries.list \
--commit-url=$COMMIT_URL \
--format=github
- run:
name: Upload coverage report
command: |
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
REPORT_URL=https://neondatabase.github.io/zenith-coverage-data/$CIRCLE_SHA1
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
scripts/git-upload \
--repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/neondatabase/zenith-coverage-data.git \
--message="Add code coverage for $COMMIT_URL" \
copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE
# Add link to the coverage report to the commit
curl -f -X POST \
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
-H "Accept: application/vnd.github.v3+json" \
--user "$CI_ACCESS_TOKEN" \
--data \
"{
\"state\": \"success\",
\"context\": \"zenith-coverage\",
\"description\": \"Coverage report is ready\",
\"target_url\": \"$REPORT_URL\"
}"
# Build neondatabase/neon:latest image and push it to Docker hub
docker-image:
docker:
@@ -640,6 +730,12 @@ workflows:
save_perf_report: true
requires:
- build-neon-release
- coverage-report:
# Context passes credentials for gh api
context: CI_ACCESS_TOKEN
requires:
# TODO: consider adding more
- other-tests-debug
- docker-image:
# Context gives an ability to login
context: Docker Hub

View File

@@ -92,7 +92,7 @@ runs:
fi
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
cov_prefix=()
fi

View File

@@ -6,7 +6,7 @@ runs:
steps:
- name: Merge coverage data
shell: bash -ex {0}
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage/ merge
- name: Upload coverage data
uses: actions/upload-artifact@v3
@@ -14,4 +14,4 @@ runs:
retention-days: 7
if-no-files-found: error
name: coverage-data-artifact
path: /tmp/coverage/
path: /tmp/neon/coverage/

View File

@@ -17,6 +17,11 @@ concurrency:
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
AWS_ACCESS_KEY_ID: ${{ secrets.CACHEPOT_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY }}
CACHEPOT_BUCKET: zenith-rust-cachepot
RUSTC_WRAPPER: cachepot
jobs:
build-postgres:
@@ -49,7 +54,7 @@ jobs:
- name: Build postgres
if: steps.cache_pg.outputs.cache-hit != 'true'
run: mold -run make postgres -j$(nproc)
run: COPT='-Werror' mold -run make postgres -j$(nproc)
# actions/cache@v3 does not allow concurrently using the same cache across job steps, so use a separate cache
- name: Prepare postgres artifact
@@ -101,15 +106,12 @@ jobs:
~/.cargo/registry/
~/.cargo/git/
target/
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
key: |
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
- name: Run cargo build
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
@@ -117,11 +119,12 @@ jobs:
fi
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
cachepot -s
- name: Run cargo test
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
CARGO_FLAGS=
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
@@ -133,7 +136,7 @@ jobs:
- name: Install rust binaries
run: |
if [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=()
fi
@@ -151,9 +154,7 @@ jobs:
mkdir -p /tmp/neon/bin/
mkdir -p /tmp/neon/test_bin/
mkdir -p /tmp/neon/etc/
# Keep bloated coverage data files away from the rest of the artifact
mkdir -p /tmp/coverage/
mkdir -p /tmp/neon/coverage/
# Install target binaries
for bin in $binaries; do
@@ -165,13 +166,13 @@ jobs:
# Install test executables and write list of all binaries (for code coverage)
if [[ $BUILD_TYPE == "debug" ]]; then
for bin in $binaries; do
echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list
echo "/tmp/neon/bin/$bin" >> /tmp/neon/coverage/binaries.list
done
for bin in $test_exe_paths; do
SRC=$bin
DST=/tmp/neon/test_bin/$(basename $bin)
cp "$SRC" "$DST"
echo "$DST" >> /tmp/coverage/binaries.list
echo "$DST" >> /tmp/neon/coverage/binaries.list
done
fi
@@ -315,10 +316,7 @@ jobs:
uses: actions/download-artifact@v3
with:
name: coverage-data-artifact
path: /tmp/coverage/
- name: Merge coverage data
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
path: /tmp/neon/coverage/
- name: Build and upload coverage report
run: |
@@ -327,8 +325,8 @@ jobs:
COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA
scripts/coverage \
--dir=/tmp/coverage report \
--input-objects=/tmp/coverage/binaries.list \
--dir=/tmp/neon/coverage report \
--input-objects=/tmp/neon/coverage/binaries.list \
--commit-url=$COMMIT_URL \
--format=github
@@ -337,7 +335,7 @@ jobs:
scripts/git-upload \
--repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \
--message="Add code coverage for $COMMIT_URL" \
copy /tmp/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
copy /tmp/neon/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
# Add link to the coverage report to the commit
curl -f -X POST \

View File

@@ -26,7 +26,7 @@ jobs:
# Rust toolchains (e.g. nightly or 1.37.0), add them here.
rust_toolchain: [1.58]
os: [ubuntu-latest, macos-latest]
timeout-minutes: 50
timeout-minutes: 30
name: run regression test suite
runs-on: ${{ matrix.os }}

19
Cargo.lock generated
View File

@@ -461,7 +461,7 @@ dependencies = [
"tar",
"tokio",
"tokio-postgres",
"url",
"urlencoding",
"workspace_hack",
]
@@ -650,7 +650,7 @@ dependencies = [
"crossterm_winapi",
"libc",
"mio",
"parking_lot 0.12.0",
"parking_lot 0.12.1",
"signal-hook",
"signal-hook-mio",
"winapi",
@@ -1899,6 +1899,7 @@ dependencies = [
"metrics",
"nix",
"once_cell",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",
"postgres-types",
@@ -1939,9 +1940,9 @@ dependencies = [
[[package]]
name = "parking_lot"
version = "0.12.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.2",
@@ -2308,7 +2309,7 @@ dependencies = [
"lazy_static",
"md5",
"metrics",
"parking_lot 0.12.0",
"parking_lot 0.12.1",
"pin-project-lite",
"rand",
"rcgen",
@@ -3356,7 +3357,7 @@ dependencies = [
"fallible-iterator",
"futures",
"log",
"parking_lot 0.12.0",
"parking_lot 0.12.1",
"percent-encoding",
"phf",
"pin-project-lite",
@@ -3685,6 +3686,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "urlencoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821"
[[package]]
name = "utils"
version = "0.1.0"

View File

@@ -18,5 +18,5 @@ serde_json = "1"
tar = "0.4"
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
url = "2.2.2"
urlencoding = "2.1.0"
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -33,7 +33,7 @@ use std::process::exit;
use std::sync::{Arc, RwLock};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
use anyhow::Result;
use chrono::Utc;
use clap::Arg;
use log::{error, info};
@@ -45,7 +45,6 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::pg_helpers::*;
use compute_tools::spec::*;
use url::Url;
fn main() -> Result<()> {
// TODO: re-use `utils::logging` later
@@ -132,7 +131,7 @@ fn main() -> Result<()> {
let compute_state = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
connstr: connstr.to_string(),
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
spec,

View File

@@ -1,3 +1,5 @@
use std::sync::Arc;
use anyhow::{anyhow, Result};
use log::error;
use postgres::Client;
@@ -21,8 +23,9 @@ pub fn create_writablity_check_data(client: &mut Client) -> Result<()> {
Ok(())
}
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
pub async fn check_writability(compute: &Arc<ComputeNode>) -> Result<()> {
let connstr = &compute.connstr;
let (client, connection) = tokio_postgres::connect(connstr, NoTls).await?;
if client.is_closed() {
return Err(anyhow!("connection to postgres closed"));
}

View File

@@ -35,8 +35,7 @@ use crate::spec::*;
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
pub start_time: DateTime<Utc>,
// Url type maintains proper escaping
pub connstr: url::Url,
pub connstr: String,
pub pgdata: String,
pub pgbin: String,
pub spec: ComputeSpec,
@@ -269,25 +268,21 @@ impl ComputeNode {
// In this case we need to connect with old `zenith_admin`name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
let mut client = match Client::connect(&self.connstr, NoTls) {
Err(e) => {
info!(
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
e
);
let mut zenith_admin_connstr = self.connstr.clone();
let zenith_admin_connstr = self.connstr.replacen("cloud_admin", "zenith_admin", 1);
zenith_admin_connstr
.set_username("zenith_admin")
.map_err(|_| anyhow::anyhow!("invalid connstr"))?;
let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
let mut client = Client::connect(&zenith_admin_connstr, NoTls)?;
client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
client.simple_query("GRANT zenith_admin TO cloud_admin")?;
drop(client);
// reconnect with connsting with expected name
Client::connect(self.connstr.as_str(), NoTls)?
Client::connect(&self.connstr, NoTls)?
}
Ok(client) => client,
};

View File

@@ -13,11 +13,11 @@ const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds
// Spin in a loop and figure out the last activity time in the Postgres.
// Then update it in the shared state. This function never errors out.
// XXX: the only expected panic is at `RwLock` unwrap().
fn watch_compute_activity(compute: &ComputeNode) {
fn watch_compute_activity(compute: &Arc<ComputeNode>) {
// Suppose that `connstr` doesn't change
let connstr = compute.connstr.as_str();
let connstr = compute.connstr.clone();
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = Client::connect(connstr, NoTls);
let mut client = Client::connect(&connstr, NoTls);
let timeout = time::Duration::from_millis(MONITOR_CHECK_INTERVAL);
info!("watching Postgres activity at {}", connstr);
@@ -32,7 +32,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
info!("connection to postgres closed, trying to reconnect");
// Connection is closed, reconnect and try again.
client = Client::connect(connstr, NoTls);
client = Client::connect(&connstr, NoTls);
continue;
}
@@ -93,7 +93,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
debug!("cannot connect to postgres: {}, retrying", e);
// Establish a new connection and try again.
client = Client::connect(connstr, NoTls);
client = Client::connect(&connstr, NoTls);
}
}
}

View File

@@ -4,6 +4,7 @@ use anyhow::Result;
use log::{info, log_enabled, warn, Level};
use postgres::{Client, NoTls};
use serde::Deserialize;
use urlencoding::encode;
use crate::compute::ComputeNode;
use crate::config;
@@ -230,11 +231,9 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<
fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
for db in &node.spec.cluster.databases {
if db.owner != *role_name {
let mut connstr = node.connstr.clone();
// database name is always the last and the only component of the path
connstr.set_path(&db.name);
let mut client = Client::connect(connstr.as_str(), NoTls)?;
let db_name_encoded = format!("/{}", encode(&db.name));
let db_connstr = node.connstr.replacen("/postgres", &db_name_encoded, 1);
let mut client = Client::connect(&db_connstr, NoTls)?;
// This will reassign all dependent objects to the db owner
let reassign_query = format!(

View File

@@ -63,6 +63,8 @@ workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
parking_lot = "0.12.1"
[dev-dependencies]
hex-literal = "0.3"
tempfile = "3.2"

View File

@@ -2210,9 +2210,6 @@ impl LayeredTimeline {
LsnForTimestamp::Past(lsn) => {
debug!("past({})", lsn);
}
LsnForTimestamp::NoData(lsn) => {
debug!("nodata({})", lsn);
}
}
debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn)
}

View File

@@ -36,13 +36,12 @@
//! mapping is automatically removed and the slot is marked free.
//!
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU8, AtomicUsize, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
},
sync::atomic::{AtomicU8, AtomicUsize, Ordering},
};
use once_cell::sync::OnceCell;
@@ -385,7 +384,7 @@ impl PageCache {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
let mut inner = slot.inner.write().unwrap();
let mut inner = slot.inner.write();
if let Some(key) = &inner.key {
match key {
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
@@ -413,7 +412,7 @@ impl PageCache {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
let mut inner = slot.inner.write().unwrap();
let mut inner = slot.inner.write();
if let Some(key) = &inner.key {
match key {
CacheKey::ImmutableFilePage { file_id, blkno: _ }
@@ -454,7 +453,7 @@ impl PageCache {
// that it's still what we expected (because we released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.read().unwrap();
let inner = slot.inner.read();
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard(inner));
@@ -543,7 +542,7 @@ impl PageCache {
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().unwrap();
let inner = slot.inner.write();
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageWriteGuard { inner, valid: true });
@@ -611,7 +610,7 @@ impl PageCache {
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
match cache_key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let map = self.materialized_page_map.read();
let versions = map.get(hash_key)?;
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
@@ -624,11 +623,11 @@ impl PageCache {
Some(version.slot_idx)
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
let map = self.ephemeral_page_map.read();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
let map = self.immutable_page_map.read();
Some(*map.get(&(*file_id, *blkno))?)
}
}
@@ -641,7 +640,7 @@ impl PageCache {
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
match key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let map = self.materialized_page_map.read();
let versions = map.get(hash_key)?;
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
@@ -651,11 +650,11 @@ impl PageCache {
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
let map = self.ephemeral_page_map.read();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
let map = self.immutable_page_map.read();
Some(*map.get(&(*file_id, *blkno))?)
}
}
@@ -670,7 +669,7 @@ impl PageCache {
hash_key: old_hash_key,
lsn: old_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
let mut map = self.materialized_page_map.write();
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
let versions = old_entry.get_mut();
@@ -685,12 +684,12 @@ impl PageCache {
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
let mut map = self.ephemeral_page_map.write();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
let mut map = self.immutable_page_map.write();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
}
@@ -708,7 +707,7 @@ impl PageCache {
hash_key: new_key,
lsn: new_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
let mut map = self.materialized_page_map.write();
let versions = map.entry(new_key.clone()).or_default();
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
Ok(version_idx) => Some(versions[version_idx].slot_idx),
@@ -725,7 +724,7 @@ impl PageCache {
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
let mut map = self.ephemeral_page_map.write();
match map.entry((*file_id, *blkno)) {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
@@ -735,7 +734,7 @@ impl PageCache {
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
let mut map = self.immutable_page_map.write();
match map.entry((*file_id, *blkno)) {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
@@ -765,11 +764,8 @@ impl PageCache {
if slot.dec_usage_count() == 0 {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(TryLockError::Poisoned(err)) => {
panic!("buffer lock was poisoned: {:?}", err)
}
Err(TryLockError::WouldBlock) => {
Some(inner) => inner,
None => {
// If we have looped through the whole buffer pool 10 times
// and still haven't found a victim buffer, something's wrong.
// Maybe all the buffers were in locked. That could happen in

View File

@@ -554,7 +554,7 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.create_empty_timeline(timeline_id, base_lsn)?;
let timeline = repo.create_empty_timeline(timeline_id, Lsn(0))?;
let repartition_distance = repo.get_checkpoint_distance();
let mut datadir_timeline =
DatadirTimeline::<LayeredRepository>::new(timeline, repartition_distance);
@@ -1151,7 +1151,6 @@ impl postgres_backend::Handler for PageServerHandler {
LsnForTimestamp::Present(lsn) => format!("{}", lsn),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
pgb.write_message_noflush(&BeMessage::DataRow(&[Some(result.as_bytes())]))?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;

View File

@@ -51,7 +51,6 @@ pub enum LsnForTimestamp {
Present(Lsn),
Future(Lsn),
Past(Lsn),
NoData(Lsn),
}
impl<R: Repository> DatadirTimeline<R> {
@@ -264,7 +263,7 @@ impl<R: Repository> DatadirTimeline<R> {
(false, false) => {
// This can happen if no commit records have been processed yet, e.g.
// just after importing a cluster.
Ok(LsnForTimestamp::NoData(max_lsn))
bail!("no commit timestamps found");
}
(true, false) => {
// Didn't find any commit timestamps larger than the request

View File

@@ -37,7 +37,7 @@ pub mod defaults {
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10_000;
}
/// Per-tenant configuration options

View File

@@ -178,7 +178,7 @@ async fn shutdown_all_wal_connections(
/// That may lead to certain events not being observed by the listener.
#[derive(Debug)]
struct TaskHandle<E> {
handle: JoinHandle<Result<(), String>>,
handle: JoinHandle<()>,
events_receiver: watch::Receiver<TaskEvent<E>>,
cancellation: watch::Sender<()>,
}
@@ -205,8 +205,8 @@ impl<E: Clone> TaskHandle<E> {
let sender = Arc::clone(&events_sender);
let handle = tokio::task::spawn(async move {
events_sender.send(TaskEvent::Started).ok();
task(sender, cancellation_receiver).await
let task_result = task(sender, cancellation_receiver).await;
events_sender.send(TaskEvent::End(task_result)).ok();
});
TaskHandle {
@@ -216,16 +216,6 @@ impl<E: Clone> TaskHandle<E> {
}
}
async fn next_task_event(&mut self) -> TaskEvent<E> {
select! {
next_task_event = self.events_receiver.changed() => match next_task_event {
Ok(()) => self.events_receiver.borrow().clone(),
Err(_task_channel_part_dropped) => join_on_handle(&mut self.handle).await,
},
task_completion_result = join_on_handle(&mut self.handle) => task_completion_result,
}
}
/// Aborts current task, waiting for it to finish.
async fn shutdown(self) {
self.cancellation.send(()).ok();
@@ -235,19 +225,6 @@ impl<E: Clone> TaskHandle<E> {
}
}
async fn join_on_handle<E>(handle: &mut JoinHandle<Result<(), String>>) -> TaskEvent<E> {
match handle.await {
Ok(task_result) => TaskEvent::End(task_result),
Err(e) => {
if e.is_cancelled() {
TaskEvent::End(Ok(()))
} else {
TaskEvent::End(Err(format!("WAL receiver task panicked: {e}")))
}
}
}
}
/// A step to process timeline attach/detach events to enable/disable the corresponding WAL receiver machinery.
/// In addition to WAL streaming management, the step ensures that corresponding tenant has its service threads enabled or disabled.
/// This is done here, since only walreceiver knows when a certain tenant has no streaming enabled.

View File

@@ -104,29 +104,49 @@ async fn connection_manager_loop_step(
Some(wal_connection_update) = async {
match walreceiver_state.wal_connection.as_mut() {
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
Some(wal_connection) => {
let receiver = &mut wal_connection.connection_task.events_receiver;
Some(match receiver.changed().await {
Ok(()) => receiver.borrow().clone(),
Err(_cancellation_error) => TaskEvent::End(Ok(())),
})
}
None => None,
}
} => {
let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select! guard");
match &wal_connection_update {
TaskEvent::Started => {
wal_connection.latest_connection_update = Utc::now().naive_utc();
*walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1;
},
TaskEvent::NewEvent(replication_feedback) => {
wal_connection.latest_connection_update = DateTime::<Local>::from(replication_feedback.ps_replytime).naive_utc();
// reset connection attempts here only, the only place where both nodes
// explicitly confirmn with replication feedback that they are connected to each other
walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
},
let (connection_update, reset_connection_attempts) = match &wal_connection_update {
TaskEvent::Started => (Some(Utc::now().naive_utc()), true),
TaskEvent::NewEvent(replication_feedback) => (Some(DateTime::<Local>::from(replication_feedback.ps_replytime).naive_utc()), true),
TaskEvent::End(end_result) => {
match end_result {
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => warn!("WAL receiving task failed: {e}"),
let should_reset_connection_attempts = match end_result {
Ok(()) => {
debug!("WAL receiving task finished");
true
},
Err(e) => {
warn!("WAL receiving task failed: {e}");
false
},
};
walreceiver_state.wal_connection = None;
(None, should_reset_connection_attempts)
},
};
if let Some(connection_update) = connection_update {
match &mut walreceiver_state.wal_connection {
Some(wal_connection) => {
wal_connection.latest_connection_update = connection_update;
let attempts_entry = walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0);
if reset_connection_attempts {
*attempts_entry = 0;
} else {
*attempts_entry += 1;
}
},
None => error!("Received connection update for WAL connection that is not active, update: {wal_connection_update:?}"),
}
}
},
@@ -386,8 +406,10 @@ impl WalreceiverState {
Some(existing_wal_connection) => {
let connected_sk_node = existing_wal_connection.sk_id;
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) =
self.select_connection_candidate(Some(connected_sk_node))?;
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) = self
.applicable_connection_candidates()
.filter(|&(sk_id, _, _)| sk_id != connected_sk_node)
.max_by_key(|(_, info, _)| info.commit_lsn)?;
let now = Utc::now().naive_utc();
if let Ok(latest_interaciton) =
@@ -440,8 +462,9 @@ impl WalreceiverState {
}
}
None => {
let (new_sk_id, _, new_wal_producer_connstr) =
self.select_connection_candidate(None)?;
let (new_sk_id, _, new_wal_producer_connstr) = self
.applicable_connection_candidates()
.max_by_key(|(_, info, _)| info.commit_lsn)?;
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_producer_connstr: new_wal_producer_connstr,
@@ -453,49 +476,6 @@ impl WalreceiverState {
None
}
/// Selects the best possible candidate, based on the data collected from etcd updates about the safekeepers.
/// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
///
/// The candidate that is chosen:
/// * has fewest connection attempts from pageserver to safekeeper node (reset every time the WAL replication feedback is sent)
/// * has greatest data Lsn among the ones that are left
///
/// NOTE:
/// We evict timeline data received from etcd based on time passed since it was registered, along with its connection attempts values, but
/// otherwise to reset the connection attempts, a successful connection to that node is needed.
/// That won't happen now, before all nodes with less connection attempts are connected to first, which might leave the sk node with more advanced state to be ignored.
fn select_connection_candidate(
&self,
node_to_omit: Option<NodeId>,
) -> Option<(NodeId, &SkTimelineInfo, String)> {
let all_candidates = self
.applicable_connection_candidates()
.filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
.collect::<Vec<_>>();
let smallest_attempts_allowed = all_candidates
.iter()
.map(|(sk_id, _, _)| {
self.wal_connection_attempts
.get(sk_id)
.copied()
.unwrap_or(0)
})
.min()?;
all_candidates
.into_iter()
.filter(|(sk_id, _, _)| {
smallest_attempts_allowed
>= self
.wal_connection_attempts
.get(sk_id)
.copied()
.unwrap_or(0)
})
.max_by_key(|(_, info, _)| info.commit_lsn)
}
fn applicable_connection_candidates(
&self,
) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
@@ -520,25 +500,15 @@ impl WalreceiverState {
}
fn cleanup_old_candidates(&mut self) {
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
self.wal_stream_candidates.retain(|node_id, etcd_info| {
self.wal_stream_candidates.retain(|_, etcd_info| {
if let Ok(time_since_latest_etcd_update) =
(Utc::now().naive_utc() - etcd_info.latest_update).to_std()
{
let should_retain = time_since_latest_etcd_update < self.lagging_wal_timeout;
if !should_retain {
node_ids_to_remove.push(*node_id);
}
should_retain
time_since_latest_etcd_update < self.lagging_wal_timeout
} else {
true
}
});
for node_id in node_ids_to_remove {
self.wal_connection_attempts.remove(&node_id);
}
}
}
@@ -873,64 +843,6 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
let harness = RepoHarness::create("candidate_with_many_connection_failures")?;
let mut state = dummy_state(&harness);
let now = Utc::now().naive_utc();
let current_lsn = Lsn(100_000).align();
let bigger_lsn = Lsn(current_lsn.0 + 100).align();
state.wal_connection = None;
state.wal_stream_candidates = HashMap::from([
(
NodeId(0),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(bigger_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
latest_update: now,
},
),
(
NodeId(1),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(current_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
},
etcd_version: 0,
latest_update: now,
},
),
]);
state.wal_connection_attempts = HashMap::from([(NodeId(0), 1), (NodeId(1), 0)]);
let candidate_with_less_errors = state
.next_connection_candidate()
.expect("Expected one candidate selected, but got none");
assert_eq!(
candidate_with_less_errors.safekeeper_id,
NodeId(1),
"Should select the node with less connection errors"
);
Ok(())
}
#[tokio::test]
async fn connection_no_etcd_data_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("connection_no_etcd_data_candidate")?;

View File

@@ -191,8 +191,3 @@ def test_import_from_pageserver(test_output_dir, pg_bin, vanilla_pg, neon_env_bu
# Check it's the same as the first fullbackup
# TODO pageserver should be checking checksum
assert os.path.getsize(tar_output_file) == os.path.getsize(new_tar_output_file)
# Check that gc works
psconn = env.pageserver.connect()
pscur = psconn.cursor()
pscur.execute(f"do_gc {tenant.hex} {timeline} 0")