mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-10 22:20:38 +00:00
Compare commits
27 Commits
jcsp/index
...
alek/isola
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79756c90e4 | ||
|
|
4c36e4969e | ||
|
|
9ffccb55f1 | ||
|
|
3a6b99f03c | ||
|
|
d39fd66773 | ||
|
|
73d7a9bc6e | ||
|
|
3a71cf38c1 | ||
|
|
25c66dc635 | ||
|
|
538373019a | ||
|
|
c58b22bacb | ||
|
|
17aea78aa7 | ||
|
|
71f9d9e5a3 | ||
|
|
119b86480f | ||
|
|
fa1f87b268 | ||
|
|
db48f7e40d | ||
|
|
e157b16c24 | ||
|
|
94ad9204bb | ||
|
|
c8aed107c5 | ||
|
|
da128a509a | ||
|
|
5993b2bedc | ||
|
|
4ce7aa9ffe | ||
|
|
cbd04f5140 | ||
|
|
1037a8ddd9 | ||
|
|
6661f4fd44 | ||
|
|
b9f84b9609 | ||
|
|
459253879e | ||
|
|
0fa85aa08e |
@@ -2,6 +2,12 @@ name: 'Create Allure report'
|
||||
description: 'Generate Allure report from uploaded by actions/allure-report-store tests results'
|
||||
|
||||
outputs:
|
||||
base-url:
|
||||
description: 'Base URL for Allure report'
|
||||
value: ${{ steps.generate-report.outputs.base-url }}
|
||||
base-s3-url:
|
||||
description: 'Base S3 URL for Allure report'
|
||||
value: ${{ steps.generate-report.outputs.base-s3-url }}
|
||||
report-url:
|
||||
description: 'Allure report URL'
|
||||
value: ${{ steps.generate-report.outputs.report-url }}
|
||||
@@ -63,8 +69,8 @@ runs:
|
||||
rm -f ${ALLURE_ZIP}
|
||||
fi
|
||||
env:
|
||||
ALLURE_VERSION: 2.22.1
|
||||
ALLURE_ZIP_SHA256: fdc7a62d94b14c5e0bf25198ae1feded6b005fdbed864b4d3cb4e5e901720b0b
|
||||
ALLURE_VERSION: 2.23.1
|
||||
ALLURE_ZIP_SHA256: 11141bfe727504b3fd80c0f9801eb317407fd0ac983ebb57e671f14bac4bcd86
|
||||
|
||||
# Potentially we could have several running build for the same key (for example, for the main branch), so we use improvised lock for this
|
||||
- name: Acquire lock
|
||||
@@ -102,6 +108,11 @@ runs:
|
||||
REPORT_PREFIX=reports/${BRANCH_OR_PR}
|
||||
RAW_PREFIX=reports-raw/${BRANCH_OR_PR}/${GITHUB_RUN_ID}
|
||||
|
||||
BASE_URL=https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}
|
||||
BASE_S3_URL=s3://${BUCKET}/${REPORT_PREFIX}/${GITHUB_RUN_ID}
|
||||
REPORT_URL=${BASE_URL}/index.html
|
||||
REPORT_JSON_URL=${BASE_URL}/data/suites.json
|
||||
|
||||
# Get previously uploaded data for this run
|
||||
ZSTD_NBTHREADS=0
|
||||
|
||||
@@ -110,10 +121,9 @@ runs:
|
||||
# There's no previously uploaded data for this $GITHUB_RUN_ID
|
||||
exit 0
|
||||
fi
|
||||
for S3_FILEPATH in ${S3_FILEPATHS}; do
|
||||
time aws s3 cp --only-show-errors "s3://${BUCKET}/${S3_FILEPATH}" "${WORKDIR}"
|
||||
|
||||
archive=${WORKDIR}/$(basename $S3_FILEPATH)
|
||||
time aws s3 cp --recursive --only-show-errors "s3://${BUCKET}/${RAW_PREFIX}/" "${WORKDIR}/"
|
||||
for archive in $(find ${WORKDIR} -name "*.tar.zst"); do
|
||||
mkdir -p ${archive%.tar.zst}
|
||||
time tar -xf ${archive} -C ${archive%.tar.zst}
|
||||
rm -f ${archive}
|
||||
@@ -129,10 +139,9 @@ runs:
|
||||
sed -i 's|<a href="." class=|<a href="https://'${BUCKET}'.s3.amazonaws.com/'${REPORT_PREFIX}'/latest/index.html?nocache='"'+Date.now()+'"'" class=|g' ${WORKDIR}/report/app.js
|
||||
|
||||
# Upload a history and the final report (in this particular order to not to have duplicated history in 2 places)
|
||||
# Use sync for the final report to delete files from previous runs
|
||||
time aws s3 mv --recursive --only-show-errors "${WORKDIR}/report/history" "s3://${BUCKET}/${REPORT_PREFIX}/latest/history"
|
||||
time aws s3 mv --recursive --only-show-errors "${WORKDIR}/report" "s3://${BUCKET}/${REPORT_PREFIX}/${GITHUB_RUN_ID}"
|
||||
|
||||
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}/index.html
|
||||
time aws s3 sync --delete --only-show-errors "${WORKDIR}/report" "s3://${BUCKET}/${REPORT_PREFIX}/${GITHUB_RUN_ID}"
|
||||
|
||||
# Generate redirect
|
||||
cat <<EOF > ${WORKDIR}/index.html
|
||||
@@ -144,8 +153,10 @@ runs:
|
||||
EOF
|
||||
time aws s3 cp --only-show-errors ${WORKDIR}/index.html "s3://${BUCKET}/${REPORT_PREFIX}/latest/index.html"
|
||||
|
||||
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
|
||||
echo "report-json-url=${REPORT_URL%/index.html}/data/suites.json" >> $GITHUB_OUTPUT
|
||||
echo "base-url=${BASE_URL}" >> $GITHUB_OUTPUT
|
||||
echo "base-s3-url=${BASE_S3_URL}" >> $GITHUB_OUTPUT
|
||||
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
|
||||
echo "report-json-url=${REPORT_JSON_URL}" >> $GITHUB_OUTPUT
|
||||
|
||||
echo "[Allure Report](${REPORT_URL})" >> ${GITHUB_STEP_SUMMARY}
|
||||
|
||||
|
||||
2
.github/actions/download/action.yml
vendored
2
.github/actions/download/action.yml
vendored
@@ -31,7 +31,7 @@ runs:
|
||||
BUCKET=neon-github-public-dev
|
||||
FILENAME=$(basename $ARCHIVE)
|
||||
|
||||
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX%$GITHUB_RUN_ATTEMPT} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
|
||||
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX%$GITHUB_RUN_ATTEMPT} | jq -r '.Contents[]?.Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
|
||||
if [ -z "${S3_KEY}" ]; then
|
||||
if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
|
||||
echo 'SKIPPED=true' >> $GITHUB_OUTPUT
|
||||
|
||||
22
.github/workflows/build_and_test.yml
vendored
22
.github/workflows/build_and_test.yml
vendored
@@ -471,6 +471,26 @@ jobs:
|
||||
--build-type ${BUILD_TYPE} \
|
||||
--ingest suites.json
|
||||
|
||||
- name: Store Allure test stat in the DB (new)
|
||||
if: ${{ !cancelled() && steps.create-allure-report.outputs.report-json-url }}
|
||||
env:
|
||||
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
REPORT_JSON_URL: ${{ steps.create-allure-report.outputs.report-json-url }}
|
||||
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
BASE_S3_URL: ${{ steps.create-allure-report.outputs.base-s3-url }}
|
||||
run: |
|
||||
aws s3 cp --only-show-errors --recursive ${BASE_S3_URL}/data/test-cases ./test-cases
|
||||
|
||||
./scripts/pysync
|
||||
|
||||
export DATABASE_URL="$TEST_RESULT_CONNSTR"
|
||||
poetry run python3 scripts/ingest_regress_test_result-new-format.py \
|
||||
--reference ${GITHUB_REF} \
|
||||
--revision ${COMMIT_SHA} \
|
||||
--run-id ${GITHUB_RUN_ID} \
|
||||
--run-attempt ${GITHUB_RUN_ATTEMPT} \
|
||||
--test-cases-dir ./test-cases
|
||||
|
||||
coverage-report:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container:
|
||||
@@ -1067,7 +1087,7 @@ jobs:
|
||||
OLD_PREFIX=artifacts/${GITHUB_RUN_ID}
|
||||
FILENAME=neon-${{ runner.os }}-${build_type}-artifact.tar.zst
|
||||
|
||||
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
|
||||
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[]?.Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
|
||||
if [ -z "${S3_KEY}" ]; then
|
||||
echo >&2 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
|
||||
exit 1
|
||||
|
||||
@@ -58,16 +58,12 @@ use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
use compute_tools::spec::*;
|
||||
|
||||
// this is an arbitrary build tag. Fine as a default / for testing purposes
|
||||
// in-case of not-set environment var
|
||||
const BUILD_TAG_DEFAULT: &str = "5670669815";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
let build_tag = option_env!("BUILD_TAG")
|
||||
.unwrap_or(BUILD_TAG_DEFAULT)
|
||||
.to_string();
|
||||
// if there is no build tag environment variable, use a "default value"
|
||||
// the use-case for this "default value" is regression tests that don't require BUILD_TAG
|
||||
let build_tag = option_env!("BUILD_TAG").unwrap_or("5670669815").to_string();
|
||||
info!("build_tag: {build_tag}");
|
||||
|
||||
let matches = cli().get_matches();
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Condvar, Mutex, OnceLock, RwLock};
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
@@ -24,7 +25,7 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
||||
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
@@ -285,7 +286,7 @@ impl ComputeNode {
|
||||
#[instrument(skip_all, fields(%lsn))]
|
||||
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let start_time = Utc::now();
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
|
||||
|
||||
@@ -298,7 +299,10 @@ impl ComputeNode {
|
||||
info!("Storage auth token not set");
|
||||
}
|
||||
|
||||
// Connect to pageserver
|
||||
let mut client = config.connect(NoTls)?;
|
||||
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
|
||||
|
||||
let basebackup_cmd = match lsn {
|
||||
// HACK We don't use compression on first start (Lsn(0)) because there's no API for it
|
||||
Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
|
||||
@@ -344,13 +348,10 @@ impl ComputeNode {
|
||||
};
|
||||
|
||||
// Report metrics
|
||||
self.state.lock().unwrap().metrics.basebackup_bytes =
|
||||
measured_reader.get_byte_count() as u64;
|
||||
self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
|
||||
.signed_duration_since(start_time)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
|
||||
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
|
||||
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -925,6 +926,7 @@ LIMIT 100",
|
||||
let spec = &pspec.spec;
|
||||
let custom_ext = spec.custom_extensions.clone().unwrap_or(Vec::new());
|
||||
info!("custom extensions: {:?}", &custom_ext);
|
||||
|
||||
let (ext_remote_paths, library_index) = extension_server::get_available_extensions(
|
||||
ext_remote_storage,
|
||||
&self.pgbin,
|
||||
@@ -944,94 +946,113 @@ LIMIT 100",
|
||||
}
|
||||
|
||||
// download an archive, unzip and place files in correct locations
|
||||
pub async fn download_extension(&self, ext_name: &str, is_library: bool) -> Result<u64> {
|
||||
match &self.ext_remote_storage {
|
||||
None => anyhow::bail!("No remote extension storage"),
|
||||
Some(remote_storage) => {
|
||||
let mut real_ext_name = ext_name.to_string();
|
||||
if is_library {
|
||||
// sometimes library names might have a suffix like
|
||||
// library.so or library.so.3. We strip this off
|
||||
// because library_index is based on the name without the file extension
|
||||
let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
|
||||
let lib_raw_name = strip_lib_suffix.replace(&real_ext_name, "").to_string();
|
||||
real_ext_name = self
|
||||
.library_index
|
||||
.get()
|
||||
.expect("must have already downloaded the library_index")[&lib_raw_name]
|
||||
.clone();
|
||||
}
|
||||
pub async fn download_extension(
|
||||
&self,
|
||||
ext_name: &str,
|
||||
is_library: bool,
|
||||
) -> Result<u64, DownloadError> {
|
||||
let remote_storage = self
|
||||
.ext_remote_storage
|
||||
.as_ref()
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"Remote extensions storage is not configured",
|
||||
)))?;
|
||||
|
||||
let ext_path = &self
|
||||
.ext_remote_paths
|
||||
.get()
|
||||
.expect("error accessing ext_remote_paths")[&real_ext_name];
|
||||
let ext_archive_name = ext_path.object_name().expect("bad path");
|
||||
let mut real_ext_name = ext_name;
|
||||
if is_library {
|
||||
// sometimes library names might have a suffix like
|
||||
// library.so or library.so.3. We strip this off
|
||||
// because library_index is based on the name without the file extension
|
||||
let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
|
||||
let lib_raw_name = strip_lib_suffix.replace(real_ext_name, "").to_string();
|
||||
|
||||
let mut first_try = false;
|
||||
if !self
|
||||
.ext_download_progress
|
||||
.read()
|
||||
.expect("lock err")
|
||||
.contains_key(ext_archive_name)
|
||||
{
|
||||
self.ext_download_progress
|
||||
.write()
|
||||
.expect("lock err")
|
||||
.insert(ext_archive_name.to_string(), (Utc::now(), false));
|
||||
first_try = true;
|
||||
}
|
||||
let (download_start, download_completed) =
|
||||
self.ext_download_progress.read().expect("lock err")[ext_archive_name];
|
||||
let start_time_delta = Utc::now()
|
||||
.signed_duration_since(download_start)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
|
||||
// how long to wait for extension download if it was started by another process
|
||||
const HANG_TIMEOUT: u64 = 3000; // milliseconds
|
||||
|
||||
if download_completed {
|
||||
info!("extension already downloaded, skipping re-download");
|
||||
return Ok(0);
|
||||
} else if start_time_delta < HANG_TIMEOUT && !first_try {
|
||||
info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
|
||||
let mut interval =
|
||||
tokio::time::interval(tokio::time::Duration::from_millis(500));
|
||||
loop {
|
||||
info!("waiting for download");
|
||||
interval.tick().await;
|
||||
let (_, download_completed_now) =
|
||||
self.ext_download_progress.read().expect("lock")[ext_archive_name];
|
||||
if download_completed_now {
|
||||
info!("download finished by whoever else downloaded it");
|
||||
return Ok(0);
|
||||
}
|
||||
}
|
||||
// NOTE: the above loop will get terminated
|
||||
// based on the timeout of the download function
|
||||
}
|
||||
|
||||
// if extension hasn't been downloaded before or the previous
|
||||
// attempt to download was at least HANG_TIMEOUT ms ago
|
||||
// then we try to download it here
|
||||
info!("downloading new extension {ext_archive_name}");
|
||||
|
||||
let download_size = extension_server::download_extension(
|
||||
&real_ext_name,
|
||||
ext_path,
|
||||
remote_storage,
|
||||
&self.pgbin,
|
||||
)
|
||||
.await;
|
||||
self.ext_download_progress
|
||||
.write()
|
||||
.expect("bad lock")
|
||||
.insert(ext_archive_name.to_string(), (download_start, true));
|
||||
download_size
|
||||
}
|
||||
real_ext_name = self
|
||||
.library_index
|
||||
.get()
|
||||
.expect("must have already downloaded the library_index")
|
||||
.get(&lib_raw_name)
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"library {} is not found",
|
||||
lib_raw_name
|
||||
)))?;
|
||||
}
|
||||
|
||||
let ext_path = &self
|
||||
.ext_remote_paths
|
||||
.get()
|
||||
.expect("error accessing ext_remote_paths")
|
||||
.get(real_ext_name)
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"real_ext_name {} is not found",
|
||||
real_ext_name
|
||||
)))?;
|
||||
|
||||
let ext_archive_name = ext_path.object_name().expect("bad path");
|
||||
|
||||
let mut first_try = false;
|
||||
if !self
|
||||
.ext_download_progress
|
||||
.read()
|
||||
.expect("lock err")
|
||||
.contains_key(ext_archive_name)
|
||||
{
|
||||
self.ext_download_progress
|
||||
.write()
|
||||
.expect("lock err")
|
||||
.insert(ext_archive_name.to_string(), (Utc::now(), false));
|
||||
first_try = true;
|
||||
}
|
||||
let (download_start, download_completed) =
|
||||
self.ext_download_progress.read().expect("lock err")[ext_archive_name];
|
||||
let start_time_delta = Utc::now()
|
||||
.signed_duration_since(download_start)
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
|
||||
// how long to wait for extension download if it was started by another process
|
||||
const HANG_TIMEOUT: u64 = 3000; // milliseconds
|
||||
|
||||
if download_completed {
|
||||
info!("extension already downloaded, skipping re-download");
|
||||
return Ok(0);
|
||||
} else if start_time_delta < HANG_TIMEOUT && !first_try {
|
||||
info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
|
||||
loop {
|
||||
info!("waiting for download");
|
||||
interval.tick().await;
|
||||
let (_, download_completed_now) =
|
||||
self.ext_download_progress.read().expect("lock")[ext_archive_name];
|
||||
if download_completed_now {
|
||||
info!("download finished by whoever else downloaded it");
|
||||
return Ok(0);
|
||||
}
|
||||
}
|
||||
// NOTE: the above loop will get terminated
|
||||
// based on the timeout of the download function
|
||||
}
|
||||
|
||||
// if extension hasn't been downloaded before or the previous
|
||||
// attempt to download was at least HANG_TIMEOUT ms ago
|
||||
// then we try to download it here
|
||||
info!("downloading new extension {ext_archive_name}");
|
||||
|
||||
let download_size = extension_server::download_extension(
|
||||
real_ext_name,
|
||||
ext_path,
|
||||
remote_storage,
|
||||
&self.pgbin,
|
||||
)
|
||||
.await
|
||||
.map_err(DownloadError::Other);
|
||||
|
||||
self.ext_download_progress
|
||||
.write()
|
||||
.expect("bad lock")
|
||||
.insert(ext_archive_name.to_string(), (download_start, true));
|
||||
|
||||
download_size
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -1090,7 +1111,17 @@ LIMIT 100",
|
||||
.as_millis() as u64;
|
||||
info!("Prepare extensions took {prep_ext_time_delta}ms");
|
||||
|
||||
// Don't try to download libraries that are not in the index.
|
||||
// Assume that they are already present locally.
|
||||
libs_vec.retain(|lib| {
|
||||
self.library_index
|
||||
.get()
|
||||
.expect("error accessing ext_remote_paths")
|
||||
.contains_key(lib)
|
||||
});
|
||||
|
||||
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
|
||||
|
||||
let mut download_tasks = Vec::new();
|
||||
for library in &libs_vec {
|
||||
download_tasks.push(self.download_extension(library, true));
|
||||
@@ -1104,8 +1135,19 @@ LIMIT 100",
|
||||
prep_extensions_ms: prep_ext_time_delta,
|
||||
};
|
||||
for result in results {
|
||||
let download_size = result?;
|
||||
remote_ext_metrics.num_ext_downloaded += 1;
|
||||
let download_size = match result {
|
||||
Ok(res) => {
|
||||
remote_ext_metrics.num_ext_downloaded += 1;
|
||||
res
|
||||
}
|
||||
Err(err) => {
|
||||
// if we failed to download an extension, we don't want to fail the whole
|
||||
// process, but we do want to log the error
|
||||
error!("Failed to download extension: {}", err);
|
||||
0
|
||||
}
|
||||
};
|
||||
|
||||
remote_ext_metrics.largest_ext_size =
|
||||
std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
|
||||
remote_ext_metrics.total_ext_download_size += download_size;
|
||||
|
||||
@@ -156,7 +156,7 @@ pub async fn get_available_extensions(
|
||||
let ext_index_full = serde_json::from_slice::<Index>(&ext_idx_buffer)?;
|
||||
let mut enabled_extensions = ext_index_full.public_extensions;
|
||||
enabled_extensions.extend_from_slice(custom_extensions);
|
||||
let library_index = ext_index_full.library_index;
|
||||
let mut library_index = ext_index_full.library_index;
|
||||
let all_extension_data = ext_index_full.extension_data;
|
||||
info!("library_index: {:?}", library_index);
|
||||
|
||||
@@ -169,13 +169,19 @@ pub async fn get_available_extensions(
|
||||
let extension_name = control_file
|
||||
.strip_suffix(".control")
|
||||
.expect("control files must end in .control");
|
||||
ext_remote_paths.insert(
|
||||
extension_name.to_string(),
|
||||
RemotePath::from_string(&ext_data.archive_path)?,
|
||||
);
|
||||
let control_path = local_sharedir.join(control_file);
|
||||
info!("writing file {:?}{:?}", control_path, control_contents);
|
||||
file_create_tasks.push(tokio::fs::write(control_path, control_contents));
|
||||
if !control_path.exists() {
|
||||
ext_remote_paths.insert(
|
||||
extension_name.to_string(),
|
||||
RemotePath::from_string(&ext_data.archive_path)?,
|
||||
);
|
||||
info!("writing file {:?}{:?}", control_path, control_contents);
|
||||
file_create_tasks.push(tokio::fs::write(control_path, control_contents));
|
||||
} else {
|
||||
warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_file);
|
||||
// also delete this from library index
|
||||
library_index.retain(|_, value| value != extension_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
let results = join_all(file_create_tasks).await;
|
||||
@@ -222,7 +228,7 @@ pub async fn download_extension(
|
||||
);
|
||||
let libdir_paths = (
|
||||
unzip_dest.to_string() + "/lib",
|
||||
Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql"),
|
||||
Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
|
||||
);
|
||||
// move contents of the libdir / sharedir in unzipped archive to the correct local paths
|
||||
for paths in [sharedir_paths, libdir_paths] {
|
||||
|
||||
@@ -141,6 +141,15 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
let filename = route.split('/').last().unwrap().to_string();
|
||||
info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
|
||||
|
||||
// don't even try to download extensions
|
||||
// if no remote storage is configured
|
||||
if compute.ext_remote_storage.is_none() {
|
||||
info!("no extensions remote storage configured");
|
||||
let mut resp = Response::new(Body::from("no remote storage configured"));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return resp;
|
||||
}
|
||||
|
||||
match compute.download_extension(&filename, is_library).await {
|
||||
Ok(_) => Response::new(Body::from("OK")),
|
||||
Err(e) => {
|
||||
|
||||
@@ -270,7 +270,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
}
|
||||
RoleAction::Create => {
|
||||
let mut query: String = format!(
|
||||
"CREATE ROLE {} CREATEROLE CREATEDB IN ROLE neon_superuser",
|
||||
"CREATE ROLE {} CREATEROLE CREATEDB BYPASSRLS IN ROLE neon_superuser",
|
||||
name.pg_quote()
|
||||
);
|
||||
info!("role create query: '{}'", &query);
|
||||
|
||||
@@ -68,13 +68,40 @@ where
|
||||
/// Response of the /metrics.json API
|
||||
#[derive(Clone, Debug, Default, Serialize)]
|
||||
pub struct ComputeMetrics {
|
||||
/// Time spent waiting in pool
|
||||
pub wait_for_spec_ms: u64,
|
||||
pub sync_safekeepers_ms: u64,
|
||||
|
||||
/// Time spent checking if safekeepers are synced
|
||||
pub sync_sk_check_ms: u64,
|
||||
|
||||
/// Time spent syncing safekeepers (walproposer.c).
|
||||
/// In most cases this should be zero.
|
||||
pub sync_safekeepers_ms: u64,
|
||||
|
||||
/// Time it took to establish a pg connection to the pageserver.
|
||||
/// This is two roundtrips, so it's a good proxy for compute-pageserver
|
||||
/// latency. The latency is usually 0.2ms, but it's not safe to assume
|
||||
/// that.
|
||||
pub pageserver_connect_micros: u64,
|
||||
|
||||
/// Time to get basebackup from pageserver and write it to disk.
|
||||
pub basebackup_ms: u64,
|
||||
|
||||
/// Compressed size of basebackup received.
|
||||
pub basebackup_bytes: u64,
|
||||
|
||||
/// Time spent starting potgres. This includes initialization of shared
|
||||
/// buffers, preloading extensions, and other pg operations.
|
||||
pub start_postgres_ms: u64,
|
||||
|
||||
/// Time spent applying pg catalog updates that were made in the console
|
||||
/// UI. This should be 0 when startup time matters, since cplane tries
|
||||
/// to do these updates eagerly, and passes the skip_pg_catalog_updates
|
||||
/// when it's safe to skip this step.
|
||||
pub config_ms: u64,
|
||||
|
||||
/// Total time, from when we receive the spec to when we're ready to take
|
||||
/// pg connections.
|
||||
pub total_startup_ms: u64,
|
||||
pub load_ext_ms: u64,
|
||||
pub num_ext_downloaded: u64,
|
||||
|
||||
@@ -24,6 +24,20 @@ pub async fn is_directory_empty(path: impl AsRef<Path>) -> anyhow::Result<bool>
|
||||
Ok(dir.next_entry().await?.is_none())
|
||||
}
|
||||
|
||||
pub async fn list_dir(path: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
|
||||
let mut dir = tokio::fs::read_dir(&path)
|
||||
.await
|
||||
.context(format!("read_dir({})", path.as_ref().display()))?;
|
||||
|
||||
let mut content = vec![];
|
||||
while let Some(next) = dir.next_entry().await? {
|
||||
let file_name = next.file_name();
|
||||
content.push(file_name.to_string_lossy().to_string());
|
||||
}
|
||||
|
||||
Ok(content)
|
||||
}
|
||||
|
||||
pub fn ignore_not_found(e: io::Error) -> io::Result<()> {
|
||||
if e.kind() == io::ErrorKind::NotFound {
|
||||
Ok(())
|
||||
@@ -43,7 +57,7 @@ where
|
||||
mod test {
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::fs_ext::is_directory_empty;
|
||||
use crate::fs_ext::{is_directory_empty, list_dir};
|
||||
|
||||
use super::ignore_absent_files;
|
||||
|
||||
@@ -109,4 +123,25 @@ mod test {
|
||||
|
||||
assert!(!file_path.exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_dir_works() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let dir_path = dir.path();
|
||||
|
||||
assert!(list_dir(dir_path).await.unwrap().is_empty());
|
||||
|
||||
let file_path: PathBuf = dir_path.join("testfile");
|
||||
let _ = std::fs::File::create(&file_path).unwrap();
|
||||
|
||||
assert_eq!(&list_dir(dir_path).await.unwrap(), &["testfile"]);
|
||||
|
||||
let another_dir_path: PathBuf = dir_path.join("testdir");
|
||||
std::fs::create_dir(another_dir_path).unwrap();
|
||||
|
||||
let expected = &["testdir", "testfile"];
|
||||
let mut actual = list_dir(dir_path).await.unwrap();
|
||||
actual.sort();
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -373,7 +373,7 @@ fn start_pageserver(
|
||||
let order = pageserver::InitializationOrder {
|
||||
initial_tenant_load: Some(init_done_tx),
|
||||
initial_logical_size_can_start: init_done_rx.clone(),
|
||||
initial_logical_size_attempt: init_logical_size_done_tx,
|
||||
initial_logical_size_attempt: Some(init_logical_size_done_tx),
|
||||
background_jobs_can_start: background_jobs_barrier.clone(),
|
||||
};
|
||||
|
||||
|
||||
@@ -31,7 +31,9 @@ use utils::{
|
||||
use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
|
||||
use crate::tenant::config::TenantConf;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME};
|
||||
use crate::tenant::{
|
||||
TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
|
||||
};
|
||||
use crate::{
|
||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
|
||||
TIMELINE_UNINIT_MARK_SUFFIX,
|
||||
@@ -613,6 +615,11 @@ impl PageServerConf {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
|
||||
self.tenant_path(tenant_id)
|
||||
.join(TENANT_DELETED_MARKER_FILE_NAME)
|
||||
}
|
||||
|
||||
pub fn traces_path(&self) -> PathBuf {
|
||||
self.workdir.join("traces")
|
||||
}
|
||||
|
||||
@@ -304,17 +304,18 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
// Debug-log the list of candidates
|
||||
let now = SystemTime::now();
|
||||
for (i, (partition, candidate)) in candidates.iter().enumerate() {
|
||||
let desc = candidate.layer.layer_desc();
|
||||
debug!(
|
||||
"cand {}/{}: size={}, no_access_for={}us, partition={:?}, {}/{}/{}",
|
||||
i + 1,
|
||||
candidates.len(),
|
||||
candidate.layer.file_size(),
|
||||
desc.file_size,
|
||||
now.duration_since(candidate.last_activity_ts)
|
||||
.unwrap()
|
||||
.as_micros(),
|
||||
partition,
|
||||
candidate.layer.get_tenant_id(),
|
||||
candidate.layer.get_timeline_id(),
|
||||
desc.tenant_id,
|
||||
desc.timeline_id,
|
||||
candidate.layer,
|
||||
);
|
||||
}
|
||||
@@ -346,7 +347,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
warned = Some(usage_planned);
|
||||
}
|
||||
|
||||
usage_planned.add_available_bytes(candidate.layer.file_size());
|
||||
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
|
||||
|
||||
batched
|
||||
.entry(TimelineKey(candidate.timeline))
|
||||
@@ -389,15 +390,16 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
Ok(results) => {
|
||||
assert_eq!(results.len(), batch.len());
|
||||
for (result, layer) in results.into_iter().zip(batch.iter()) {
|
||||
let file_size = layer.layer_desc().file_size;
|
||||
match result {
|
||||
Some(Ok(())) => {
|
||||
usage_assumed.add_available_bytes(layer.file_size());
|
||||
usage_assumed.add_available_bytes(file_size);
|
||||
}
|
||||
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
|
||||
unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers")
|
||||
}
|
||||
Some(Err(EvictionError::FileNotFound)) => {
|
||||
evictions_failed.file_sizes += layer.file_size();
|
||||
evictions_failed.file_sizes += file_size;
|
||||
evictions_failed.count += 1;
|
||||
}
|
||||
Some(Err(
|
||||
@@ -406,7 +408,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
)) => {
|
||||
let e = utils::error::report_compact_sources(&e);
|
||||
warn!(%layer, "failed to evict layer: {e}");
|
||||
evictions_failed.file_sizes += layer.file_size();
|
||||
evictions_failed.file_sizes += file_size;
|
||||
evictions_failed.count += 1;
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -93,6 +93,47 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
delete:
|
||||
description: |
|
||||
Attempts to delete specified tenant. 500 and 409 errors should be retried until 404 is retrieved.
|
||||
404 means that deletion successfully finished"
|
||||
responses:
|
||||
"400":
|
||||
description: Error when no tenant id found in path
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"404":
|
||||
description: Tenant not found
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"409":
|
||||
description: Deletion is already in progress, continue polling
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ConflictError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline:
|
||||
parameters:
|
||||
@@ -820,6 +861,7 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/config:
|
||||
put:
|
||||
description: |
|
||||
|
||||
@@ -187,7 +187,7 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
|
||||
format!("Cannot delete timeline which has child timelines: {children:?}")
|
||||
.into_boxed_str(),
|
||||
),
|
||||
a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
|
||||
a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
|
||||
Other(e) => ApiError::InternalServerError(e),
|
||||
}
|
||||
}
|
||||
@@ -208,6 +208,19 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
|
||||
fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
|
||||
use crate::tenant::delete::DeleteTenantError::*;
|
||||
match value {
|
||||
Get(g) => ApiError::from(g),
|
||||
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
|
||||
Timeline(t) => ApiError::from(t),
|
||||
Other(o) => ApiError::InternalServerError(o),
|
||||
e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to construct a TimelineInfo struct for a timeline
|
||||
async fn build_timeline_info(
|
||||
timeline: &Arc<Timeline>,
|
||||
@@ -617,6 +630,23 @@ async fn tenant_status(
|
||||
json_response(StatusCode::OK, tenant_info)
|
||||
}
|
||||
|
||||
async fn tenant_delete_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
// TODO openapi spec
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
|
||||
.instrument(info_span!("tenant_delete_handler", %tenant_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
/// HTTP endpoint to query the current tenant_size of a tenant.
|
||||
///
|
||||
/// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
|
||||
@@ -1345,6 +1375,9 @@ pub fn make_router(
|
||||
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
|
||||
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
|
||||
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
api_handler(r, tenant_delete_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
|
||||
api_handler(r, tenant_size_handler)
|
||||
})
|
||||
|
||||
@@ -190,7 +190,7 @@ pub struct InitializationOrder {
|
||||
|
||||
/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
|
||||
/// attempt. It is important to drop this once the attempt has completed.
|
||||
pub initial_logical_size_attempt: utils::completion::Completion,
|
||||
pub initial_logical_size_attempt: Option<utils::completion::Completion>,
|
||||
|
||||
/// Barrier for when we can start any background jobs.
|
||||
///
|
||||
@@ -226,6 +226,7 @@ async fn timed<Fut: std::future::Future>(
|
||||
|
||||
let ret = fut.await;
|
||||
|
||||
// this has a global allowed_errors
|
||||
tracing::warn!(
|
||||
task = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
|
||||
@@ -28,6 +28,7 @@ use std::cmp::min;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
@@ -46,8 +47,10 @@ use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
use self::metadata::LoadMetadataError;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::mgr::TenantsMap;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineUninitMark;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
@@ -106,6 +109,7 @@ macro_rules! pausable_failpoint {
|
||||
|
||||
pub mod blob_io;
|
||||
pub mod block_io;
|
||||
|
||||
pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
@@ -118,6 +122,7 @@ mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
pub mod config;
|
||||
pub mod delete;
|
||||
pub mod mgr;
|
||||
pub mod tasks;
|
||||
pub mod upload_queue;
|
||||
@@ -145,6 +150,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
|
||||
|
||||
pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
|
||||
|
||||
pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
|
||||
|
||||
///
|
||||
/// Tenant consists of multiple timelines. Keep them in a hash table.
|
||||
///
|
||||
@@ -183,6 +190,8 @@ pub struct Tenant {
|
||||
cached_synthetic_tenant_size: Arc<AtomicU64>,
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
|
||||
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
|
||||
}
|
||||
|
||||
// We should not blindly overwrite local metadata with remote one.
|
||||
@@ -274,7 +283,7 @@ pub enum LoadLocalTimelineError {
|
||||
ResumeDeletion(#[source] anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(thiserror::Error)]
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("NotFound")]
|
||||
NotFound,
|
||||
@@ -283,17 +292,37 @@ pub enum DeleteTimelineError {
|
||||
HasChildren(Vec<TimelineId>),
|
||||
|
||||
#[error("Timeline deletion is already in progress")]
|
||||
AlreadyInProgress,
|
||||
AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl Debug for DeleteTimelineError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::NotFound => write!(f, "NotFound"),
|
||||
Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
|
||||
Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
|
||||
Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum SetStoppingError {
|
||||
AlreadyStopping(completion::Barrier),
|
||||
Broken,
|
||||
}
|
||||
|
||||
impl Debug for SetStoppingError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
|
||||
Self::Broken => write!(f, "Broken"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct RemoteStartupData {
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
@@ -616,7 +645,7 @@ impl Tenant {
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors)?;
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let (index_part, remote_client) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
@@ -740,12 +769,13 @@ impl Tenant {
|
||||
/// If the loading fails for some reason, the Tenant will go into Broken
|
||||
/// state.
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_id))]
|
||||
pub fn spawn_load(
|
||||
pub(crate) fn spawn_load(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
ctx: &RequestContext,
|
||||
) -> Arc<Tenant> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
@@ -765,7 +795,7 @@ impl Tenant {
|
||||
tenant_conf,
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
remote_storage,
|
||||
remote_storage.clone(),
|
||||
);
|
||||
let tenant = Arc::new(tenant);
|
||||
|
||||
@@ -781,27 +811,83 @@ impl Tenant {
|
||||
"initial tenant load",
|
||||
false,
|
||||
async move {
|
||||
let make_broken = |t: &Tenant, err: anyhow::Error| {
|
||||
error!("load failed, setting tenant state to Broken: {err:?}");
|
||||
t.state.send_modify(|state| {
|
||||
assert!(
|
||||
matches!(*state, TenantState::Loading | TenantState::Stopping { .. }),
|
||||
"the loading task owns the tenant state until activation is complete"
|
||||
);
|
||||
*state = TenantState::broken_from_reason(err.to_string());
|
||||
});
|
||||
};
|
||||
|
||||
let mut init_order = init_order;
|
||||
|
||||
// take the completion because initial tenant loading will complete when all of
|
||||
// these tasks complete.
|
||||
let _completion = init_order.as_mut().and_then(|x| x.initial_tenant_load.take());
|
||||
let _completion = init_order
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_tenant_load.take());
|
||||
|
||||
// Dont block pageserver startup on figuring out deletion status
|
||||
let pending_deletion = {
|
||||
match DeleteTenantFlow::should_resume_deletion(
|
||||
conf,
|
||||
remote_storage.as_ref(),
|
||||
&tenant_clone,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(should_resume_deletion) => should_resume_deletion,
|
||||
Err(err) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(err));
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
info!("pending deletion {}", pending_deletion.is_some());
|
||||
|
||||
if let Some(deletion) = pending_deletion {
|
||||
// as we are no longer loading, signal completion by dropping
|
||||
// the completion while we resume deletion
|
||||
drop(_completion);
|
||||
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
|
||||
let _ = init_order
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_logical_size_attempt.take());
|
||||
|
||||
match DeleteTenantFlow::resume(
|
||||
deletion,
|
||||
&tenant_clone,
|
||||
init_order.as_ref(),
|
||||
tenants,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(err));
|
||||
return Ok(());
|
||||
}
|
||||
Ok(()) => return Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
let background_jobs_can_start =
|
||||
init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
|
||||
match tenant_clone.load(init_order.as_ref(), &ctx).await {
|
||||
Ok(()) => {
|
||||
debug!("load finished, activating");
|
||||
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
debug!("load finished",);
|
||||
|
||||
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
|
||||
}
|
||||
Err(err) => {
|
||||
error!("load failed, setting tenant state to Broken: {err:?}");
|
||||
tenant_clone.state.send_modify(|state| {
|
||||
assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete");
|
||||
*state = TenantState::broken_from_reason(err.to_string());
|
||||
});
|
||||
}
|
||||
Err(err) => make_broken(&tenant_clone, err),
|
||||
}
|
||||
Ok(())
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
|
||||
@@ -877,6 +963,8 @@ impl Tenant {
|
||||
)
|
||||
})?;
|
||||
|
||||
info!("Found deletion mark for timeline {}", timeline_id);
|
||||
|
||||
match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
|
||||
Ok(metadata) => {
|
||||
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
|
||||
@@ -966,9 +1054,11 @@ impl Tenant {
|
||||
|
||||
// Sort the array of timeline IDs into tree-order, so that parent comes before
|
||||
// all its children.
|
||||
tree_sort_timelines(timelines_to_load).map(|sorted_timelines| TenantDirectoryScan {
|
||||
sorted_timelines_to_load: sorted_timelines,
|
||||
timelines_to_resume_deletion,
|
||||
tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
|
||||
TenantDirectoryScan {
|
||||
sorted_timelines_to_load: sorted_timelines,
|
||||
timelines_to_resume_deletion,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1682,7 +1772,7 @@ impl Tenant {
|
||||
// It's mesed up.
|
||||
// we just ignore the failure to stop
|
||||
|
||||
match self.set_stopping(shutdown_progress).await {
|
||||
match self.set_stopping(shutdown_progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(SetStoppingError::Broken) => {
|
||||
// assume that this is acceptable
|
||||
@@ -1722,18 +1812,25 @@ impl Tenant {
|
||||
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
|
||||
///
|
||||
/// This function is not cancel-safe!
|
||||
async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> {
|
||||
///
|
||||
/// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
|
||||
async fn set_stopping(
|
||||
&self,
|
||||
progress: completion::Barrier,
|
||||
allow_transition_from_loading: bool,
|
||||
) -> Result<(), SetStoppingError> {
|
||||
let mut rx = self.state.subscribe();
|
||||
|
||||
// cannot stop before we're done activating, so wait out until we're done activating
|
||||
rx.wait_for(|state| match state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::Activating(_) | TenantState::Attaching => {
|
||||
info!(
|
||||
"waiting for {} to turn Active|Broken|Stopping",
|
||||
<&'static str>::from(state)
|
||||
);
|
||||
false
|
||||
}
|
||||
TenantState::Loading => allow_transition_from_loading,
|
||||
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
|
||||
})
|
||||
.await
|
||||
@@ -1742,9 +1839,16 @@ impl Tenant {
|
||||
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
|
||||
let mut err = None;
|
||||
let stopping = self.state.send_if_modified(|current_state| match current_state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::Activating(_) | TenantState::Attaching => {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
}
|
||||
TenantState::Loading => {
|
||||
if !allow_transition_from_loading {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
};
|
||||
*current_state = TenantState::Stopping { progress };
|
||||
true
|
||||
}
|
||||
TenantState::Active => {
|
||||
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
|
||||
// are created after the transition to Stopping. That's harmless, as the Timelines
|
||||
@@ -1813,6 +1917,10 @@ impl Tenant {
|
||||
.expect("cannot drop self.state while on a &self method");
|
||||
|
||||
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
|
||||
self.set_broken_no_wait(reason)
|
||||
}
|
||||
|
||||
pub(crate) fn set_broken_no_wait(&self, reason: String) {
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
@@ -1878,22 +1986,28 @@ impl Tenant {
|
||||
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
|
||||
/// perform a topological sort, so that the parent of each timeline comes
|
||||
/// before the children.
|
||||
fn tree_sort_timelines(
|
||||
timelines: HashMap<TimelineId, TimelineMetadata>,
|
||||
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
|
||||
/// E extracts the ancestor from T
|
||||
/// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
|
||||
fn tree_sort_timelines<T, E>(
|
||||
timelines: HashMap<TimelineId, T>,
|
||||
extractor: E,
|
||||
) -> anyhow::Result<Vec<(TimelineId, T)>>
|
||||
where
|
||||
E: Fn(&T) -> Option<TimelineId>,
|
||||
{
|
||||
let mut result = Vec::with_capacity(timelines.len());
|
||||
|
||||
let mut now = Vec::with_capacity(timelines.len());
|
||||
// (ancestor, children)
|
||||
let mut later: HashMap<TimelineId, Vec<(TimelineId, TimelineMetadata)>> =
|
||||
let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
|
||||
HashMap::with_capacity(timelines.len());
|
||||
|
||||
for (timeline_id, metadata) in timelines {
|
||||
if let Some(ancestor_id) = metadata.ancestor_timeline() {
|
||||
for (timeline_id, value) in timelines {
|
||||
if let Some(ancestor_id) = extractor(&value) {
|
||||
let children = later.entry(ancestor_id).or_default();
|
||||
children.push((timeline_id, metadata));
|
||||
children.push((timeline_id, value));
|
||||
} else {
|
||||
now.push((timeline_id, metadata));
|
||||
now.push((timeline_id, value));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2062,7 +2176,7 @@ impl Tenant {
|
||||
remote_client,
|
||||
pg_version,
|
||||
initial_logical_size_can_start.cloned(),
|
||||
initial_logical_size_attempt.cloned(),
|
||||
initial_logical_size_attempt.cloned().flatten(),
|
||||
state,
|
||||
);
|
||||
|
||||
@@ -2146,6 +2260,7 @@ impl Tenant {
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2162,6 +2277,7 @@ impl Tenant {
|
||||
// FIXME If the config file is not found, assume that we're attaching
|
||||
// a detached tenant and config is passed via attach command.
|
||||
// https://github.com/neondatabase/neon/issues/1555
|
||||
// OR: we're loading after incomplete deletion that managed to remove config.
|
||||
if !target_config_path.exists() {
|
||||
info!("tenant config not found in {target_config_display}");
|
||||
return Ok(TenantConfOpt::default());
|
||||
|
||||
546
pageserver/src/tenant/delete.rs
Normal file
546
pageserver/src/tenant/delete.rs
Normal file
@@ -0,0 +1,546 @@
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tracing::{error, info, instrument, warn, Instrument, Span};
|
||||
|
||||
use utils::{
|
||||
completion, crashsafe, fs_ext,
|
||||
id::{TenantId, TimelineId},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
context::RequestContext,
|
||||
task_mgr::{self, TaskKind},
|
||||
InitializationOrder,
|
||||
};
|
||||
|
||||
use super::{
|
||||
mgr::{GetTenantError, TenantsMap},
|
||||
span,
|
||||
timeline::delete::DeleteTimelineFlow,
|
||||
tree_sort_timelines, DeleteTimelineError, Tenant,
|
||||
};
|
||||
|
||||
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u8 = 3;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeleteTenantError {
|
||||
#[error("GetTenant {0}")]
|
||||
Get(#[from] GetTenantError),
|
||||
|
||||
#[error("Invalid state {0}. Expected Active or Broken")]
|
||||
InvalidState(TenantState),
|
||||
|
||||
#[error("Tenant deletion is already in progress")]
|
||||
AlreadyInProgress,
|
||||
|
||||
#[error("Timeline {0}")]
|
||||
Timeline(#[from] DeleteTimelineError),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;
|
||||
|
||||
fn remote_tenant_delete_mark_path(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
) -> anyhow::Result<RemotePath> {
|
||||
let tenant_remote_path = conf
|
||||
.tenant_path(tenant_id)
|
||||
.strip_prefix(&conf.workdir)
|
||||
.context("Failed to strip workdir prefix")
|
||||
.and_then(RemotePath::new)
|
||||
.context("tenant path")?;
|
||||
Ok(tenant_remote_path.join(Path::new("deleted")))
|
||||
}
|
||||
|
||||
async fn create_remote_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id)?;
|
||||
|
||||
let data: &[u8] = &[];
|
||||
remote_storage
|
||||
.upload(data, 0, &remote_mark_path, None)
|
||||
.await
|
||||
.context("mark upload")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_local_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let marker_path = conf.tenant_deleted_mark_file_path(tenant_id);
|
||||
|
||||
// Note: we're ok to replace existing file.
|
||||
let _ = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(&marker_path)
|
||||
.with_context(|| format!("could not create delete marker file {marker_path:?}"))?;
|
||||
|
||||
crashsafe::fsync_file_and_parent(&marker_path).context("sync_mark")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn schedule_ordered_timeline_deletions(
|
||||
tenant: &Arc<Tenant>,
|
||||
) -> Result<Vec<(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>, TimelineId)>, DeleteTenantError> {
|
||||
// Tenant is stopping at this point. We know it will be deleted.
|
||||
// No new timelines should be created.
|
||||
// Tree sort timelines to delete from leafs to the root.
|
||||
// NOTE: by calling clone we release the mutex which creates a possibility for a race: pending deletion
|
||||
// can complete and remove timeline from the map in between our call to clone
|
||||
// and `DeleteTimelineFlow::run`, so `run` wont find timeline in `timelines` map.
|
||||
// timelines.lock is currently synchronous so we cant hold it across await point.
|
||||
// So just ignore NotFound error if we get it from `run`.
|
||||
// Beware: in case it becomes async and we try to hold it here, `run` also locks it, which can create a deadlock.
|
||||
let timelines = tenant.timelines.lock().unwrap().clone();
|
||||
let sorted =
|
||||
tree_sort_timelines(timelines, |t| t.get_ancestor_timeline_id()).context("tree sort")?;
|
||||
|
||||
let mut already_running_deletions = vec![];
|
||||
|
||||
for (timeline_id, _) in sorted.into_iter().rev() {
|
||||
if let Err(e) = DeleteTimelineFlow::run(tenant, timeline_id, true).await {
|
||||
match e {
|
||||
DeleteTimelineError::NotFound => {
|
||||
// Timeline deletion finished after call to clone above but before call
|
||||
// to `DeleteTimelineFlow::run` and removed timeline from the map.
|
||||
continue;
|
||||
}
|
||||
DeleteTimelineError::AlreadyInProgress(guard) => {
|
||||
already_running_deletions.push((guard, timeline_id));
|
||||
continue;
|
||||
}
|
||||
e => return Err(DeleteTenantError::Timeline(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(already_running_deletions)
|
||||
}
|
||||
|
||||
async fn ensure_timelines_dir_empty(timelines_path: &Path) -> Result<(), DeleteTenantError> {
|
||||
// Assert timelines dir is empty.
|
||||
if !fs_ext::is_directory_empty(timelines_path).await? {
|
||||
// Display first 10 items in directory
|
||||
let list = &fs_ext::list_dir(timelines_path).await.context("list_dir")?[..10];
|
||||
return Err(DeleteTenantError::Other(anyhow::anyhow!(
|
||||
"Timelines directory is not empty after all timelines deletion: {list:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_tenant_remote_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
if let Some(remote_storage) = remote_storage {
|
||||
remote_storage
|
||||
.delete(&remote_tenant_delete_mark_path(conf, tenant_id)?)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Cleanup fs traces: tenant config, timelines dir local delete mark, tenant dir
|
||||
async fn cleanup_remaining_fs_traces(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let rm = |p: PathBuf, is_dir: bool| async move {
|
||||
if is_dir {
|
||||
tokio::fs::remove_dir(&p).await
|
||||
} else {
|
||||
tokio::fs::remove_file(&p).await
|
||||
}
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.with_context(|| {
|
||||
let to_display = p.display();
|
||||
format!("failed to delete {to_display}")
|
||||
})
|
||||
};
|
||||
|
||||
rm(conf.tenant_config_path(tenant_id), false).await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-remove-timelines-dir"
|
||||
))?
|
||||
});
|
||||
|
||||
rm(conf.timelines_path(tenant_id), true).await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-remove-deleted-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-remove-deleted-mark"
|
||||
))?
|
||||
});
|
||||
|
||||
rm(conf.tenant_deleted_mark_file_path(tenant_id), false).await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-remove-tenant-dir", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-remove-tenant-dir"
|
||||
))?
|
||||
});
|
||||
|
||||
rm(conf.tenant_path(tenant_id), true).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Orchestrates tenant shut down of all tasks, removes its in-memory structures,
|
||||
/// and deletes its data from both disk and s3.
|
||||
/// The sequence of steps:
|
||||
/// 1. Upload remote deletion mark.
|
||||
/// 2. Create local mark file.
|
||||
/// 3. Shutdown tasks
|
||||
/// 4. Run ordered timeline deletions
|
||||
/// 5. Wait for timeline deletion operations that were scheduled before tenant deletion was requested
|
||||
/// 6. Remove remote mark
|
||||
/// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark
|
||||
/// It is resumable from any step in case a crash/restart occurs.
|
||||
/// There are three entrypoints to the process:
|
||||
/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
|
||||
/// 2. [`DeleteTenantFlow::resume`] is called during restarts when local or remote deletion marks are still there.
|
||||
/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
|
||||
#[derive(Default)]
|
||||
pub enum DeleteTenantFlow {
|
||||
#[default]
|
||||
NotStarted,
|
||||
InProgress,
|
||||
Finished,
|
||||
}
|
||||
|
||||
impl DeleteTenantFlow {
|
||||
// These steps are run in the context of management api request handler.
|
||||
// Long running steps are continued to run in the background.
|
||||
// NB: If this fails half-way through, and is retried, the retry will go through
|
||||
// all the same steps again. Make sure the code here is idempotent, and don't
|
||||
// error out if some of the shutdown tasks have already been completed!
|
||||
// NOTE: static needed for background part.
|
||||
// We assume that calling code sets up the span with tenant_id.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn run(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?;
|
||||
|
||||
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
|
||||
tenant.set_broken(format!("{e:#}")).await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Self::schedule_background(guard, conf, remote_storage, tenants, tenant);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper function needed to be able to match once on returned error and transition tenant into broken state.
|
||||
// This is needed because tenant.shutwodn is not idempotent. If tenant state is set to stopping another call to tenant.shutdown
|
||||
// will result in an error, but here we need to be able to retry shutdown when tenant deletion is retried.
|
||||
// So the solution is to set tenant state to broken.
|
||||
async fn run_inner(
|
||||
guard: &mut OwnedMutexGuard<Self>,
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-create-remote-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-create-remote-mark"
|
||||
))?
|
||||
});
|
||||
|
||||
// IDEA: implement detach as delete without remote storage. Then they would use the same lock (deletion_progress) so wont contend.
|
||||
// Though sounds scary, different mark name?
|
||||
// Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
|
||||
if let Some(remote_storage) = &remote_storage {
|
||||
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_id)
|
||||
.await
|
||||
.context("remote_mark")?
|
||||
}
|
||||
|
||||
fail::fail_point!("tenant-delete-before-create-local-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-create-local-mark"
|
||||
))?
|
||||
});
|
||||
|
||||
create_local_delete_mark(conf, &tenant.tenant_id)
|
||||
.await
|
||||
.context("local delete mark")?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-background", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-background"
|
||||
))?
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mark_in_progress(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::Finished => anyhow::bail!("Bug. Is in finished state"),
|
||||
Self::InProgress { .. } => { /* We're in a retry */ }
|
||||
Self::NotStarted => { /* Fresh start */ }
|
||||
}
|
||||
|
||||
*self = Self::InProgress;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn should_resume_deletion(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
|
||||
let acquire = |t: &Tenant| {
|
||||
Some(
|
||||
Arc::clone(&t.delete_progress)
|
||||
.try_lock_owned()
|
||||
.expect("we're the only owner during init"),
|
||||
)
|
||||
};
|
||||
|
||||
let tenant_id = tenant.tenant_id;
|
||||
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
|
||||
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
|
||||
return Ok(acquire(tenant));
|
||||
}
|
||||
|
||||
// If remote storage is there we rely on it
|
||||
if let Some(remote_storage) = remote_storage {
|
||||
let remote_mark_path = remote_tenant_delete_mark_path(conf, &tenant_id)?;
|
||||
|
||||
let attempt = 1;
|
||||
loop {
|
||||
match remote_storage.download(&remote_mark_path).await {
|
||||
Ok(_) => return Ok(acquire(tenant)),
|
||||
Err(e) => {
|
||||
if matches!(e, DownloadError::NotFound) {
|
||||
return Ok(None);
|
||||
}
|
||||
if attempt > SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS {
|
||||
return Err(anyhow::anyhow!(e))?;
|
||||
}
|
||||
|
||||
warn!(
|
||||
"failed to fetch tenant deletion mark at {} attempt {}",
|
||||
&remote_mark_path, attempt
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub(crate) async fn resume(
|
||||
guard: DeletionGuard,
|
||||
tenant: &Arc<Tenant>,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let (_, progress) = completion::channel();
|
||||
|
||||
tenant
|
||||
.set_stopping(progress, true)
|
||||
.await
|
||||
.expect("cant be stopping or broken");
|
||||
|
||||
// Do not consume valuable resources during the load phase, continue deletion once init phase is complete.
|
||||
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
if let Some(background) = background_jobs_can_start {
|
||||
info!("waiting for backgound jobs barrier");
|
||||
background.clone().wait().await;
|
||||
info!("ready for backgound jobs barrier");
|
||||
}
|
||||
|
||||
// Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir)
|
||||
let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
|
||||
if timelines_path.exists() {
|
||||
tenant.load(init_order, ctx).await.context("load")?;
|
||||
}
|
||||
|
||||
Self::background(
|
||||
guard,
|
||||
tenant.conf,
|
||||
tenant.remote_storage.clone(),
|
||||
tenants,
|
||||
tenant,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn prepare(
|
||||
tenants: &tokio::sync::RwLock<TenantsMap>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(Arc<Tenant>, tokio::sync::OwnedMutexGuard<Self>), DeleteTenantError> {
|
||||
let m = tenants.read().await;
|
||||
|
||||
let tenant = m
|
||||
.get(&tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(tenant_id))?;
|
||||
|
||||
// FIXME: unsure about active only. Our init jobs may not be cancellable properly,
|
||||
// so at least for now allow deletions only for active tenants. TODO recheck
|
||||
// Broken and Stopping is needed for retries.
|
||||
if !matches!(
|
||||
tenant.current_state(),
|
||||
TenantState::Active | TenantState::Broken { .. }
|
||||
) {
|
||||
return Err(DeleteTenantError::InvalidState(tenant.current_state()));
|
||||
}
|
||||
|
||||
let guard = Arc::clone(&tenant.delete_progress)
|
||||
.try_lock_owned()
|
||||
.map_err(|_| DeleteTenantError::AlreadyInProgress)?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-shutdown", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))?
|
||||
});
|
||||
|
||||
// make pageserver shutdown not to wait for our completion
|
||||
let (_, progress) = completion::channel();
|
||||
|
||||
// It would be good to only set stopping here and continue shutdown in the background, but shutdown is not idempotent.
|
||||
// i e it is an error to do:
|
||||
// tenant.set_stopping
|
||||
// tenant.shutdown
|
||||
// Its also bad that we're holding tenants.read here.
|
||||
// TODO relax set_stopping to be idempotent?
|
||||
if tenant.shutdown(progress, false).await.is_err() {
|
||||
return Err(DeleteTenantError::Other(anyhow::anyhow!(
|
||||
"tenant shutdown is already in progress"
|
||||
)));
|
||||
}
|
||||
|
||||
Ok((Arc::clone(tenant), guard))
|
||||
}
|
||||
|
||||
fn schedule_background(
|
||||
guard: OwnedMutexGuard<Self>,
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
tenant: Arc<Tenant>,
|
||||
) {
|
||||
let tenant_id = tenant.tenant_id;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::TimelineDeletionWorker,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
"tenant_delete",
|
||||
false,
|
||||
async move {
|
||||
if let Err(err) =
|
||||
Self::background(guard, conf, remote_storage, tenants, &tenant).await
|
||||
{
|
||||
error!("Error: {err:#}");
|
||||
tenant.set_broken(format!("{err:#}")).await;
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "delete_tenant", tenant_id=%tenant_id);
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
async fn background(
|
||||
mut guard: OwnedMutexGuard<Self>,
|
||||
conf: &PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
tenant: &Arc<Tenant>,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
// Tree sort timelines, schedule delete for them. Mention retries from the console side.
|
||||
// Note that if deletion fails we dont mark timelines as broken,
|
||||
// the whole tenant will become broken as by `Self::schedule_background` logic
|
||||
let already_running_timeline_deletions = schedule_ordered_timeline_deletions(tenant)
|
||||
.await
|
||||
.context("schedule_ordered_timeline_deletions")?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-polling-ongoing-deletions", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-polling-ongoing-deletions"
|
||||
))?
|
||||
});
|
||||
|
||||
// Wait for deletions that were already running at the moment when tenant deletion was requested.
|
||||
// When we can lock deletion guard it means that corresponding timeline deletion finished.
|
||||
for (guard, timeline_id) in already_running_timeline_deletions {
|
||||
let flow = guard.lock().await;
|
||||
if !flow.is_finished() {
|
||||
return Err(DeleteTenantError::Other(anyhow::anyhow!(
|
||||
"already running timeline deletion failed: {timeline_id}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let timelines_path = conf.timelines_path(&tenant.tenant_id);
|
||||
// May not exist if we fail in cleanup_remaining_fs_traces after removing it
|
||||
if timelines_path.exists() {
|
||||
// sanity check to guard against layout changes
|
||||
ensure_timelines_dir_empty(&timelines_path)
|
||||
.await
|
||||
.context("timelines dir not empty")?;
|
||||
}
|
||||
|
||||
remove_tenant_remote_delete_mark(conf, remote_storage.as_ref(), &tenant.tenant_id).await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-cleanup-remaining-fs-traces"
|
||||
))?
|
||||
});
|
||||
|
||||
cleanup_remaining_fs_traces(conf, &tenant.tenant_id)
|
||||
.await
|
||||
.context("cleanup_remaining_fs_traces")?;
|
||||
|
||||
let mut locked = tenants.write().await;
|
||||
if locked.remove(&tenant.tenant_id).is_none() {
|
||||
warn!("Tenant got removed from tenants map during deletion");
|
||||
};
|
||||
|
||||
*guard = Self::Finished;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,7 @@
|
||||
//!
|
||||
use byteorder::{ReadBytesExt, BE};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use either::Either;
|
||||
use hex;
|
||||
use std::{cmp::Ordering, io, result};
|
||||
use thiserror::Error;
|
||||
@@ -256,103 +257,77 @@ where
|
||||
where
|
||||
V: FnMut(&[u8], u64) -> bool,
|
||||
{
|
||||
self.search_recurse(self.root_blk, search_key, dir, &mut visitor)
|
||||
}
|
||||
let mut stack = Vec::new();
|
||||
stack.push((self.root_blk, None));
|
||||
while let Some((node_blknum, opt_iter)) = stack.pop() {
|
||||
// Locate the node.
|
||||
let node_buf = self.reader.read_blk(self.start_blk + node_blknum)?;
|
||||
|
||||
fn search_recurse<V>(
|
||||
&self,
|
||||
node_blknum: u32,
|
||||
search_key: &[u8; L],
|
||||
dir: VisitDirection,
|
||||
visitor: &mut V,
|
||||
) -> Result<bool>
|
||||
where
|
||||
V: FnMut(&[u8], u64) -> bool,
|
||||
{
|
||||
// Locate the node.
|
||||
let node_buf = self.reader.read_blk(self.start_blk + node_blknum)?;
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
let suffix_len = node.suffix_len as usize;
|
||||
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
let suffix_len = node.suffix_len as usize;
|
||||
assert!(node.num_children > 0);
|
||||
|
||||
assert!(node.num_children > 0);
|
||||
let mut keybuf = Vec::new();
|
||||
keybuf.extend(node.prefix);
|
||||
keybuf.resize(prefix_len + suffix_len, 0);
|
||||
|
||||
let mut keybuf = Vec::new();
|
||||
keybuf.extend(node.prefix);
|
||||
keybuf.resize(prefix_len + suffix_len, 0);
|
||||
|
||||
if dir == VisitDirection::Forwards {
|
||||
// Locate the first match
|
||||
let mut idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
|
||||
Ok(idx) => idx,
|
||||
Err(idx) => {
|
||||
if node.level == 0 {
|
||||
// Imagine that the node contains the following keys:
|
||||
//
|
||||
// 1
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
//
|
||||
// If the search key is '2' and there is exact match,
|
||||
// the binary search would return the index of key
|
||||
// '3'. That's cool, '3' is the first key to return.
|
||||
let mut iter = if let Some(iter) = opt_iter {
|
||||
iter
|
||||
} else if dir == VisitDirection::Forwards {
|
||||
// Locate the first match
|
||||
let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
|
||||
Ok(idx) => idx,
|
||||
Err(idx) => {
|
||||
if node.level == 0 {
|
||||
// Imagine that the node contains the following keys:
|
||||
//
|
||||
// 1
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
//
|
||||
// If the search key is '2' and there is exact match,
|
||||
// the binary search would return the index of key
|
||||
// '3'. That's cool, '3' is the first key to return.
|
||||
idx
|
||||
} else {
|
||||
// This is an internal page, so each key represents a lower
|
||||
// bound for what's in the child page. If there is no exact
|
||||
// match, we have to return the *previous* entry.
|
||||
//
|
||||
// 1 <-- return this
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
idx.saturating_sub(1)
|
||||
}
|
||||
}
|
||||
};
|
||||
Either::Left(idx..node.num_children.into())
|
||||
} else {
|
||||
let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
|
||||
Ok(idx) => {
|
||||
// Exact match. That's the first entry to return, and walk
|
||||
// backwards from there.
|
||||
idx
|
||||
} else {
|
||||
// This is an internal page, so each key represents a lower
|
||||
// bound for what's in the child page. If there is no exact
|
||||
// match, we have to return the *previous* entry.
|
||||
//
|
||||
// 1 <-- return this
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
idx.saturating_sub(1)
|
||||
}
|
||||
}
|
||||
};
|
||||
// idx points to the first match now. Keep going from there
|
||||
let mut key_off = idx * suffix_len;
|
||||
while idx < node.num_children as usize {
|
||||
let suffix = &node.keys[key_off..key_off + suffix_len];
|
||||
keybuf[prefix_len..].copy_from_slice(suffix);
|
||||
let value = node.value(idx);
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if node.level == 0 {
|
||||
// leaf
|
||||
if !visitor(&keybuf, value.to_u64()) {
|
||||
return Ok(false);
|
||||
Err(idx) => {
|
||||
// No exact match. The binary search returned the index of the
|
||||
// first key that's > search_key. Back off by one, and walk
|
||||
// backwards from there.
|
||||
if let Some(idx) = idx.checked_sub(1) {
|
||||
idx
|
||||
} else {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if !self.search_recurse(value.to_blknum(), search_key, dir, visitor)? {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
idx += 1;
|
||||
key_off += suffix_len;
|
||||
}
|
||||
} else {
|
||||
let mut idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
|
||||
Ok(idx) => {
|
||||
// Exact match. That's the first entry to return, and walk
|
||||
// backwards from there. (The loop below starts from 'idx -
|
||||
// 1', so add one here to compensate.)
|
||||
idx + 1
|
||||
}
|
||||
Err(idx) => {
|
||||
// No exact match. The binary search returned the index of the
|
||||
// first key that's > search_key. Back off by one, and walk
|
||||
// backwards from there. (The loop below starts from idx - 1,
|
||||
// so we don't need to subtract one here)
|
||||
idx
|
||||
}
|
||||
};
|
||||
Either::Right((0..=idx).rev())
|
||||
};
|
||||
|
||||
// idx points to the first match + 1 now. Keep going from there.
|
||||
let mut key_off = idx * suffix_len;
|
||||
while idx > 0 {
|
||||
idx -= 1;
|
||||
key_off -= suffix_len;
|
||||
// idx points to the first match now. Keep going from there
|
||||
while let Some(idx) = iter.next() {
|
||||
let key_off = idx * suffix_len;
|
||||
let suffix = &node.keys[key_off..key_off + suffix_len];
|
||||
keybuf[prefix_len..].copy_from_slice(suffix);
|
||||
let value = node.value(idx);
|
||||
@@ -363,12 +338,8 @@ where
|
||||
return Ok(false);
|
||||
}
|
||||
} else {
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if !self.search_recurse(value.to_blknum(), search_key, dir, visitor)? {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
if idx == 0 {
|
||||
stack.push((node_blknum, Some(iter)));
|
||||
stack.push((value.to_blknum(), None));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ impl BatchedUpdates<'_> {
|
||||
///
|
||||
/// This should be called when the corresponding file on disk has been deleted.
|
||||
///
|
||||
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
|
||||
self.layer_map.remove_historic_noflush(layer_desc)
|
||||
}
|
||||
|
||||
@@ -253,11 +253,11 @@ impl LayerMap {
|
||||
///
|
||||
/// Helper function for BatchedUpdates::remove_historic
|
||||
///
|
||||
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
|
||||
self.historic
|
||||
.remove(historic_layer_coverage::LayerKey::from(&layer_desc));
|
||||
.remove(historic_layer_coverage::LayerKey::from(layer_desc));
|
||||
let layer_key = layer_desc.key();
|
||||
if Self::is_l0(&layer_desc) {
|
||||
if Self::is_l0(layer_desc) {
|
||||
let len_before = self.l0_delta_layers.len();
|
||||
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
|
||||
l0_delta_layers.retain(|other| other.key() != layer_key);
|
||||
@@ -766,8 +766,7 @@ mod tests {
|
||||
expected_in_counts
|
||||
);
|
||||
|
||||
map.batch_update()
|
||||
.remove_historic(downloaded.layer_desc().clone());
|
||||
map.batch_update().remove_historic(downloaded.layer_desc());
|
||||
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
|
||||
}
|
||||
|
||||
|
||||
@@ -20,17 +20,19 @@ use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
|
||||
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
use super::timeline::delete::DeleteTimelineFlow;
|
||||
|
||||
/// The tenants known to the pageserver.
|
||||
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
||||
enum TenantsMap {
|
||||
pub(crate) enum TenantsMap {
|
||||
/// [`init_tenant_mgr`] is not done yet.
|
||||
Initializing,
|
||||
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
|
||||
@@ -42,13 +44,13 @@ enum TenantsMap {
|
||||
}
|
||||
|
||||
impl TenantsMap {
|
||||
fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
|
||||
pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
|
||||
}
|
||||
}
|
||||
fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
|
||||
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
|
||||
@@ -97,7 +99,9 @@ pub async fn init_tenant_mgr(
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// This case happens if we crash during attach before creating the attach marker file
|
||||
// This case happens if we:
|
||||
// * crash during attach before creating the attach marker file
|
||||
// * crash during tenant delete before removing tenant directory
|
||||
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
|
||||
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
|
||||
})?;
|
||||
@@ -124,6 +128,7 @@ pub async fn init_tenant_mgr(
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
Some(init_order.clone()),
|
||||
&TENANTS,
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
@@ -154,12 +159,13 @@ pub async fn init_tenant_mgr(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn schedule_local_tenant_processing(
|
||||
pub(crate) fn schedule_local_tenant_processing(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_path: &Path,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
anyhow::ensure!(
|
||||
@@ -219,6 +225,7 @@ pub fn schedule_local_tenant_processing(
|
||||
broker_client,
|
||||
remote_storage,
|
||||
init_order,
|
||||
tenants,
|
||||
ctx,
|
||||
)
|
||||
};
|
||||
@@ -356,7 +363,7 @@ pub async fn create_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let created_tenant =
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, &TENANTS, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -417,6 +424,14 @@ pub async fn get_tenant(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("Tenant {0}")]
|
||||
@@ -432,7 +447,7 @@ pub async fn delete_timeline(
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
let tenant = get_tenant(tenant_id, true).await?;
|
||||
DeleteTimelineFlow::run(&tenant, timeline_id).await?;
|
||||
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -507,7 +522,7 @@ pub async fn load_tenant(
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, &TENANTS, ctx)
|
||||
.with_context(|| {
|
||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||
})?;
|
||||
@@ -588,7 +603,7 @@ pub async fn attach_tenant(
|
||||
.context("check for attach marker file existence")?;
|
||||
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
|
||||
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, &TENANTS, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
|
||||
@@ -401,16 +401,6 @@ pub trait AsLayerDesc {
|
||||
/// An image layer is a snapshot of all the data in a key-range, at a single
|
||||
/// LSN.
|
||||
pub trait PersistentLayer: Layer + AsLayerDesc {
|
||||
/// Identify the tenant this layer belongs to
|
||||
fn get_tenant_id(&self) -> TenantId {
|
||||
self.layer_desc().tenant_id
|
||||
}
|
||||
|
||||
/// Identify the timeline this layer belongs to
|
||||
fn get_timeline_id(&self) -> TimelineId {
|
||||
self.layer_desc().timeline_id
|
||||
}
|
||||
|
||||
/// File name used for this layer, both in the pageserver's local filesystem
|
||||
/// state as well as in the remote storage.
|
||||
fn filename(&self) -> LayerFileName {
|
||||
@@ -436,14 +426,6 @@ pub trait PersistentLayer: Layer + AsLayerDesc {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns None if the layer file size is not known.
|
||||
///
|
||||
/// Should not change over the lifetime of the layer object because
|
||||
/// current_physical_size is computed as the som of this value.
|
||||
fn file_size(&self) -> u64 {
|
||||
self.layer_desc().file_size
|
||||
}
|
||||
|
||||
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
|
||||
|
||||
fn access_stats(&self) -> &LayerAccessStats;
|
||||
|
||||
@@ -90,14 +90,30 @@ pub struct Summary {
|
||||
|
||||
impl From<&DeltaLayer> for Summary {
|
||||
fn from(layer: &DeltaLayer) -> Self {
|
||||
Self::expected(
|
||||
layer.desc.tenant_id,
|
||||
layer.desc.timeline_id,
|
||||
layer.desc.key_range.clone(),
|
||||
layer.desc.lsn_range.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
pub(super) fn expected(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
keys: Range<Key>,
|
||||
lsns: Range<Lsn>,
|
||||
) -> Self {
|
||||
Self {
|
||||
magic: DELTA_FILE_MAGIC,
|
||||
format_version: STORAGE_FORMAT_VERSION,
|
||||
|
||||
tenant_id: layer.desc.tenant_id,
|
||||
timeline_id: layer.desc.timeline_id,
|
||||
key_range: layer.desc.key_range.clone(),
|
||||
lsn_range: layer.desc.lsn_range.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
key_range: keys,
|
||||
lsn_range: lsns,
|
||||
|
||||
index_start_blk: 0,
|
||||
index_root_blk: 0,
|
||||
@@ -108,12 +124,10 @@ impl From<&DeltaLayer> for Summary {
|
||||
// Flag indicating that this version initialize the page
|
||||
const WILL_INIT: u64 = 1;
|
||||
|
||||
///
|
||||
/// Struct representing reference to BLOB in layers. Reference contains BLOB
|
||||
/// offset, and for WAL records it also contains `will_init` flag. The flag
|
||||
/// helps to determine the range of records that needs to be applied, without
|
||||
/// reading/deserializing records themselves.
|
||||
///
|
||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||
pub struct BlobRef(pub u64);
|
||||
|
||||
@@ -138,10 +152,8 @@ impl BlobRef {
|
||||
pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
|
||||
struct DeltaKey([u8; DELTA_KEY_SIZE]);
|
||||
|
||||
///
|
||||
/// This is the key of the B-tree index stored in the delta layer. It consists
|
||||
/// of the serialized representation of a Key and LSN.
|
||||
///
|
||||
impl DeltaKey {
|
||||
fn from_slice(buf: &[u8]) -> Self {
|
||||
let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
|
||||
@@ -214,6 +226,12 @@ pub struct DeltaLayerInner {
|
||||
file: FileBlockReader<VirtualFile>,
|
||||
}
|
||||
|
||||
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
|
||||
fn as_ref(&self) -> &DeltaLayerInner {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DeltaLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DeltaLayerInner")
|
||||
@@ -311,86 +329,16 @@ impl Layer for DeltaLayer {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
ensure!(lsn_range.start >= self.desc.lsn_range.start);
|
||||
let mut need_image = true;
|
||||
|
||||
ensure!(self.desc.key_range.contains(&key));
|
||||
|
||||
{
|
||||
// Open the file and lock the metadata in memory
|
||||
let inner = self
|
||||
.load(LayerAccessKind::GetValueReconstructData, ctx)
|
||||
.await?;
|
||||
let inner = self
|
||||
.load(LayerAccessKind::GetValueReconstructData, ctx)
|
||||
.await?;
|
||||
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
inner.index_start_blk,
|
||||
inner.index_root_blk,
|
||||
file,
|
||||
);
|
||||
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
|
||||
|
||||
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
|
||||
|
||||
tree_reader
|
||||
.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
|
||||
let blob_ref = BlobRef(value);
|
||||
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
!blob_ref.will_init()
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Ok, 'offsets' now contains the offsets of all the entries we need to read
|
||||
let cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (entry_lsn, pos) in offsets {
|
||||
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
let val = Value::des(&buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((entry_lsn, img));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// release metadata lock and close the file
|
||||
}
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
}
|
||||
inner
|
||||
.get_value_reconstruct_data(key, lsn_range, reconstruct_state)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
@@ -516,43 +464,27 @@ impl DeltaLayer {
|
||||
async fn load_inner(&self) -> Result<Arc<DeltaLayerInner>> {
|
||||
let path = self.path();
|
||||
|
||||
let file = VirtualFile::open(&path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary = match &self.path_or_conf {
|
||||
PathOrConf::Conf(_) => Some(Summary::from(self)),
|
||||
PathOrConf::Path(_) => None,
|
||||
};
|
||||
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let loaded = DeltaLayerInner::load(&path, summary)?;
|
||||
|
||||
match &self.path_or_conf {
|
||||
PathOrConf::Conf(_) => {
|
||||
let mut expected_summary = Summary::from(self);
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
if actual_summary != expected_summary {
|
||||
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
|
||||
}
|
||||
}
|
||||
PathOrConf::Path(path) => {
|
||||
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
|
||||
let expected_filename = self.filename().file_name();
|
||||
if let PathOrConf::Path(ref path) = self.path_or_conf {
|
||||
// not production code
|
||||
|
||||
if actual_filename != expected_filename {
|
||||
println!(
|
||||
"warning: filename does not match what is expected from in-file summary"
|
||||
);
|
||||
println!("actual: {:?}", actual_filename);
|
||||
println!("expected: {:?}", expected_filename);
|
||||
}
|
||||
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
|
||||
let expected_filename = self.filename().file_name();
|
||||
|
||||
if actual_filename != expected_filename {
|
||||
println!("warning: filename does not match what is expected from in-file summary");
|
||||
println!("actual: {:?}", actual_filename);
|
||||
println!("expected: {:?}", expected_filename);
|
||||
}
|
||||
}
|
||||
|
||||
debug!("loaded from {}", &path.display());
|
||||
|
||||
Ok(Arc::new(DeltaLayerInner {
|
||||
file,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
}))
|
||||
Ok(Arc::new(loaded))
|
||||
}
|
||||
|
||||
/// Create a DeltaLayer struct representing an existing file on disk.
|
||||
@@ -621,9 +553,12 @@ impl DeltaLayer {
|
||||
/// Obtains all keys and value references stored in the layer
|
||||
///
|
||||
/// The value can be obtained via the [`ValueRef::load`] function.
|
||||
pub async fn load_val_refs(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, ValueRef)>> {
|
||||
pub async fn load_val_refs(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<(Key, Lsn, ValueRef<Arc<DeltaLayerInner>>)>> {
|
||||
let inner = self
|
||||
.load(LayerAccessKind::KeyIter, ctx)
|
||||
.load(LayerAccessKind::Iter, ctx)
|
||||
.await
|
||||
.context("load delta layer")?;
|
||||
DeltaLayerInner::load_val_refs(inner)
|
||||
@@ -912,15 +847,120 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
async fn load_val_refs(this: &Arc<DeltaLayerInner>) -> Result<Vec<(Key, Lsn, ValueRef)>> {
|
||||
let file = &this.file;
|
||||
pub(super) fn load(path: &std::path::Path, summary: Option<Summary>) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
// production code path
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(DeltaLayerInner {
|
||||
file,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
let mut need_image = true;
|
||||
// Scan the page versions backwards, starting from `lsn`.
|
||||
let file = &self.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
this.index_start_blk,
|
||||
this.index_root_blk,
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
file,
|
||||
);
|
||||
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
|
||||
|
||||
let mut all_offsets = Vec::<(Key, Lsn, ValueRef)>::new();
|
||||
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
|
||||
|
||||
tree_reader
|
||||
.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
|
||||
let blob_ref = BlobRef(value);
|
||||
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
!blob_ref.will_init()
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Ok, 'offsets' now contains the offsets of all the entries we need to read
|
||||
let cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (entry_lsn, pos) in offsets {
|
||||
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
let val = Value::des(&buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
file.file.path.display()
|
||||
)
|
||||
})?;
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
reconstruct_state.img = Some((entry_lsn, img));
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
Value::WalRecord(rec) => {
|
||||
let will_init = rec.will_init();
|
||||
reconstruct_state.records.push((entry_lsn, rec));
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_image = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know.
|
||||
if need_image {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn load_val_refs<T: AsRef<DeltaLayerInner> + Clone>(
|
||||
this: &T,
|
||||
) -> Result<Vec<(Key, Lsn, ValueRef<T>)>> {
|
||||
let dl = this.as_ref();
|
||||
let file = &dl.file;
|
||||
let tree_reader =
|
||||
DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
|
||||
|
||||
let mut all_offsets = Vec::<(Key, Lsn, ValueRef<T>)>::new();
|
||||
tree_reader
|
||||
.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
@@ -939,7 +979,8 @@ impl DeltaLayerInner {
|
||||
|
||||
Ok(all_offsets)
|
||||
}
|
||||
async fn load_keys(&self) -> Result<Vec<(Key, Lsn, u64)>> {
|
||||
|
||||
pub(super) async fn load_keys(&self) -> Result<Vec<(Key, Lsn, u64)>> {
|
||||
let file = &self.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
@@ -979,26 +1020,27 @@ impl DeltaLayerInner {
|
||||
}
|
||||
|
||||
/// Reference to an on-disk value
|
||||
pub struct ValueRef {
|
||||
pub struct ValueRef<T: AsRef<DeltaLayerInner>> {
|
||||
blob_ref: BlobRef,
|
||||
reader: BlockCursor<Adapter>,
|
||||
reader: BlockCursor<Adapter<T>>,
|
||||
}
|
||||
|
||||
impl ValueRef {
|
||||
impl<T: AsRef<DeltaLayerInner>> ValueRef<T> {
|
||||
/// Loads the value from disk
|
||||
pub fn load(&self) -> Result<Value> {
|
||||
// theoretically we *could* record an access time for each, but it does not really matter
|
||||
let buf = self.reader.read_blob(self.blob_ref.pos())?;
|
||||
let val = Value::des(&buf)?;
|
||||
Ok(val)
|
||||
}
|
||||
}
|
||||
|
||||
struct Adapter(Arc<DeltaLayerInner>);
|
||||
struct Adapter<T: AsRef<DeltaLayerInner>>(T);
|
||||
|
||||
impl BlockReader for Adapter {
|
||||
impl<T: AsRef<DeltaLayerInner>> BlockReader for Adapter<T> {
|
||||
type BlockLease = PageReadGuard<'static>;
|
||||
|
||||
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
|
||||
self.0.file.read_blk(blknum)
|
||||
self.0.as_ref().file.read_blk(blknum)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ use super::{AsLayerDesc, Layer, LayerAccessStatsReset, PathOrConf, PersistentLay
|
||||
/// the 'index' starts at the block indicated by 'index_start_blk'
|
||||
///
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
struct Summary {
|
||||
pub(super) struct Summary {
|
||||
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
|
||||
magic: u16,
|
||||
format_version: u16,
|
||||
@@ -85,13 +85,29 @@ struct Summary {
|
||||
|
||||
impl From<&ImageLayer> for Summary {
|
||||
fn from(layer: &ImageLayer) -> Self {
|
||||
Self::expected(
|
||||
layer.desc.tenant_id,
|
||||
layer.desc.timeline_id,
|
||||
layer.desc.key_range.clone(),
|
||||
layer.lsn,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
pub(super) fn expected(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
) -> Self {
|
||||
Self {
|
||||
magic: IMAGE_FILE_MAGIC,
|
||||
format_version: STORAGE_FORMAT_VERSION,
|
||||
tenant_id: layer.desc.tenant_id,
|
||||
timeline_id: layer.desc.timeline_id,
|
||||
key_range: layer.desc.key_range.clone(),
|
||||
lsn: layer.lsn,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
key_range,
|
||||
lsn,
|
||||
|
||||
index_start_blk: 0,
|
||||
index_root_blk: 0,
|
||||
@@ -136,6 +152,8 @@ pub struct ImageLayerInner {
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
|
||||
lsn: Lsn,
|
||||
|
||||
/// Reader object for reading blocks from the file.
|
||||
file: FileBlockReader<VirtualFile>,
|
||||
}
|
||||
@@ -200,27 +218,11 @@ impl Layer for ImageLayer {
|
||||
let inner = self
|
||||
.load(LayerAccessKind::GetValueReconstructData, ctx)
|
||||
.await?;
|
||||
|
||||
let file = &inner.file;
|
||||
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
if let Some(offset) = tree_reader.get(&keybuf).await? {
|
||||
let blob = file.block_cursor().read_blob(offset).with_context(|| {
|
||||
format!(
|
||||
"failed to read value from data file {} at offset {}",
|
||||
self.path().display(),
|
||||
offset
|
||||
)
|
||||
})?;
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
}
|
||||
inner
|
||||
.get_value_reconstruct_data(key, reconstruct_state)
|
||||
.await
|
||||
// FIXME: makes no sense to dump paths
|
||||
.with_context(|| format!("read {}", self.path().display()))
|
||||
}
|
||||
|
||||
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
|
||||
@@ -323,56 +325,35 @@ impl ImageLayer {
|
||||
) -> Result<&ImageLayerInner> {
|
||||
self.access_stats
|
||||
.record_access(access_kind, ctx.task_kind());
|
||||
loop {
|
||||
if let Some(inner) = self.inner.get() {
|
||||
return Ok(inner);
|
||||
}
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner())
|
||||
.await
|
||||
.with_context(|| format!("Failed to load image layer {}", self.path().display()))?;
|
||||
}
|
||||
self.inner
|
||||
.get_or_try_init(|| self.load_inner())
|
||||
.await
|
||||
.with_context(|| format!("Failed to load image layer {}", self.path().display()))
|
||||
}
|
||||
|
||||
async fn load_inner(&self) -> Result<ImageLayerInner> {
|
||||
let path = self.path();
|
||||
|
||||
// Open the file if it's not open already.
|
||||
let file = VirtualFile::open(&path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
let expected_summary = match &self.path_or_conf {
|
||||
PathOrConf::Conf(_) => Some(Summary::from(self)),
|
||||
PathOrConf::Path(_) => None,
|
||||
};
|
||||
|
||||
match &self.path_or_conf {
|
||||
PathOrConf::Conf(_) => {
|
||||
let mut expected_summary = Summary::from(self);
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
|
||||
|
||||
if actual_summary != expected_summary {
|
||||
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
|
||||
}
|
||||
}
|
||||
PathOrConf::Path(path) => {
|
||||
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
|
||||
let expected_filename = self.filename().file_name();
|
||||
if let PathOrConf::Path(ref path) = self.path_or_conf {
|
||||
// not production code
|
||||
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
|
||||
let expected_filename = self.filename().file_name();
|
||||
|
||||
if actual_filename != expected_filename {
|
||||
println!(
|
||||
"warning: filename does not match what is expected from in-file summary"
|
||||
);
|
||||
println!("actual: {:?}", actual_filename);
|
||||
println!("expected: {:?}", expected_filename);
|
||||
}
|
||||
if actual_filename != expected_filename {
|
||||
println!("warning: filename does not match what is expected from in-file summary");
|
||||
println!("actual: {:?}", actual_filename);
|
||||
println!("expected: {:?}", expected_filename);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
file,
|
||||
})
|
||||
Ok(loaded)
|
||||
}
|
||||
|
||||
/// Create an ImageLayer struct representing an existing file on disk
|
||||
@@ -442,6 +423,65 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
pub(super) fn load(
|
||||
path: &std::path::Path,
|
||||
lsn: Lsn,
|
||||
summary: Option<Summary>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let file = VirtualFile::open(path)
|
||||
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0)?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
|
||||
|
||||
if let Some(mut expected_summary) = summary {
|
||||
// production code path
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ImageLayerInner {
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
lsn,
|
||||
file,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) async fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
let file = &self.file;
|
||||
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
if let Some(offset) = tree_reader.get(&keybuf).await? {
|
||||
let blob = file
|
||||
.block_cursor()
|
||||
.read_blob(offset)
|
||||
.with_context(|| format!("failed to read value from offset {}", offset))?;
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new image layer.
|
||||
///
|
||||
/// Usage:
|
||||
|
||||
@@ -16,6 +16,7 @@ use anyhow::{ensure, Result};
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::OnceLock;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
@@ -42,14 +43,16 @@ pub struct InMemoryLayer {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
|
||||
///
|
||||
/// This layer contains all the changes from 'start_lsn'. The
|
||||
/// start is inclusive.
|
||||
///
|
||||
start_lsn: Lsn,
|
||||
|
||||
/// The above fields never change. The parts that do change are in 'inner',
|
||||
/// and protected by mutex.
|
||||
/// Frozen layers have an exclusive end LSN.
|
||||
/// Writes are only allowed when this is `None`.
|
||||
end_lsn: OnceLock<Lsn>,
|
||||
|
||||
/// The above fields never change, except for `end_lsn`, which is only set once.
|
||||
/// All other changing parts are in `inner`, and protected by a mutex.
|
||||
inner: RwLock<InMemoryLayerInner>,
|
||||
}
|
||||
|
||||
@@ -57,21 +60,16 @@ impl std::fmt::Debug for InMemoryLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InMemoryLayer")
|
||||
.field("start_lsn", &self.start_lsn)
|
||||
.field("end_lsn", &self.end_lsn)
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InMemoryLayerInner {
|
||||
/// Frozen layers have an exclusive end LSN.
|
||||
/// Writes are only allowed when this is None
|
||||
end_lsn: Option<Lsn>,
|
||||
|
||||
///
|
||||
/// All versions of all pages in the layer are kept here. Indexed
|
||||
/// by block number and LSN. The value is an offset into the
|
||||
/// ephemeral file where the page version is stored.
|
||||
///
|
||||
index: HashMap<Key, VecMap<Lsn, u64>>,
|
||||
|
||||
/// The values are stored in a serialized format in this file.
|
||||
@@ -82,15 +80,7 @@ pub struct InMemoryLayerInner {
|
||||
|
||||
impl std::fmt::Debug for InMemoryLayerInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InMemoryLayerInner")
|
||||
.field("end_lsn", &self.end_lsn)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryLayerInner {
|
||||
fn assert_writeable(&self) {
|
||||
assert!(self.end_lsn.is_none());
|
||||
f.debug_struct("InMemoryLayerInner").finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,13 +91,21 @@ impl InMemoryLayer {
|
||||
|
||||
pub fn info(&self) -> InMemoryLayerInfo {
|
||||
let lsn_start = self.start_lsn;
|
||||
let lsn_end = self.inner.read().unwrap().end_lsn;
|
||||
|
||||
match lsn_end {
|
||||
Some(lsn_end) => InMemoryLayerInfo::Frozen { lsn_start, lsn_end },
|
||||
None => InMemoryLayerInfo::Open { lsn_start },
|
||||
if let Some(&lsn_end) = self.end_lsn.get() {
|
||||
InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
|
||||
} else {
|
||||
InMemoryLayerInfo::Open { lsn_start }
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_writable(&self) {
|
||||
assert!(self.end_lsn.get().is_none());
|
||||
}
|
||||
|
||||
fn end_lsn_or_max(&self) -> Lsn {
|
||||
self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -117,14 +115,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
fn get_lsn_range(&self) -> Range<Lsn> {
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_lsn = if let Some(end_lsn) = inner.end_lsn {
|
||||
end_lsn
|
||||
} else {
|
||||
Lsn(u64::MAX)
|
||||
};
|
||||
self.start_lsn..end_lsn
|
||||
self.start_lsn..self.end_lsn_or_max()
|
||||
}
|
||||
|
||||
fn is_incremental(&self) -> bool {
|
||||
@@ -136,11 +127,7 @@ impl Layer for InMemoryLayer {
|
||||
async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_str = inner
|
||||
.end_lsn
|
||||
.as_ref()
|
||||
.map(Lsn::to_string)
|
||||
.unwrap_or_default();
|
||||
let end_str = self.end_lsn_or_max();
|
||||
|
||||
println!(
|
||||
"----- in-memory layer for tli {} LSNs {}-{} ----",
|
||||
@@ -236,9 +223,7 @@ impl Layer for InMemoryLayer {
|
||||
|
||||
impl std::fmt::Display for InMemoryLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
|
||||
let end_lsn = self.end_lsn_or_max();
|
||||
write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
|
||||
}
|
||||
}
|
||||
@@ -270,8 +255,8 @@ impl InMemoryLayer {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
start_lsn,
|
||||
end_lsn: OnceLock::new(),
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
end_lsn: None,
|
||||
index: HashMap::new(),
|
||||
file,
|
||||
}),
|
||||
@@ -285,7 +270,7 @@ impl InMemoryLayer {
|
||||
pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
|
||||
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
inner.assert_writeable();
|
||||
self.assert_writable();
|
||||
|
||||
let off = {
|
||||
SER_BUFFER.with(|x| -> Result<_> {
|
||||
@@ -317,10 +302,10 @@ impl InMemoryLayer {
|
||||
/// Records the end_lsn for non-dropped layers.
|
||||
/// `end_lsn` is exclusive
|
||||
pub fn freeze(&self, end_lsn: Lsn) {
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
let inner = self.inner.write().unwrap();
|
||||
|
||||
assert!(self.start_lsn < end_lsn);
|
||||
inner.end_lsn = Some(end_lsn);
|
||||
self.end_lsn.set(end_lsn).expect("end_lsn set only once");
|
||||
|
||||
for vec_map in inner.index.values() {
|
||||
for (lsn, _pos) in vec_map.as_slice() {
|
||||
@@ -344,12 +329,14 @@ impl InMemoryLayer {
|
||||
// rare though, so we just accept the potential latency hit for now.
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let end_lsn = *self.end_lsn.get().unwrap();
|
||||
|
||||
let mut delta_layer_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
Key::MIN,
|
||||
self.start_lsn..inner.end_lsn.unwrap(),
|
||||
self.start_lsn..end_lsn,
|
||||
)?;
|
||||
|
||||
let mut buf = Vec::new();
|
||||
|
||||
@@ -919,7 +919,7 @@ impl Timeline {
|
||||
pub fn set_state(&self, new_state: TimelineState) {
|
||||
match (self.current_state(), new_state) {
|
||||
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
|
||||
warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
|
||||
info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
|
||||
}
|
||||
(st, TimelineState::Loading) => {
|
||||
error!("ignoring transition from {st:?} into Loading state");
|
||||
@@ -1160,7 +1160,7 @@ impl Timeline {
|
||||
return Err(EvictionError::CannotEvictRemoteLayer);
|
||||
}
|
||||
|
||||
let layer_file_size = local_layer.file_size();
|
||||
let layer_file_size = local_layer.layer_desc().file_size;
|
||||
|
||||
let local_layer_mtime = local_layer
|
||||
.local_path()
|
||||
@@ -1590,7 +1590,6 @@ impl Timeline {
|
||||
///
|
||||
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut guard = self.layers.write().await;
|
||||
let mut num_layers = 0;
|
||||
|
||||
let timer = self.metrics.load_layer_map_histo.start_timer();
|
||||
|
||||
@@ -1608,12 +1607,12 @@ impl Timeline {
|
||||
let fname = direntry.file_name();
|
||||
let fname = fname.to_string_lossy();
|
||||
|
||||
if let Some(imgfilename) = ImageFileName::parse_str(&fname) {
|
||||
if let Some(filename) = ImageFileName::parse_str(&fname) {
|
||||
// create an ImageLayer struct for each image file.
|
||||
if imgfilename.lsn > disk_consistent_lsn {
|
||||
if filename.lsn > disk_consistent_lsn {
|
||||
info!(
|
||||
"found future image layer {} on timeline {} disk_consistent_lsn is {}",
|
||||
imgfilename, self.timeline_id, disk_consistent_lsn
|
||||
filename, self.timeline_id, disk_consistent_lsn
|
||||
);
|
||||
|
||||
rename_to_backup(&direntry_path)?;
|
||||
@@ -1621,31 +1620,31 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let file_size = direntry_path.metadata()?.len();
|
||||
let stats =
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident);
|
||||
|
||||
let layer = ImageLayer::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
&imgfilename,
|
||||
&filename,
|
||||
file_size,
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident),
|
||||
stats,
|
||||
);
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
loaded_layers.push(Arc::new(layer));
|
||||
num_layers += 1;
|
||||
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
||||
} else if let Some(filename) = DeltaFileName::parse_str(&fname) {
|
||||
// Create a DeltaLayer struct for each delta file.
|
||||
// The end-LSN is exclusive, while disk_consistent_lsn is
|
||||
// inclusive. For example, if disk_consistent_lsn is 100, it is
|
||||
// OK for a delta layer to have end LSN 101, but if the end LSN
|
||||
// is 102, then it might not have been fully flushed to disk
|
||||
// before crash.
|
||||
if deltafilename.lsn_range.end > disk_consistent_lsn + 1 {
|
||||
if filename.lsn_range.end > disk_consistent_lsn + 1 {
|
||||
info!(
|
||||
"found future delta layer {} on timeline {} disk_consistent_lsn is {}",
|
||||
deltafilename, self.timeline_id, disk_consistent_lsn
|
||||
filename, self.timeline_id, disk_consistent_lsn
|
||||
);
|
||||
|
||||
rename_to_backup(&direntry_path)?;
|
||||
@@ -1653,20 +1652,20 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let file_size = direntry_path.metadata()?.len();
|
||||
let stats =
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident);
|
||||
|
||||
let layer = DeltaLayer::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
&deltafilename,
|
||||
&filename,
|
||||
file_size,
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident),
|
||||
stats,
|
||||
);
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
loaded_layers.push(Arc::new(layer));
|
||||
num_layers += 1;
|
||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||
// ignore these
|
||||
} else if remote_timeline_client::is_temp_download_file(&direntry_path) {
|
||||
@@ -1691,6 +1690,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let num_layers = loaded_layers.len();
|
||||
guard.initialize_local_layers(loaded_layers, Lsn(disk_consistent_lsn.0) + 1);
|
||||
|
||||
info!(
|
||||
@@ -1791,13 +1791,15 @@ impl Timeline {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let stats =
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted);
|
||||
|
||||
let remote_layer = RemoteLayer::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
imgfilename,
|
||||
&remote_layer_metadata,
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted),
|
||||
stats,
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
added_remote_layers.push(remote_layer);
|
||||
@@ -1816,12 +1818,15 @@ impl Timeline {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let stats =
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted);
|
||||
|
||||
let remote_layer = RemoteLayer::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
deltafilename,
|
||||
&remote_layer_metadata,
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted),
|
||||
stats,
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
added_remote_layers.push(remote_layer);
|
||||
@@ -2269,15 +2274,16 @@ trait TraversalLayerExt {
|
||||
|
||||
impl TraversalLayerExt for Arc<dyn PersistentLayer> {
|
||||
fn traversal_id(&self) -> TraversalId {
|
||||
let timeline_id = self.layer_desc().timeline_id;
|
||||
match self.local_path() {
|
||||
Some(local_path) => {
|
||||
debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", self.get_timeline_id())),
|
||||
debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", timeline_id)),
|
||||
"need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary",
|
||||
);
|
||||
format!("{}", local_path.display())
|
||||
}
|
||||
None => {
|
||||
format!("remote {}/{self}", self.get_timeline_id())
|
||||
format!("remote {}/{self}", timeline_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2813,7 +2819,10 @@ impl Timeline {
|
||||
// We will remove frozen layer and add delta layer in one atomic operation later.
|
||||
let layer = self.create_delta_layer(&frozen_layer).await?;
|
||||
(
|
||||
HashMap::from([(layer.filename(), LayerFileMetadata::new(layer.file_size()))]),
|
||||
HashMap::from([(
|
||||
layer.filename(),
|
||||
LayerFileMetadata::new(layer.layer_desc().file_size),
|
||||
)]),
|
||||
Some(layer),
|
||||
)
|
||||
};
|
||||
@@ -2833,7 +2842,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
// update metrics
|
||||
let sz = l.file_size();
|
||||
let sz = l.layer_desc().file_size;
|
||||
self.metrics.resident_physical_size_gauge.add(sz);
|
||||
self.metrics.num_persistent_files_created.inc_by(1);
|
||||
self.metrics.persistent_bytes_written.inc_by(sz);
|
||||
@@ -3452,14 +3461,14 @@ impl Timeline {
|
||||
// "gaps" in the sequence of level 0 files should only happen in case
|
||||
// of a crash, partial download from cloud storage, or something like
|
||||
// that, so it's not a big deal in practice.
|
||||
level0_deltas.sort_by_key(|l| l.get_lsn_range().start);
|
||||
level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
|
||||
let mut level0_deltas_iter = level0_deltas.iter();
|
||||
|
||||
let first_level0_delta = level0_deltas_iter.next().unwrap();
|
||||
let mut prev_lsn_end = first_level0_delta.get_lsn_range().end;
|
||||
let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
|
||||
let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
|
||||
for l in level0_deltas_iter {
|
||||
let lsn_range = l.get_lsn_range();
|
||||
let lsn_range = &l.layer_desc().lsn_range;
|
||||
|
||||
if lsn_range.start != prev_lsn_end {
|
||||
break;
|
||||
@@ -3468,8 +3477,13 @@ impl Timeline {
|
||||
prev_lsn_end = lsn_range.end;
|
||||
}
|
||||
let lsn_range = Range {
|
||||
start: deltas_to_compact.first().unwrap().get_lsn_range().start,
|
||||
end: deltas_to_compact.last().unwrap().get_lsn_range().end,
|
||||
start: deltas_to_compact
|
||||
.first()
|
||||
.unwrap()
|
||||
.layer_desc()
|
||||
.lsn_range
|
||||
.start,
|
||||
end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
|
||||
};
|
||||
|
||||
let remotes = deltas_to_compact
|
||||
@@ -3521,33 +3535,22 @@ impl Timeline {
|
||||
let mut prev: Option<Key> = None;
|
||||
|
||||
let mut all_value_refs = Vec::new();
|
||||
let mut all_keys = Vec::new();
|
||||
|
||||
for l in deltas_to_compact.iter() {
|
||||
// TODO: replace this with an await once we fully go async
|
||||
all_value_refs.extend(
|
||||
Handle::current().block_on(
|
||||
l.clone()
|
||||
.downcast_delta_layer()
|
||||
.expect("delta layer")
|
||||
.load_val_refs(ctx),
|
||||
)?,
|
||||
);
|
||||
let delta = l.clone().downcast_delta_layer().expect("delta layer");
|
||||
Handle::current().block_on(async {
|
||||
all_value_refs.extend(delta.load_val_refs(ctx).await?);
|
||||
all_keys.extend(delta.load_keys(ctx).await?);
|
||||
anyhow::Ok(())
|
||||
})?;
|
||||
}
|
||||
|
||||
// The current stdlib sorting implementation is designed in a way where it is
|
||||
// particularly fast where the slice is made up of sorted sub-ranges.
|
||||
all_value_refs.sort_by_key(|(key, lsn, _value_ref)| (*key, *lsn));
|
||||
|
||||
let mut all_keys = Vec::new();
|
||||
for l in deltas_to_compact.iter() {
|
||||
// TODO: replace this with an await once we fully go async
|
||||
all_keys.extend(
|
||||
Handle::current().block_on(
|
||||
l.clone()
|
||||
.downcast_delta_layer()
|
||||
.expect("delta layer")
|
||||
.load_keys(ctx),
|
||||
)?,
|
||||
);
|
||||
}
|
||||
// The current stdlib sorting implementation is designed in a way where it is
|
||||
// particularly fast where the slice is made up of sorted sub-ranges.
|
||||
all_keys.sort_by_key(|(key, lsn, _size)| (*key, *lsn));
|
||||
@@ -4656,7 +4659,7 @@ impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
|
||||
|
||||
impl LocalLayerInfoForDiskUsageEviction {
|
||||
pub fn file_size(&self) -> u64 {
|
||||
self.layer.file_size()
|
||||
self.layer.layer_desc().file_size
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -219,27 +219,13 @@ async fn delete_local_layer_files(
|
||||
}
|
||||
};
|
||||
|
||||
let r = if metadata.is_dir() {
|
||||
// There shouldnt be any directories inside timeline dir as of current layout.
|
||||
if metadata.is_dir() {
|
||||
warn!(path=%entry.path().display(), "unexpected directory under timeline dir");
|
||||
tokio::fs::remove_dir(entry.path()).await
|
||||
} else {
|
||||
tokio::fs::remove_file(entry.path()).await
|
||||
};
|
||||
|
||||
if let Err(e) = r {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
warn!(
|
||||
timeline_dir=?local_timeline_directory,
|
||||
path=?entry.path().display(),
|
||||
"got not found err while removing timeline dir, proceeding anyway"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
anyhow::bail!(anyhow::anyhow!(
|
||||
"Failed to remove: {}. Error: {e}",
|
||||
entry.path().display()
|
||||
));
|
||||
}
|
||||
.with_context(|| format!("Failed to remove: {}", entry.path().display()))?;
|
||||
}
|
||||
|
||||
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
|
||||
@@ -359,10 +345,11 @@ impl DeleteTimelineFlow {
|
||||
// NB: If this fails half-way through, and is retried, the retry will go through
|
||||
// all the same steps again. Make sure the code here is idempotent, and don't
|
||||
// error out if some of the shutdown tasks have already been completed!
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant.tenant_id, %timeline_id))]
|
||||
#[instrument(skip(tenant), fields(tenant_id=%tenant.tenant_id))]
|
||||
pub async fn run(
|
||||
tenant: &Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
inplace: bool,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
let (timeline, mut guard) = Self::prepare(tenant, timeline_id)?;
|
||||
|
||||
@@ -380,7 +367,11 @@ impl DeleteTimelineFlow {
|
||||
))?
|
||||
});
|
||||
|
||||
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
|
||||
if inplace {
|
||||
Self::background(guard, tenant.conf, tenant, &timeline).await?
|
||||
} else {
|
||||
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -398,6 +389,8 @@ impl DeleteTimelineFlow {
|
||||
}
|
||||
|
||||
/// Shortcut to create Timeline in stopping state and spawn deletion task.
|
||||
/// See corresponding parts of [`crate::tenant::delete::DeleteTenantFlow`]
|
||||
#[instrument(skip_all, fields(%timeline_id))]
|
||||
pub async fn resume_deletion(
|
||||
tenant: Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
@@ -444,11 +437,15 @@ impl DeleteTimelineFlow {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(%timeline_id))]
|
||||
pub async fn cleanup_remaining_timeline_fs_traces(
|
||||
tenant: &Tenant,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<()> {
|
||||
cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_id, timeline_id).await
|
||||
let r =
|
||||
cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_id, timeline_id).await;
|
||||
info!("Done");
|
||||
r
|
||||
}
|
||||
|
||||
fn prepare(
|
||||
@@ -494,11 +491,17 @@ impl DeleteTimelineFlow {
|
||||
// At the end of the operation we're holding the guard and need to lock timelines map
|
||||
// to remove the timeline from it.
|
||||
// Always if you have two locks that are taken in different order this can result in a deadlock.
|
||||
let delete_lock_guard = DeletionGuard(
|
||||
Arc::clone(&timeline.delete_progress)
|
||||
.try_lock_owned()
|
||||
.map_err(|_| DeleteTimelineError::AlreadyInProgress)?,
|
||||
);
|
||||
|
||||
let delete_progress = Arc::clone(&timeline.delete_progress);
|
||||
let delete_lock_guard = match delete_progress.try_lock_owned() {
|
||||
Ok(guard) => DeletionGuard(guard),
|
||||
Err(_) => {
|
||||
// Unfortunately if lock fails arc is consumed.
|
||||
return Err(DeleteTimelineError::AlreadyInProgress(Arc::clone(
|
||||
&timeline.delete_progress,
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
timeline.set_state(TimelineState::Stopping);
|
||||
|
||||
@@ -553,10 +556,14 @@ impl DeleteTimelineFlow {
|
||||
|
||||
remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?;
|
||||
|
||||
*guard.0 = Self::Finished;
|
||||
*guard = Self::Finished;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn is_finished(&self) -> bool {
|
||||
matches!(self, Self::Finished)
|
||||
}
|
||||
}
|
||||
|
||||
struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);
|
||||
|
||||
@@ -120,10 +120,9 @@ impl LayerManager {
|
||||
|
||||
ensure!(
|
||||
lsn > last_record_lsn,
|
||||
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}",
|
||||
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
|
||||
lsn,
|
||||
last_record_lsn,
|
||||
std::backtrace::Backtrace::force_capture(),
|
||||
);
|
||||
|
||||
// Do we have a layer open for writing already?
|
||||
@@ -278,7 +277,7 @@ impl LayerManager {
|
||||
updates: &mut BatchedUpdates<'_>,
|
||||
mapping: &mut LayerFileManager,
|
||||
) {
|
||||
updates.remove_historic(layer.layer_desc().clone());
|
||||
updates.remove_historic(layer.layer_desc());
|
||||
mapping.remove(layer);
|
||||
}
|
||||
|
||||
@@ -292,10 +291,10 @@ impl LayerManager {
|
||||
metrics: &TimelineMetrics,
|
||||
mapping: &mut LayerFileManager,
|
||||
) -> anyhow::Result<()> {
|
||||
let desc = layer.layer_desc();
|
||||
if !layer.is_remote_layer() {
|
||||
layer.delete_resident_layer_file()?;
|
||||
let layer_file_size = layer.file_size();
|
||||
metrics.resident_physical_size_gauge.sub(layer_file_size);
|
||||
metrics.resident_physical_size_gauge.sub(desc.file_size);
|
||||
}
|
||||
|
||||
// TODO Removing from the bottom of the layer map is expensive.
|
||||
@@ -303,7 +302,7 @@ impl LayerManager {
|
||||
// won't be needed for page reconstruction for this timeline,
|
||||
// and mark what we can't delete yet as deleted from the layer
|
||||
// map index without actually rebuilding the index.
|
||||
updates.remove_historic(layer.layer_desc().clone());
|
||||
updates.remove_historic(desc);
|
||||
mapping.remove(layer);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -53,6 +53,9 @@ pub struct VirtualFile {
|
||||
pub path: PathBuf,
|
||||
open_options: OpenOptions,
|
||||
|
||||
// These are strings becase we only use them for metrics, and those expect strings.
|
||||
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
|
||||
// strings.
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
auth::{self, AuthFlow, ClientCredentials},
|
||||
compute,
|
||||
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
|
||||
proxy::handle_try_wake,
|
||||
proxy::{handle_try_wake, retry_after},
|
||||
sasl, scram,
|
||||
stream::PqStream,
|
||||
};
|
||||
@@ -62,10 +62,13 @@ pub(super) async fn authenticate(
|
||||
}
|
||||
Ok(ControlFlow::Continue(e)) => {
|
||||
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
|
||||
num_retries += 1;
|
||||
}
|
||||
Ok(ControlFlow::Break(n)) => break n,
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
tokio::time::sleep(wait_duration).await;
|
||||
};
|
||||
if let Some(keys) = scram_keys {
|
||||
use tokio_postgres::config::AuthKeys;
|
||||
|
||||
@@ -8,6 +8,7 @@ use super::{
|
||||
use crate::{auth::ClientCredentials, compute, http, scram};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
|
||||
@@ -47,7 +48,9 @@ impl Api {
|
||||
.build()?;
|
||||
|
||||
info!(url = request.url().as_str(), "sending http request");
|
||||
let start = Instant::now();
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
let body = match parse_body::<GetRoleSecret>(response).await {
|
||||
Ok(body) => body,
|
||||
// Error 404 is special: it's ok not to have a secret.
|
||||
@@ -88,7 +91,9 @@ impl Api {
|
||||
.build()?;
|
||||
|
||||
info!(url = request.url().as_str(), "sending http request");
|
||||
let start = Instant::now();
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
let body = parse_body::<WakeCompute>(response).await?;
|
||||
|
||||
// Unfortunately, ownership won't let us use `Option::ok_or` here.
|
||||
|
||||
@@ -7,11 +7,14 @@ pub mod server;
|
||||
pub mod sql_over_http;
|
||||
pub mod websocket;
|
||||
|
||||
use std::time::Duration;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use futures::FutureExt;
|
||||
pub use reqwest::{Request, Response, StatusCode};
|
||||
pub use reqwest_middleware::{ClientWithMiddleware, Error};
|
||||
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
|
||||
use tokio::time::Instant;
|
||||
use tracing::trace;
|
||||
|
||||
use crate::url::ApiUrl;
|
||||
use reqwest_middleware::RequestBuilder;
|
||||
@@ -20,13 +23,21 @@ use reqwest_middleware::RequestBuilder;
|
||||
/// because it takes care of observability (OpenTelemetry).
|
||||
/// We deliberately don't want to replace this with a public static.
|
||||
pub fn new_client() -> ClientWithMiddleware {
|
||||
reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.dns_resolver(Arc::new(GaiResolver::default()))
|
||||
.connection_verbose(true)
|
||||
.build()
|
||||
.expect("Failed to create http client");
|
||||
|
||||
reqwest_middleware::ClientBuilder::new(client)
|
||||
.with(reqwest_tracing::TracingMiddleware::default())
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
|
||||
let timeout_client = reqwest::ClientBuilder::new()
|
||||
.dns_resolver(Arc::new(GaiResolver::default()))
|
||||
.connection_verbose(true)
|
||||
.timeout(default_timout)
|
||||
.build()
|
||||
.expect("Failed to create http client with timeout");
|
||||
@@ -39,6 +50,10 @@ pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware
|
||||
// As per docs, "This middleware always errors when given requests with streaming bodies".
|
||||
// That's all right because we only use this client to send `serde_json::RawValue`, which
|
||||
// is not a stream.
|
||||
//
|
||||
// ex-maintainer note:
|
||||
// this limitation can be fixed if streaming is necessary.
|
||||
// retries will still not be performed, but it wont error immediately
|
||||
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
|
||||
.build()
|
||||
}
|
||||
@@ -81,6 +96,37 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
/// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
|
||||
use hyper::{
|
||||
client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
|
||||
service::Service,
|
||||
};
|
||||
use reqwest::dns::{Addrs, Resolve, Resolving};
|
||||
#[derive(Debug)]
|
||||
pub struct GaiResolver(HyperGaiResolver);
|
||||
|
||||
impl Default for GaiResolver {
|
||||
fn default() -> Self {
|
||||
Self(HyperGaiResolver::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl Resolve for GaiResolver {
|
||||
fn resolve(&self, name: Name) -> Resolving {
|
||||
let this = &mut self.0.clone();
|
||||
let start = Instant::now();
|
||||
Box::pin(
|
||||
Service::<Name>::call(this, name.clone()).map(move |result| {
|
||||
let resolve_duration = start.elapsed();
|
||||
trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
|
||||
result
|
||||
.map(|addrs| -> Addrs { Box::new(addrs) })
|
||||
.map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -34,7 +34,7 @@ enum Payload {
|
||||
Batch(Vec<QueryData>),
|
||||
}
|
||||
|
||||
pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB
|
||||
pub const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MB
|
||||
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
|
||||
|
||||
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
|
||||
@@ -214,7 +214,7 @@ pub async fn handle(
|
||||
|
||||
if request_content_length > MAX_REQUEST_SIZE {
|
||||
return Err(anyhow::anyhow!(
|
||||
"request is too large (max {MAX_REQUEST_SIZE} bytes)"
|
||||
"request is too large (max is {MAX_REQUEST_SIZE} bytes)"
|
||||
));
|
||||
}
|
||||
|
||||
@@ -292,13 +292,15 @@ async fn query_to_json<T: GenericClient>(
|
||||
// big.
|
||||
pin_mut!(row_stream);
|
||||
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
|
||||
let mut curret_size = 0;
|
||||
let mut current_size = 0;
|
||||
while let Some(row) = row_stream.next().await {
|
||||
let row = row?;
|
||||
curret_size += row.body_len();
|
||||
current_size += row.body_len();
|
||||
rows.push(row);
|
||||
if curret_size > MAX_RESPONSE_SIZE {
|
||||
return Err(anyhow::anyhow!("response too large"));
|
||||
if current_size > MAX_RESPONSE_SIZE {
|
||||
return Err(anyhow::anyhow!(
|
||||
"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -187,12 +187,16 @@ async fn ws_handler(
|
||||
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
|
||||
.map_err(|e| ApiError::BadRequest(e.into()))?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = serve_websocket(websocket, config, &cancel_map, session_id, host).await
|
||||
{
|
||||
error!(session_id = ?session_id, "error in websocket connection: {e:?}");
|
||||
tokio::spawn(
|
||||
async move {
|
||||
if let Err(e) =
|
||||
serve_websocket(websocket, config, &cancel_map, session_id, host).await
|
||||
{
|
||||
error!(session_id = ?session_id, "error in websocket connection: {e:#}");
|
||||
}
|
||||
}
|
||||
});
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
// Return the response so the spawned future can continue.
|
||||
Ok(response)
|
||||
@@ -217,6 +221,10 @@ async fn ws_handler(
|
||||
},
|
||||
None => Value::Null,
|
||||
};
|
||||
error!(
|
||||
?code,
|
||||
"sql-over-http per-client task finished with an error: {e:#}"
|
||||
);
|
||||
(
|
||||
json!({ "message": message, "code": code }),
|
||||
HashMap::default(),
|
||||
|
||||
@@ -545,7 +545,7 @@ impl ShouldRetry for compute::ConnectionError {
|
||||
}
|
||||
}
|
||||
|
||||
fn retry_after(num_retries: u32) -> time::Duration {
|
||||
pub fn retry_after(num_retries: u32) -> time::Duration {
|
||||
// 1.5 seems to be an ok growth factor heuristic
|
||||
BASE_RETRY_WAIT_DURATION.mul_f64(1.5_f64.powi(num_retries as i32))
|
||||
}
|
||||
|
||||
@@ -223,6 +223,7 @@ module.exports = async ({ github, context, fetch, report }) => {
|
||||
} else {
|
||||
commentBody += `#### No tests were run or test report is not available\n`
|
||||
}
|
||||
commentBody += autoupdateNotice
|
||||
|
||||
let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha
|
||||
if (isPullRequest) {
|
||||
|
||||
198
scripts/ingest_regress_test_result-new-format.py
Normal file
198
scripts/ingest_regress_test_result-new-format.py
Normal file
@@ -0,0 +1,198 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import dataclasses
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Tuple
|
||||
|
||||
import backoff
|
||||
import psycopg2
|
||||
from psycopg2.extras import execute_values
|
||||
|
||||
CREATE_TABLE = """
|
||||
CREATE TABLE IF NOT EXISTS results (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
parent_suite TEXT NOT NULL,
|
||||
suite TEXT NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
started_at TIMESTAMPTZ NOT NULL,
|
||||
stopped_at TIMESTAMPTZ NOT NULL,
|
||||
duration INT NOT NULL,
|
||||
flaky BOOLEAN NOT NULL,
|
||||
build_type TEXT NOT NULL,
|
||||
pg_version INT NOT NULL,
|
||||
run_id BIGINT NOT NULL,
|
||||
run_attempt INT NOT NULL,
|
||||
reference TEXT NOT NULL,
|
||||
revision CHAR(40) NOT NULL,
|
||||
raw JSONB COMPRESSION lz4 NOT NULL,
|
||||
UNIQUE (parent_suite, suite, name, build_type, pg_version, started_at, stopped_at, run_id)
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class Row:
|
||||
parent_suite: str
|
||||
suite: str
|
||||
name: str
|
||||
status: str
|
||||
started_at: datetime
|
||||
stopped_at: datetime
|
||||
duration: int
|
||||
flaky: bool
|
||||
build_type: str
|
||||
pg_version: int
|
||||
run_id: int
|
||||
run_attempt: int
|
||||
reference: str
|
||||
revision: str
|
||||
raw: str
|
||||
|
||||
|
||||
TEST_NAME_RE = re.compile(r"[\[-](?P<build_type>debug|release)-pg(?P<pg_version>\d+)[-\]]")
|
||||
|
||||
|
||||
def err(msg):
|
||||
print(f"error: {msg}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_connection_cursor(connstr: str):
|
||||
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
|
||||
def connect(connstr):
|
||||
conn = psycopg2.connect(connstr, connect_timeout=30)
|
||||
conn.autocommit = True
|
||||
return conn
|
||||
|
||||
conn = connect(connstr)
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
yield cur
|
||||
finally:
|
||||
if conn is not None:
|
||||
conn.close()
|
||||
|
||||
|
||||
def create_table(cur):
|
||||
cur.execute(CREATE_TABLE)
|
||||
|
||||
|
||||
def parse_test_name(test_name: str) -> Tuple[str, int, str]:
|
||||
build_type, pg_version = None, None
|
||||
if match := TEST_NAME_RE.search(test_name):
|
||||
found = match.groupdict()
|
||||
build_type = found["build_type"]
|
||||
pg_version = int(found["pg_version"])
|
||||
else:
|
||||
# It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance)
|
||||
build_type = "release"
|
||||
pg_version = 14
|
||||
|
||||
unparametrized_name = re.sub(rf"{build_type}-pg{pg_version}-?", "", test_name).replace("[]", "")
|
||||
|
||||
return build_type, pg_version, unparametrized_name
|
||||
|
||||
|
||||
def ingest_test_result(
|
||||
cur,
|
||||
reference: str,
|
||||
revision: str,
|
||||
run_id: int,
|
||||
run_attempt: int,
|
||||
test_cases_dir: Path,
|
||||
):
|
||||
rows = []
|
||||
for f in test_cases_dir.glob("*.json"):
|
||||
test = json.loads(f.read_text())
|
||||
# Drop unneded fields from raw data
|
||||
raw = test.copy()
|
||||
raw.pop("parameterValues")
|
||||
raw.pop("labels")
|
||||
raw.pop("extra")
|
||||
|
||||
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
|
||||
labels = {label["name"]: label["value"] for label in test["labels"]}
|
||||
row = Row(
|
||||
parent_suite=labels["parentSuite"],
|
||||
suite=labels["suite"],
|
||||
name=unparametrized_name,
|
||||
status=test["status"],
|
||||
started_at=datetime.fromtimestamp(test["time"]["start"] / 1000, tz=timezone.utc),
|
||||
stopped_at=datetime.fromtimestamp(test["time"]["stop"] / 1000, tz=timezone.utc),
|
||||
duration=test["time"]["duration"],
|
||||
flaky=test["flaky"] or test["retriesStatusChange"],
|
||||
build_type=build_type,
|
||||
pg_version=pg_version,
|
||||
run_id=run_id,
|
||||
run_attempt=run_attempt,
|
||||
reference=reference,
|
||||
revision=revision,
|
||||
raw=json.dumps(raw),
|
||||
)
|
||||
rows.append(dataclasses.astuple(row))
|
||||
|
||||
columns = ",".join(f.name for f in dataclasses.fields(Row))
|
||||
query = f"INSERT INTO results ({columns}) VALUES %s ON CONFLICT DO NOTHING"
|
||||
execute_values(cur, query, rows)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Regress test result uploader. \
|
||||
Database connection string should be provided via DATABASE_URL environment variable",
|
||||
)
|
||||
parser.add_argument("--initdb", action="store_true", help="Initialuze database")
|
||||
parser.add_argument(
|
||||
"--reference", type=str, required=True, help="git reference, for example refs/heads/main"
|
||||
)
|
||||
parser.add_argument("--revision", type=str, required=True, help="git revision")
|
||||
parser.add_argument("--run-id", type=int, required=True, help="GitHub Workflow run id")
|
||||
parser.add_argument(
|
||||
"--run-attempt", type=int, required=True, help="GitHub Workflow run attempt"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--test-cases-dir",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Path to a dir with extended test cases data",
|
||||
)
|
||||
|
||||
connstr = os.getenv("DATABASE_URL", "")
|
||||
if not connstr:
|
||||
err("DATABASE_URL environment variable is not set")
|
||||
|
||||
args = parser.parse_args()
|
||||
with get_connection_cursor(connstr) as cur:
|
||||
if args.initdb:
|
||||
create_table(cur)
|
||||
|
||||
if not args.test_cases_dir.exists():
|
||||
err(f"test-cases dir {args.test_cases_dir} does not exist")
|
||||
|
||||
if not args.test_cases_dir.is_dir():
|
||||
err(f"test-cases dir {args.test_cases_dir} it not a directory")
|
||||
|
||||
ingest_test_result(
|
||||
cur,
|
||||
reference=args.reference,
|
||||
revision=args.revision,
|
||||
run_id=args.run_id,
|
||||
run_attempt=args.run_attempt,
|
||||
test_cases_dir=args.test_cases_dir,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.getLogger("backoff").addHandler(logging.StreamHandler())
|
||||
main()
|
||||
@@ -32,6 +32,7 @@ import requests
|
||||
from _pytest.config import Config
|
||||
from _pytest.config.argparsing import Parser
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
from mypy_boto3_s3 import S3Client
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
@@ -432,7 +433,7 @@ class NeonEnvBuilder:
|
||||
self.port_distributor = port_distributor
|
||||
self.remote_storage = remote_storage
|
||||
self.ext_remote_storage: Optional[S3Storage] = None
|
||||
self.remote_storage_client: Optional[Any] = None
|
||||
self.remote_storage_client: Optional[S3Client] = None
|
||||
self.remote_storage_users = remote_storage_users
|
||||
self.broker = broker
|
||||
self.run_id = run_id
|
||||
@@ -875,7 +876,14 @@ class NeonEnv:
|
||||
|
||||
def timeline_dir(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
"""Get a timeline directory's path based on the repo directory of the test environment"""
|
||||
return self.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id)
|
||||
|
||||
def tenant_dir(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
) -> Path:
|
||||
"""Get a tenant directory's path based on the repo directory of the test environment"""
|
||||
return self.repo_dir / "tenants" / str(tenant_id)
|
||||
|
||||
def get_pageserver_version(self) -> str:
|
||||
bin_pageserver = str(self.neon_binpath / "pageserver")
|
||||
@@ -1518,6 +1526,8 @@ class NeonPageserver(PgProtocol):
|
||||
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
|
||||
".*Error processing HTTP request: NotFound: Timeline .* was not found",
|
||||
".*took more than expected to complete.*",
|
||||
# these can happen during shutdown, but it should not be a reason to fail a test
|
||||
".*completed, took longer than expected.*",
|
||||
]
|
||||
|
||||
def start(
|
||||
@@ -2817,8 +2827,15 @@ def check_restored_datadir_content(
|
||||
endpoint: Endpoint,
|
||||
):
|
||||
# Get the timeline ID. We need it for the 'basebackup' command
|
||||
timeline = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
|
||||
|
||||
# many tests already checkpoint, but do it just in case
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CHECKPOINT")
|
||||
|
||||
# wait for pageserver to catch up
|
||||
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
|
||||
# stop postgres to ensure that files won't change
|
||||
endpoint.stop()
|
||||
|
||||
@@ -2833,7 +2850,7 @@ def check_restored_datadir_content(
|
||||
{psql_path} \
|
||||
--no-psqlrc \
|
||||
postgres://localhost:{env.pageserver.service_port.pg} \
|
||||
-c 'basebackup {endpoint.tenant_id} {timeline}' \
|
||||
-c 'basebackup {endpoint.tenant_id} {timeline_id}' \
|
||||
| tar -x -C {restored_dir_path}
|
||||
"""
|
||||
|
||||
|
||||
@@ -210,6 +210,10 @@ class PageserverHttpClient(requests.Session):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_delete(self, tenant_id: TenantId):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_load(self, tenant_id: TenantId):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import time
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.remote_storage import RemoteStorageKind, S3Storage
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
def assert_tenant_state(
|
||||
@@ -17,15 +19,6 @@ def assert_tenant_state(
|
||||
assert tenant_status["state"]["slug"] == expected_state, message or tenant_status
|
||||
|
||||
|
||||
def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
|
||||
tenants = pageserver_http.tenant_list()
|
||||
matching = [t for t in tenants if TenantId(t["id"]) == tenant_id]
|
||||
assert len(matching) < 2
|
||||
if len(matching) == 0:
|
||||
return None
|
||||
return matching[0]
|
||||
|
||||
|
||||
def remote_consistent_lsn(
|
||||
pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
|
||||
) -> Lsn:
|
||||
@@ -199,20 +192,19 @@ def wait_timeline_detail_404(
|
||||
timeline_id: TimelineId,
|
||||
iterations: int,
|
||||
):
|
||||
last_exc = None
|
||||
for _ in range(iterations):
|
||||
time.sleep(0.250)
|
||||
def timeline_is_missing():
|
||||
data = {}
|
||||
try:
|
||||
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
|
||||
log.info(f"detail {data}")
|
||||
log.info(f"timeline detail {data}")
|
||||
except PageserverApiException as e:
|
||||
log.debug(e)
|
||||
if e.status_code == 404:
|
||||
return
|
||||
|
||||
last_exc = e
|
||||
raise RuntimeError(f"Timeline exists state {data.get('state')}")
|
||||
|
||||
raise last_exc or RuntimeError(f"Timeline wasnt deleted in time, state: {data['state']}")
|
||||
wait_until(iterations, interval=0.250, func=timeline_is_missing)
|
||||
|
||||
|
||||
def timeline_delete_wait_completed(
|
||||
@@ -224,3 +216,72 @@ def timeline_delete_wait_completed(
|
||||
):
|
||||
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
|
||||
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# TODO avoid by combining remote storage related stuff in single type
|
||||
# and just passing in this type instead of whole builder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
|
||||
# For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api.
|
||||
assert neon_env_builder.remote_storage_kind in (
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
RemoteStorageKind.REAL_S3,
|
||||
)
|
||||
# For mypy
|
||||
assert isinstance(neon_env_builder.remote_storage, S3Storage)
|
||||
assert neon_env_builder.remote_storage_client is not None
|
||||
|
||||
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
|
||||
response = neon_env_builder.remote_storage_client.list_objects_v2(
|
||||
Bucket=neon_env_builder.remote_storage.bucket_name,
|
||||
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
|
||||
)
|
||||
objects = response.get("Contents")
|
||||
assert (
|
||||
response["KeyCount"] == 0
|
||||
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
|
||||
|
||||
def wait_tenant_status_404(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
iterations: int,
|
||||
interval: float = 0.250,
|
||||
):
|
||||
def tenant_is_missing():
|
||||
data = {}
|
||||
try:
|
||||
data = pageserver_http.tenant_status(tenant_id)
|
||||
log.info(f"tenant status {data}")
|
||||
except PageserverApiException as e:
|
||||
log.debug(e)
|
||||
if e.status_code == 404:
|
||||
return
|
||||
|
||||
raise RuntimeError(f"Timeline exists state {data.get('state')}")
|
||||
|
||||
wait_until(iterations, interval=interval, func=tenant_is_missing)
|
||||
|
||||
|
||||
def tenant_delete_wait_completed(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
iterations: int,
|
||||
):
|
||||
pageserver_http.tenant_delete(tenant_id=tenant_id)
|
||||
wait_tenant_status_404(pageserver_http, tenant_id=tenant_id, iterations=iterations)
|
||||
|
||||
|
||||
MANY_SMALL_LAYERS_TENANT_CONFIG = {
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
"checkpoint_distance": f"{1024**2}",
|
||||
"image_creation_threshold": "100",
|
||||
}
|
||||
|
||||
|
||||
def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int:
|
||||
return 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 6
|
||||
|
||||
@@ -6,13 +6,16 @@ import subprocess
|
||||
import tarfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Tuple, TypeVar
|
||||
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, TypeVar
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import allure
|
||||
from psycopg2.extensions import cursor
|
||||
|
||||
from fixtures.log_helper import log
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import PgBin
|
||||
from fixtures.types import TimelineId
|
||||
|
||||
Fn = TypeVar("Fn", bound=Callable[..., Any])
|
||||
@@ -300,17 +303,13 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn):
|
||||
raise Exception("timed out while waiting for %s" % func) from last_exception
|
||||
|
||||
|
||||
def wait_while(number_of_iterations: int, interval: float, func):
|
||||
def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
|
||||
"""
|
||||
Wait until 'func' returns false, or throws an exception.
|
||||
Fast way to populate data.
|
||||
For more layers consider combining with these tenant settings:
|
||||
{
|
||||
"checkpoint_distance": 1024 ** 2,
|
||||
"image_creation_threshold": 100,
|
||||
}
|
||||
"""
|
||||
for i in range(number_of_iterations):
|
||||
try:
|
||||
if not func():
|
||||
return
|
||||
log.info("waiting for %s iteration %s failed", func, i + 1)
|
||||
time.sleep(interval)
|
||||
continue
|
||||
except Exception:
|
||||
return
|
||||
raise Exception("timed out while waiting for %s" % func)
|
||||
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr])
|
||||
|
||||
@@ -1,24 +1,12 @@
|
||||
{
|
||||
"public_extensions": [
|
||||
"anon",
|
||||
"pg_buffercache"
|
||||
],
|
||||
"library_index": {
|
||||
"anon": "anon",
|
||||
"pg_buffercache": "pg_buffercache"
|
||||
},
|
||||
"public_extensions": [ "weighted_mean" ],
|
||||
"library_index": { "weighted_mean": "weighted_mean" },
|
||||
"extension_data": {
|
||||
"pg_buffercache": {
|
||||
"weighted_mean": {
|
||||
"control_data": {
|
||||
"pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
|
||||
"weighted_mean.control": "# weighted_mean extension\ncomment = 'A weighted mean aggregate function extension'\ndefault_version = '1.0.1'\nmodule_pathname = '$libdir/weighted_mean'\nrelocatable = true\ntrusted = true\n"
|
||||
},
|
||||
"archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
|
||||
},
|
||||
"anon": {
|
||||
"control_data": {
|
||||
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
|
||||
},
|
||||
"archive_path": "5670669815/v14/extensions/anon.tar.zst"
|
||||
"archive_path": "5670669815/v14/extensions/weighted_mean.tar.zst"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,17 +1,16 @@
|
||||
{
|
||||
"public_extensions": [
|
||||
"anon"
|
||||
"weighted_mean"
|
||||
],
|
||||
"library_index": {
|
||||
"anon": "anon"
|
||||
"weighted_mean": "weighted_mean"
|
||||
},
|
||||
"extension_data": {
|
||||
"anon": {
|
||||
"weighted_mean": {
|
||||
"control_data": {
|
||||
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
|
||||
"weighted_mean.control": "# weighted_mean extension\ncomment = 'A weighted mean aggregate function extension'\ndefault_version = '1.0.1'\nmodule_pathname = '$libdir/weighted_mean'\nrelocatable = true\ntrusted = true\n"
|
||||
},
|
||||
"archive_path": "5670669815/v15/extensions/anon.tar.zst"
|
||||
"archive_path": "5670669815/v15/extensions/weighted_mean.tar.zst"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Binary file not shown.
Binary file not shown.
51
test_runner/regress/data/extension_test/README.md
Normal file
51
test_runner/regress/data/extension_test/README.md
Normal file
@@ -0,0 +1,51 @@
|
||||
|
||||
This is some data for testing remote extensions
|
||||
It was generated by this docker code
|
||||
Basically just make installing the extension.
|
||||
|
||||
```
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "weighted-mean-pg-build"
|
||||
# compile weighted-mean extension
|
||||
# Note: Please do not include this in the compute image,
|
||||
# it is used for testing remote extension functionality
|
||||
# so the extension *must* not be part of the compute image already
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS weighted-mean-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
ENV PATH "/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/Kozea/weighted_mean/archive/master.tar.gz -O weighted_mean.tar.gz &&\
|
||||
mkdir weighted_mean-src && cd weighted_mean-src && tar xvzf ../weighted_mean.tar.gz --strip-components=1 -C . && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/weighted_mean.control && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\
|
||||
mkdir -p /extensions/weighted_mean && cp /usr/local/pgsql/share/extension/weighted_mean.control /extensions/weighted_mean && \
|
||||
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
|
||||
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/weighted_mean.tar.zst -T -
|
||||
```
|
||||
|
||||
weighted_mean
|
||||
|
||||
Copyright (c) Ronan Dunklau 2012-2015
|
||||
|
||||
Permission to use, copy, modify, and distribute this software and its
|
||||
documentation for any purpose, without fee, and without a written agreement
|
||||
is hereby granted, provided that the above copyright notice and this
|
||||
paragraph and the following two paragraphs appear in all copies.
|
||||
|
||||
IN NO EVENT SHALL RONAN DUNKLAU BE LIABLE TO ANY PARTY FOR
|
||||
DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
|
||||
LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
|
||||
DOCUMENTATION, EVEN IF RONAN DUNKLAU HAS BEEN ADVISED OF THE
|
||||
POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
RONAN DUNKLAU SPECIFICALLY DISCLAIMS ANY WARRANTIES,
|
||||
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
|
||||
AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
|
||||
ON AN "AS IS" BASIS, AND RONAN DUNKLAU HAS NO OBLIGATIONS TO
|
||||
PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
|
||||
|
||||
|
||||
@@ -1,64 +1,23 @@
|
||||
import os
|
||||
import shutil
|
||||
import threading
|
||||
import uuid
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.broker import NeonBroker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import RemoteStorageKind, available_s3_storages
|
||||
|
||||
|
||||
# Cleaning up downloaded files is important for local tests
|
||||
# or else one test could reuse the files from another test or another test run
|
||||
def cleanup(pg_version):
|
||||
PGDIR = Path(f"pg_install/v{pg_version}")
|
||||
|
||||
LIB_DIR = PGDIR / Path("lib/postgresql")
|
||||
cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"]
|
||||
cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs]
|
||||
|
||||
SHARE_DIR = PGDIR / Path("share/postgresql/extension")
|
||||
cleanup_ext_globs = [
|
||||
"anon*",
|
||||
"address_standardizer*",
|
||||
"postgis*",
|
||||
"pageinspect*",
|
||||
"pg_buffercache*",
|
||||
"pgrouting*",
|
||||
]
|
||||
cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs]
|
||||
|
||||
all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths
|
||||
all_cleanup_files = []
|
||||
for file_glob in all_glob_paths:
|
||||
for file in file_glob:
|
||||
all_cleanup_files.append(file)
|
||||
|
||||
for file in all_cleanup_files:
|
||||
try:
|
||||
os.remove(file)
|
||||
log.info(f"removed file {file}")
|
||||
except Exception as err:
|
||||
log.info(
|
||||
f"skipping remove of file {file} because it doesn't exist.\
|
||||
this may be expected or unexpected depending on the test {err}"
|
||||
)
|
||||
|
||||
cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")]
|
||||
for folder in cleanup_folders:
|
||||
try:
|
||||
shutil.rmtree(folder)
|
||||
log.info(f"removed folder {folder}")
|
||||
except Exception as err:
|
||||
log.info(
|
||||
f"skipping remove of folder {folder} because it doesn't exist.\
|
||||
this may be expected or unexpected depending on the test {err}"
|
||||
)
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import (
|
||||
MockS3Server,
|
||||
RemoteStorageKind,
|
||||
available_s3_storages,
|
||||
)
|
||||
|
||||
|
||||
def upload_files(env):
|
||||
@@ -80,126 +39,134 @@ def upload_files(env):
|
||||
os.chdir("../../../..")
|
||||
|
||||
|
||||
# creates the weighted_mean extension and runs test queries to verify it works
|
||||
def weighted_mean_test(endpoint, message: Optional[str] = None, create: bool = True):
|
||||
if message is not None:
|
||||
log.info(message)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Check that appropriate control files were downloaded
|
||||
cur.execute("SELECT * FROM pg_available_extensions")
|
||||
all_extensions = [x[0] for x in cur.fetchall()]
|
||||
log.info(all_extensions)
|
||||
assert "weighted_mean" in all_extensions
|
||||
|
||||
# test is from: https://github.com/Kozea/weighted_mean/tree/master/test
|
||||
if create:
|
||||
cur.execute("CREATE extension weighted_mean")
|
||||
cur.execute(
|
||||
"create temp table test as (\
|
||||
select a::numeric, b::numeric\
|
||||
from\
|
||||
generate_series(1, 100) as a(a),\
|
||||
generate_series(1, 100) as b(b));"
|
||||
)
|
||||
cur.execute("select weighted_mean(a,b) from test;")
|
||||
x = cur.fetchone()
|
||||
log.info(x)
|
||||
assert str(x) == "(Decimal('50.5000000000000000'),)"
|
||||
cur.execute("update test set b = 0;")
|
||||
cur.execute("select weighted_mean(a,b) from test;")
|
||||
x = cur.fetchone()
|
||||
log.info(x)
|
||||
assert str(x) == "(Decimal('0'),)"
|
||||
|
||||
|
||||
# Test downloading remote extension.
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
|
||||
def test_remote_extensions(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_version: PgVersion,
|
||||
test_output_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
mock_s3_server: MockS3Server,
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
default_broker: NeonBroker,
|
||||
run_id: uuid.UUID,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_extensions",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
|
||||
alt_pgdir = test_output_dir / "pg_install"
|
||||
log.info(f"Copying {pg_distrib_dir} (which is big) to {alt_pgdir}")
|
||||
shutil.copytree(pg_distrib_dir / pg_version.v_prefixed, alt_pgdir / pg_version.v_prefixed)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
with NeonEnvBuilder(
|
||||
repo_dir=test_output_dir / "repo",
|
||||
port_distributor=port_distributor,
|
||||
mock_s3_server=mock_s3_server,
|
||||
neon_binpath=neon_binpath,
|
||||
pg_distrib_dir=alt_pgdir,
|
||||
pg_version=pg_version,
|
||||
broker=default_broker,
|
||||
run_id=run_id,
|
||||
preserve_database_files=False,
|
||||
) as neon_env_builder:
|
||||
log.info(port_distributor)
|
||||
log.info(mock_s3_server)
|
||||
log.info(neon_binpath)
|
||||
log.info(pg_distrib_dir)
|
||||
log.info(pg_version)
|
||||
log.info(default_broker)
|
||||
log.info(run_id)
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
# For REAL_S3 we use the files already in the bucket
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_remote_extensions",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
upload_files(env)
|
||||
|
||||
# Start a compute node and check that it can download the extensions
|
||||
# and use them to CREATE EXTENSION and LOAD
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_extensions",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
try:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Check that appropriate control files were downloaded
|
||||
cur.execute("SELECT * FROM pg_available_extensions")
|
||||
all_extensions = [x[0] for x in cur.fetchall()]
|
||||
log.info(all_extensions)
|
||||
assert "anon" in all_extensions
|
||||
# Start a compute node and check that it can download the extensions
|
||||
# and use them to CREATE EXTENSION and LOAD
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_extensions",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
weighted_mean_test(endpoint)
|
||||
|
||||
# postgis is on real s3 but not mock s3.
|
||||
# it's kind of a big file, would rather not upload to github
|
||||
if remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
assert "postgis" in all_extensions
|
||||
# this may fail locally if dependency is missing
|
||||
# we don't really care about the error,
|
||||
# we just want to make sure it downloaded
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION postgis")
|
||||
except Exception as err:
|
||||
log.info(f"(expected) error creating postgis extension: {err}")
|
||||
# we do not check the error, so this is basically a NO-OP
|
||||
# however checking the log you can make sure that it worked
|
||||
# and also get valuable information about how long loading the extension took
|
||||
# # Test that extension is downloaded after endpoint restart,
|
||||
# # when the library is used in the query.
|
||||
# #
|
||||
# # Run the test with mutliple simultaneous connections to an endpoint.
|
||||
# # to ensure that the extension is downloaded only once.
|
||||
|
||||
# this is expected to fail on my computer because I don't have the pgcrypto extension
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION anon")
|
||||
except Exception as err:
|
||||
log.info("error creating anon extension")
|
||||
assert "pgcrypto" in str(err), "unexpected error creating anon extension"
|
||||
finally:
|
||||
cleanup(pg_version)
|
||||
# import subprocess
|
||||
# import threading
|
||||
# # shutdown compute node
|
||||
# endpoint.stop()
|
||||
# # remove extension files locally
|
||||
# SHAREDIR = subprocess.check_output(
|
||||
# [alt_pgdir / f"{pg_version.v_prefixed}/bin/pg_config", "--sharedir"]
|
||||
# )
|
||||
# SHAREDIRPATH = Path(SHAREDIR.decode("utf-8").strip())
|
||||
# log.info("SHAREDIRPATH: %s", SHAREDIRPATH)
|
||||
# (SHAREDIRPATH / "extension/weighted_mean--1.0.1.sql").unlink()
|
||||
# (SHAREDIRPATH / "extension/weighted_mean.control").unlink()
|
||||
|
||||
# # spin up compute node again (there are no extension files available, because compute is stateless)
|
||||
# endpoint = env.endpoints.create_start(
|
||||
# "test_remote_extensions",
|
||||
# tenant_id=tenant_id,
|
||||
# remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# # config_lines=["log_min_messages=debug3"],
|
||||
# )
|
||||
|
||||
# Test downloading remote library.
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
|
||||
def test_remote_library(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_library",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
# For REAL_S3 we use the files already in the bucket
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
upload_files(env)
|
||||
|
||||
# and use them to run LOAD library
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_library",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
try:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# try to load library
|
||||
try:
|
||||
cur.execute("LOAD 'anon'")
|
||||
except Exception as err:
|
||||
log.info(f"error loading anon library: {err}")
|
||||
raise AssertionError("unexpected error loading anon library") from err
|
||||
|
||||
# test library which name is different from extension name
|
||||
# this may fail locally if dependency is missing
|
||||
# however, it does successfully download the postgis archive
|
||||
if remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
try:
|
||||
cur.execute("LOAD 'postgis_topology-3'")
|
||||
except Exception as err:
|
||||
log.info("error loading postgis_topology-3")
|
||||
assert "No such file or directory" in str(
|
||||
err
|
||||
), "unexpected error loading postgis_topology-3"
|
||||
finally:
|
||||
cleanup(pg_version)
|
||||
# # connect to compute node and run the query
|
||||
# # that will trigger the download of the extension
|
||||
# threads = [
|
||||
# threading.Thread(
|
||||
# target=weighted_mean_test, args=(endpoint, f"this is thread {i}", False)
|
||||
# )
|
||||
# for i in range(2)
|
||||
# ]
|
||||
# for thread in threads:
|
||||
# thread.start()
|
||||
# for thread in threads:
|
||||
# thread.join()
|
||||
|
||||
|
||||
# Here we test a complex extension
|
||||
@@ -210,114 +177,56 @@ def test_remote_library(
|
||||
reason="skipping test because real s3 not enabled",
|
||||
)
|
||||
def test_multiple_extensions_one_archive(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_version: PgVersion,
|
||||
test_output_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
mock_s3_server: MockS3Server,
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
default_broker: NeonBroker,
|
||||
run_id: uuid.UUID,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.REAL_S3,
|
||||
test_name="test_multiple_extensions_one_archive",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
|
||||
alt_pgdir = test_output_dir / "pg_install"
|
||||
log.info(f"Copying {pg_distrib_dir} (which is big) to {alt_pgdir}.")
|
||||
shutil.copytree(pg_distrib_dir / pg_version.v_prefixed, alt_pgdir / pg_version.v_prefixed)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
with NeonEnvBuilder(
|
||||
repo_dir=test_output_dir / "repo",
|
||||
port_distributor=port_distributor,
|
||||
mock_s3_server=mock_s3_server,
|
||||
neon_binpath=neon_binpath,
|
||||
pg_distrib_dir=alt_pgdir,
|
||||
pg_version=pg_version,
|
||||
broker=default_broker,
|
||||
run_id=run_id,
|
||||
preserve_database_files=False,
|
||||
) as neon_env_builder:
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.REAL_S3,
|
||||
test_name="test_multiple_extensions_one_archive",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_multiple_extensions_one_archive",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION address_standardizer;")
|
||||
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
|
||||
# execute query to ensure that it works
|
||||
cur.execute(
|
||||
"SELECT house_num, name, suftype, city, country, state, unit \
|
||||
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
|
||||
'One Rust Place, Boston, MA 02109');"
|
||||
)
|
||||
res = cur.fetchall()
|
||||
log.info(res)
|
||||
assert len(res) > 0
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
# Test that extension is downloaded after endpoint restart,
|
||||
# when the library is used in the query.
|
||||
#
|
||||
# Run the test with mutliple simultaneous connections to an endpoint.
|
||||
# to ensure that the extension is downloaded only once.
|
||||
#
|
||||
def test_extension_download_after_restart(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
if "15" in pg_version: # SKIP v15 for now because test set only has extension built for v14
|
||||
return None
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_extension_download_after_restart",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
assert env.remote_storage_client is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
upload_files(env)
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_extension_download_after_restart",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE extension pg_buffercache;")
|
||||
cur.execute("SELECT * from pg_buffercache;")
|
||||
res = cur.fetchall()
|
||||
assert len(res) > 0
|
||||
log.info(res)
|
||||
|
||||
# shutdown compute node
|
||||
endpoint.stop()
|
||||
# remove extension files locally
|
||||
cleanup(pg_version)
|
||||
|
||||
# spin up compute node again (there are no extension files available, because compute is stateless)
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_extension_download_after_restart",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
|
||||
# connect to compute node and run the query
|
||||
# that will trigger the download of the extension
|
||||
def run_query(endpoint, thread_id: int):
|
||||
log.info("thread_id {%d} starting", thread_id)
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_multiple_extensions_one_archive",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT * from pg_buffercache;")
|
||||
cur.execute("CREATE EXTENSION address_standardizer;")
|
||||
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
|
||||
# execute query to ensure that it works
|
||||
cur.execute(
|
||||
"SELECT house_num, name, suftype, city, country, state, unit \
|
||||
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
|
||||
'One Rust Place, Boston, MA 02109');"
|
||||
)
|
||||
res = cur.fetchall()
|
||||
log.info(res)
|
||||
assert len(res) > 0
|
||||
log.info("thread_id {%d}, res = %s", thread_id, res)
|
||||
|
||||
threads = [threading.Thread(target=run_query, args=(endpoint, i)) for i in range(2)]
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
cleanup(pg_version)
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
@pytest.mark.timeout(1800)
|
||||
def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import subprocess_capture
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
|
||||
# Put data in vanilla pg
|
||||
vanilla_pg.start()
|
||||
@@ -163,7 +162,6 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -3,15 +3,11 @@
|
||||
#
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
#
|
||||
# This runs for a long time, especially in debug mode, so use a larger-than-default
|
||||
# timeout.
|
||||
@pytest.mark.timeout(1800)
|
||||
def test_pg_regress(
|
||||
neon_simple_env: NeonEnv,
|
||||
test_output_dir: Path,
|
||||
@@ -60,18 +56,11 @@ def test_pg_regress(
|
||||
with capsys.disabled():
|
||||
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
|
||||
|
||||
# checkpoint one more time to ensure that the lsn we get is the latest one
|
||||
endpoint.safe_psql("CHECKPOINT")
|
||||
|
||||
# Check that we restore the content of the datadir correctly
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint)
|
||||
|
||||
|
||||
# Run the PostgreSQL "isolation" tests, in src/test/isolation.
|
||||
#
|
||||
# This runs for a long time, especially in debug mode, so use a larger-than-default
|
||||
# timeout.
|
||||
@pytest.mark.timeout(1800)
|
||||
def test_isolation(
|
||||
neon_simple_env: NeonEnv,
|
||||
test_output_dir: Path,
|
||||
@@ -173,9 +162,4 @@ def test_sql_regress(
|
||||
with capsys.disabled():
|
||||
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
|
||||
|
||||
# checkpoint one more time to ensure that the lsn we get is the latest one
|
||||
endpoint.safe_psql("CHECKPOINT")
|
||||
endpoint.safe_psql("select pg_current_wal_insert_lsn()")[0][0]
|
||||
|
||||
# Check that we restore the content of the datadir correctly
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint)
|
||||
|
||||
@@ -97,6 +97,11 @@ def test_remote_storage_backup_and_restore(
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
# Thats because of UnreliableWrapper's injected failures
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
|
||||
)
|
||||
|
||||
checkpoint_numbers = range(1, 3)
|
||||
|
||||
for checkpoint_number in checkpoint_numbers:
|
||||
|
||||
@@ -33,8 +33,4 @@ def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
|
||||
cur.execute(f"insert into t1 values ({i}, {j})")
|
||||
cur.execute("commit")
|
||||
|
||||
# force wal flush
|
||||
cur.execute("checkpoint")
|
||||
|
||||
# Check that we can restore the content of the datadir correctly
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint)
|
||||
|
||||
250
test_runner/regress/test_tenant_delete.py
Normal file
250
test_runner/regress/test_tenant_delete.py
Normal file
@@ -0,0 +1,250 @@
|
||||
import enum
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
MANY_SMALL_LAYERS_TENANT_CONFIG,
|
||||
assert_prefix_empty,
|
||||
poll_for_remote_storage_iterations,
|
||||
tenant_delete_wait_completed,
|
||||
wait_tenant_status_404,
|
||||
wait_until_tenant_active,
|
||||
wait_until_tenant_state,
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
|
||||
from fixtures.types import TenantId
|
||||
from fixtures.utils import run_pg_bench_small
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"remote_storage_kind", [RemoteStorageKind.NOOP, *available_remote_storages()]
|
||||
)
|
||||
def test_tenant_delete_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_tenant_delete_smoke",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# first try to delete non existing tenant
|
||||
tenant_id = TenantId.generate()
|
||||
env.pageserver.allowed_errors.append(f".*NotFound: tenant {tenant_id}.*")
|
||||
with pytest.raises(PageserverApiException, match=f"NotFound: tenant {tenant_id}"):
|
||||
ps_http.tenant_delete(tenant_id=tenant_id)
|
||||
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id=tenant_id,
|
||||
conf=MANY_SMALL_LAYERS_TENANT_CONFIG,
|
||||
)
|
||||
|
||||
# create two timelines one being the parent of another
|
||||
parent = None
|
||||
for timeline in ["first", "second"]:
|
||||
timeline_id = env.neon_cli.create_branch(
|
||||
timeline, tenant_id=tenant_id, ancestor_branch_name=parent
|
||||
)
|
||||
with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
|
||||
run_pg_bench_small(pg_bin, endpoint.connstr())
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id)
|
||||
|
||||
parent = timeline
|
||||
|
||||
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
|
||||
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
|
||||
|
||||
tenant_path = env.tenant_dir(tenant_id=tenant_id)
|
||||
assert not tenant_path.exists()
|
||||
|
||||
if remote_storage_kind in [RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3]:
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class Check(enum.Enum):
|
||||
RETRY_WITHOUT_RESTART = enum.auto()
|
||||
RETRY_WITH_RESTART = enum.auto()
|
||||
|
||||
|
||||
FAILPOINTS = [
|
||||
"tenant-delete-before-shutdown",
|
||||
"tenant-delete-before-create-remote-mark",
|
||||
"tenant-delete-before-create-local-mark",
|
||||
"tenant-delete-before-background",
|
||||
"tenant-delete-before-polling-ongoing-deletions",
|
||||
"tenant-delete-before-cleanup-remaining-fs-traces",
|
||||
"tenant-delete-before-remove-timelines-dir",
|
||||
"tenant-delete-before-remove-deleted-mark",
|
||||
"tenant-delete-before-remove-tenant-dir",
|
||||
# Some failpoints from timeline deletion
|
||||
"timeline-delete-before-index-deleted-at",
|
||||
"timeline-delete-before-rm",
|
||||
"timeline-delete-before-index-delete",
|
||||
"timeline-delete-after-rm-dir",
|
||||
]
|
||||
|
||||
FAILPOINTS_BEFORE_BACKGROUND = [
|
||||
"timeline-delete-before-schedule",
|
||||
"tenant-delete-before-shutdown",
|
||||
"tenant-delete-before-create-remote-mark",
|
||||
"tenant-delete-before-create-local-mark",
|
||||
"tenant-delete-before-background",
|
||||
]
|
||||
|
||||
|
||||
def combinations():
|
||||
result = []
|
||||
|
||||
remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
|
||||
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
|
||||
remotes.append(RemoteStorageKind.REAL_S3)
|
||||
|
||||
for remote_storage_kind in remotes:
|
||||
for delete_failpoint in FAILPOINTS:
|
||||
if remote_storage_kind == RemoteStorageKind.NOOP and delete_failpoint in (
|
||||
"timeline-delete-before-index-delete",
|
||||
):
|
||||
# the above failpoint are not relevant for config without remote storage
|
||||
continue
|
||||
|
||||
result.append((remote_storage_kind, delete_failpoint))
|
||||
return result
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind, failpoint", combinations())
|
||||
@pytest.mark.parametrize("check", list(Check))
|
||||
def test_delete_tenant_exercise_crash_safety_failpoints(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
failpoint: str,
|
||||
check: Check,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind, "test_delete_tenant_exercise_crash_safety_failpoints"
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# From deletion polling
|
||||
f".*NotFound: tenant {env.initial_tenant}.*",
|
||||
# allow errors caused by failpoints
|
||||
f".*failpoint: {failpoint}",
|
||||
# It appears when we stopped flush loop during deletion (attempt) and then pageserver is stopped
|
||||
".*freeze_and_flush_on_shutdown.*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
|
||||
# We may leave some upload tasks in the queue. They're likely deletes.
|
||||
# For uploads we explicitly wait with `last_flush_lsn_upload` below.
|
||||
# So by ignoring these instead of waiting for empty upload queue
|
||||
# we execute more distinct code paths.
|
||||
'.*stopping left-over name="remote upload".*',
|
||||
]
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
timeline_id = env.neon_cli.create_timeline("delete", tenant_id=tenant_id)
|
||||
with env.endpoints.create_start("delete", tenant_id=tenant_id) as endpoint:
|
||||
# generate enough layers
|
||||
run_pg_bench_small(pg_bin, endpoint.connstr())
|
||||
if remote_storage_kind is RemoteStorageKind.NOOP:
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
else:
|
||||
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
ps_http.configure_failpoints((failpoint, "return"))
|
||||
|
||||
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
|
||||
|
||||
# These failpoints are earlier than background task is spawned.
|
||||
# so they result in api request failure.
|
||||
if failpoint in FAILPOINTS_BEFORE_BACKGROUND:
|
||||
with pytest.raises(PageserverApiException, match=failpoint):
|
||||
ps_http.tenant_delete(tenant_id)
|
||||
|
||||
else:
|
||||
ps_http.tenant_delete(tenant_id)
|
||||
tenant_info = wait_until_tenant_state(
|
||||
pageserver_http=ps_http,
|
||||
tenant_id=tenant_id,
|
||||
expected_state="Broken",
|
||||
iterations=iterations,
|
||||
)
|
||||
|
||||
reason = tenant_info["state"]["data"]["reason"]
|
||||
log.info(f"tenant broken: {reason}")
|
||||
|
||||
# failpoint may not be the only error in the stack
|
||||
assert reason.endswith(f"failpoint: {failpoint}"), reason
|
||||
|
||||
if check is Check.RETRY_WITH_RESTART:
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
if (
|
||||
remote_storage_kind is RemoteStorageKind.NOOP
|
||||
and failpoint == "tenant-delete-before-create-local-mark"
|
||||
):
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
|
||||
elif failpoint in (
|
||||
"tenant-delete-before-shutdown",
|
||||
"tenant-delete-before-create-remote-mark",
|
||||
):
|
||||
wait_until_tenant_active(
|
||||
ps_http, tenant_id=tenant_id, iterations=iterations, period=0.25
|
||||
)
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
|
||||
else:
|
||||
# Pageserver should've resumed deletion after restart.
|
||||
wait_tenant_status_404(ps_http, tenant_id, iterations=iterations + 10)
|
||||
elif check is Check.RETRY_WITHOUT_RESTART:
|
||||
# this should succeed
|
||||
# this also checks that delete can be retried even when tenant is in Broken state
|
||||
ps_http.configure_failpoints((failpoint, "off"))
|
||||
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
|
||||
|
||||
# Check remote is impty
|
||||
if remote_storage_kind is RemoteStorageKind.MOCK_S3:
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
tenant_dir = env.tenant_dir(tenant_id)
|
||||
# Check local is empty
|
||||
assert not tenant_dir.exists()
|
||||
|
||||
|
||||
# TODO test concurrent deletions with "hang" failpoint
|
||||
# TODO test tenant delete continues after attach
|
||||
@@ -66,6 +66,10 @@ def test_tenant_reattach(
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
|
||||
)
|
||||
# Thats because of UnreliableWrapper's injected failures
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
|
||||
)
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
with endpoint.cursor() as cur:
|
||||
|
||||
@@ -17,9 +17,9 @@ from fixtures.neon_fixtures import (
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_tenant_state,
|
||||
tenant_exists,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
wait_tenant_status_404,
|
||||
)
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
|
||||
@@ -29,7 +29,6 @@ from fixtures.utils import (
|
||||
start_in_background,
|
||||
subprocess_capture,
|
||||
wait_until,
|
||||
wait_while,
|
||||
)
|
||||
|
||||
|
||||
@@ -269,11 +268,16 @@ def test_tenant_relocation(
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = TenantId("74ee8b079a0e437eb0afea7d26a07209")
|
||||
|
||||
# FIXME: Is this expected?
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
|
||||
# Needed for detach polling.
|
||||
env.pageserver.allowed_errors.append(f".*NotFound: tenant {tenant_id}.*")
|
||||
|
||||
# create folder for remote storage mock
|
||||
remote_storage_mock_path = env.repo_dir / "local_fs_remote_storage"
|
||||
|
||||
@@ -283,9 +287,7 @@ def test_tenant_relocation(
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id, initial_timeline_id = env.neon_cli.create_tenant(
|
||||
TenantId("74ee8b079a0e437eb0afea7d26a07209")
|
||||
)
|
||||
_, initial_timeline_id = env.neon_cli.create_tenant(tenant_id)
|
||||
log.info("tenant to relocate %s initial_timeline_id %s", tenant_id, initial_timeline_id)
|
||||
|
||||
env.neon_cli.create_branch("test_tenant_relocation_main", tenant_id=tenant_id)
|
||||
@@ -469,11 +471,8 @@ def test_tenant_relocation(
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
|
||||
# Wait a little, so that the detach operation has time to finish.
|
||||
wait_while(
|
||||
number_of_iterations=100,
|
||||
interval=1,
|
||||
func=lambda: tenant_exists(pageserver_http, tenant_id),
|
||||
)
|
||||
wait_tenant_status_404(pageserver_http, tenant_id, iterations=100, interval=1)
|
||||
|
||||
post_migration_check(ep_main, 500500, old_local_path_main)
|
||||
post_migration_check(ep_second, 1001000, old_local_path_second)
|
||||
|
||||
|
||||
@@ -146,6 +146,11 @@ def test_tenants_attached_after_download(
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
# Thats because of UnreliableWrapper's injected failures
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
|
||||
)
|
||||
|
||||
for checkpoint_number in range(1, 3):
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute(
|
||||
|
||||
@@ -4,7 +4,6 @@ import queue
|
||||
import shutil
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
@@ -18,6 +17,8 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
poll_for_remote_storage_iterations,
|
||||
timeline_delete_wait_completed,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
@@ -27,7 +28,6 @@ from fixtures.pageserver.utils import (
|
||||
)
|
||||
from fixtures.remote_storage import (
|
||||
RemoteStorageKind,
|
||||
S3Storage,
|
||||
available_remote_storages,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
@@ -187,10 +187,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
|
||||
8. Retry or restart without the failpoint and check the result.
|
||||
"""
|
||||
|
||||
if remote_storage_kind is not None:
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind, "test_delete_timeline_exercise_crash_safety_failpoints"
|
||||
)
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind, "test_delete_timeline_exercise_crash_safety_failpoints"
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
@@ -231,7 +230,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
|
||||
|
||||
ps_http.configure_failpoints((failpoint, "return"))
|
||||
|
||||
iterations = 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 4
|
||||
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
|
||||
|
||||
# These failpoints are earlier than background task is spawned.
|
||||
# so they result in api request failure.
|
||||
@@ -280,14 +279,14 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
|
||||
"remote_storage_s3_request_seconds_count",
|
||||
filter={"request_type": "get_object", "result": "err"},
|
||||
).value
|
||||
== 1
|
||||
== 2 # One is missing tenant deletion mark, second is missing index part
|
||||
)
|
||||
assert (
|
||||
m.query_one(
|
||||
"remote_storage_s3_request_seconds_count",
|
||||
filter={"request_type": "get_object", "result": "ok"},
|
||||
).value
|
||||
== 1
|
||||
== 1 # index part for initial timeline
|
||||
)
|
||||
elif check is Check.RETRY_WITHOUT_RESTART:
|
||||
# this should succeed
|
||||
@@ -413,27 +412,6 @@ def test_timeline_resurrection_on_attach(
|
||||
assert all([tl["state"] == "Active" for tl in timelines])
|
||||
|
||||
|
||||
def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
|
||||
# For local_fs we need to properly handle empty directories, which we currently dont, so for simplicity stick to s3 api.
|
||||
assert neon_env_builder.remote_storage_kind in (
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
RemoteStorageKind.REAL_S3,
|
||||
)
|
||||
# For mypy
|
||||
assert isinstance(neon_env_builder.remote_storage, S3Storage)
|
||||
|
||||
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
|
||||
assert neon_env_builder.remote_storage_client is not None
|
||||
response = neon_env_builder.remote_storage_client.list_objects_v2(
|
||||
Bucket=neon_env_builder.remote_storage.bucket_name,
|
||||
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
|
||||
)
|
||||
objects = response.get("Contents")
|
||||
assert (
|
||||
response["KeyCount"] == 0
|
||||
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
|
||||
|
||||
|
||||
def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
When deleting a timeline, if we succeed in setting the deleted flag remotely
|
||||
|
||||
Reference in New Issue
Block a user