Compare commits

...

19 Commits

Author SHA1 Message Date
Bojan Serafimov
f7efbb2d42 WIP relocation test 2022-07-05 17:12:16 -04:00
Bojan Serafimov
0aff7c9ee9 Merge branch 'main' into projects-migration-complete 2022-07-05 16:24:29 -04:00
Bojan Serafimov
4a2a55d9b2 Add standalone script 2022-07-05 15:07:39 -04:00
Kirill Bulatov
50821c0a3c Return download stream directly from the remote storage API 2022-07-05 21:45:15 +03:00
Andrey Taranik
68adfe0fc8 inventory file fix for neon-stress env 2022-07-05 21:29:03 +04:00
Dmitry Rodionov
cfdf79aceb harden create_empty_timeline
Reorder checks so it checks whether the timeline exists
before writing something to disk, possibly replacing valid content
2022-07-05 16:44:18 +03:00
Bojan Serafimov
99a0a5a19b Add missing empty rels 2022-07-05 09:16:55 -04:00
bojanserafimov
32560e75d2 Enable relocation test (#1974) 2022-07-05 08:27:57 -04:00
Anastasia Lubennikova
263a3ea5e3 Add script export_import_betwen_pageservers.py to migrate projects between pageservers 2022-07-05 15:27:31 +03:00
Heikki Linnakangas
bb69e0920c Do not overwrite an existing image layer.
See github issues #1594 and #1690

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2022-07-05 14:45:31 +03:00
Alexander Bayandin
05f6a1394d Add tests for different Postgres client libraries (#2008)
* Add tests for different postgres clients
* test/fixtures: sanitize test name for test_output_dir
* test/fixtures: do not look for etcd before runtime
* Add workflow for testing Postgres client libraries
2022-07-05 12:22:58 +01:00
Heikki Linnakangas
844832ffe4 Bump vendor/postgres
Contains changes from two PRs in vendor/postgres:
- https://github.com/neondatabase/postgres/pull/163
- https://github.com/neondatabase/postgres/pull/176
2022-07-05 10:55:03 +03:00
bojanserafimov
d29c545b5d Gc/compaction thread pool, take 2 (#1933)
Decrease the number of pageserver threads by running gc and compaction in a blocking tokio thread pool
2022-07-05 02:06:40 -04:00
Kirill Bulatov
6abdb12724 Fix 1.62 Clippy errors 2022-07-04 23:46:37 +03:00
Alexander Bayandin
7898e72990 Remove duplicated checks from LocalEnv 2022-07-04 22:35:00 +03:00
Dmitry Rodionov
65704708fa remove unused imports, make more use of pathlib.Path 2022-07-01 18:56:51 +03:00
Arseny Sher
6100a02d0f Prefix WAL files in s3 with environment name.
It wasn't merged to prod yet, so safe to enable.
2022-07-01 19:21:28 +04:00
Arseny Sher
97fed38213 Fix cadaca010c for older ssh clients. 2022-07-01 19:20:59 +04:00
Arseny Sher
cadaca010c Make ansible to work with storage nodes through teleport from local box. 2022-07-01 16:58:34 +03:00
75 changed files with 2404 additions and 564 deletions

View File

@@ -6,5 +6,7 @@ timeout = 30
[ssh_connection]
ssh_args = -F ./ansible.ssh.cfg
scp_if_ssh = True
# teleport doesn't support sftp yet https://github.com/gravitational/teleport/issues/7127
# and scp neither worked for me
transfer_method = piped
pipelining = True

View File

@@ -1,3 +1,7 @@
# Remove this once https://github.com/gravitational/teleport/issues/10918 is fixed
# (use pre 8.5 option name to cope with old ssh in CI)
PubkeyAcceptedKeyTypes +ssh-rsa-cert-v01@openssh.com
Host tele.zenith.tech
User admin
Port 3023

View File

@@ -12,6 +12,7 @@ pageservers
safekeepers
[storage:vars]
env_name = neon-stress
console_mgmt_base_url = http://neon-stress-console.local
bucket_name = neon-storage-ireland
bucket_region = eu-west-1

View File

@@ -12,6 +12,7 @@ pageservers
safekeepers
[storage:vars]
env_name = prod-1
console_mgmt_base_url = http://console-release.local
bucket_name = zenith-storage-oregon
bucket_region = us-west-2

View File

@@ -13,6 +13,7 @@ pageservers
safekeepers
[storage:vars]
env_name = us-stage
console_mgmt_base_url = http://console-staging.local
bucket_name = zenith-staging-storage-us-east-1
bucket_region = us-east-1

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="wal"}'
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ env_name }}/wal"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

74
.github/workflows/pg_clients.yml vendored Normal file
View File

@@ -0,0 +1,74 @@
name: Test Postgres client libraries
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '23 02 * * *' # run once a day, timezone is utc
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
test-postgres-client-libs:
runs-on: [ ubuntu-latest ]
steps:
- name: Checkout
uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: 3.9
- name: Install Poetry
uses: snok/install-poetry@v1
- name: Cache poetry deps
id: cache_poetry
uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: v1-${{ runner.os }}-python-deps-${{ hashFiles('poetry.lock') }}
- name: Install Python deps
shell: bash -ex {0}
run: ./scripts/pysync
- name: Run pytest
env:
REMOTE_ENV: 1
BENCHMARK_CONNSTR: "${{ secrets.BENCHMARK_STAGING_CONNSTR }}"
TEST_OUTPUT: /tmp/test_output
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
# this variable will be embedded in perf test report
# and is needed to distinguish different environments
PLATFORM: github-actions-selfhosted
shell: bash -ex {0}
run: |
# Test framework expects we have psql binary;
# but since we don't really need it in this test, let's mock it
mkdir -p "$POSTGRES_DISTRIB_DIR/bin" && touch "$POSTGRES_DISTRIB_DIR/bin/psql";
./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \
--verbose \
-m "remote_cluster" \
-rA "test_runner/pg_clients"
- name: Post to a Slack channel
if: failure()
id: slack
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Testing Postgres clients: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -1,3 +1,4 @@
use std::fmt::Write;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
@@ -138,9 +139,11 @@ impl Role {
// Now we also support SCRAM-SHA-256 and to preserve compatibility
// we treat all encrypted_password as md5 unless they starts with SCRAM-SHA-256.
if pass.starts_with("SCRAM-SHA-256") {
params.push_str(&format!(" PASSWORD '{}'", pass));
write!(params, " PASSWORD '{pass}'")
.expect("String is documented to not to error during write operations");
} else {
params.push_str(&format!(" PASSWORD 'md5{}'", pass));
write!(params, " PASSWORD 'md5{pass}'")
.expect("String is documented to not to error during write operations");
}
} else {
params.push_str(" PASSWORD NULL");
@@ -158,7 +161,8 @@ impl Database {
/// it may require a proper quoting too.
pub fn to_pg_options(&self) -> String {
let mut params: String = self.options.as_pg_options();
params.push_str(&format!(" OWNER {}", &self.owner.quote()));
write!(params, " OWNER {}", &self.owner.quote())
.expect("String is documented to not to error during write operations");
params
}

View File

@@ -403,16 +403,6 @@ impl LocalEnv {
self.pg_distrib_dir.display()
);
}
for binary in ["pageserver", "safekeeper"] {
if !self.zenith_distrib_dir.join(binary).exists() {
bail!(
"Can't find binary '{}' in zenith distrib dir '{}'",
binary,
self.zenith_distrib_dir.display()
);
}
}
for binary in ["pageserver", "safekeeper"] {
if !self.zenith_distrib_dir.join(binary).exists() {
bail!(
@@ -421,12 +411,6 @@ impl LocalEnv {
);
}
}
if !self.pg_distrib_dir.join("bin/postgres").exists() {
bail!(
"Can't find postgres binary at {}",
self.pg_distrib_dir.display()
);
}
fs::create_dir(&base_path)?;

View File

@@ -12,8 +12,10 @@ use std::{
borrow::Cow,
collections::HashMap,
ffi::OsStr,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
pin::Pin,
};
use anyhow::{bail, Context};
@@ -70,11 +72,7 @@ pub trait RemoteStorage: Send + Sync {
/// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>>;
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;
/// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
@@ -83,12 +81,49 @@ pub trait RemoteStorage: Send + Sync {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>>;
) -> Result<Download, DownloadError>;
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
}
pub struct Download {
pub download_stream: Pin<Box<dyn io::AsyncRead + Unpin + Send>>,
/// Extra key-value data, associated with the current remote file.
pub metadata: Option<StorageMetadata>,
}
impl Debug for Download {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Download")
.field("metadata", &self.metadata)
.finish()
}
}
#[derive(Debug)]
pub enum DownloadError {
/// Validation or other error happened due to user input.
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e}"),
}
}
}
impl std::error::Error for DownloadError {}
/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
pub enum GenericRemoteStorage {
@@ -180,7 +215,7 @@ pub struct S3Config {
pub concurrency_limit: NonZeroUsize,
}
impl std::fmt::Debug for S3Config {
impl Debug for S3Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("S3Config")
.field("bucket_name", &self.bucket_name)

View File

@@ -17,7 +17,7 @@ use tokio::{
};
use tracing::*;
use crate::path_with_suffix_extension;
use crate::{path_with_suffix_extension, Download, DownloadError};
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
@@ -192,15 +192,12 @@ impl RemoteStorage for LocalFs {
Ok(())
}
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
let mut source = io::BufReader::new(
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
let source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
.open(&file_path)
@@ -210,22 +207,20 @@ impl RemoteStorage for LocalFs {
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
})
.map_err(DownloadError::Other)?,
);
io::copy(&mut source, to).await.with_context(|| {
format!(
"Failed to download file '{}' from the local storage",
file_path.display()
)
})?;
source.flush().await?;
self.read_storage_metadata(&file_path).await
let metadata = self
.read_storage_metadata(&file_path)
.await
.map_err(DownloadError::Other)?;
Ok(Download {
metadata,
download_stream: Box::pin(source),
})
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
Err(DownloadError::NotFound)
}
}
@@ -234,22 +229,19 @@ impl RemoteStorage for LocalFs {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
) -> Result<Download, DownloadError> {
if let Some(end_exclusive) = end_exclusive {
ensure!(
end_exclusive > start_inclusive,
"Invalid range, start ({}) is bigger then end ({:?})",
start_inclusive,
end_exclusive
);
if end_exclusive <= start_inclusive {
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
};
if start_inclusive == end_exclusive.saturating_sub(1) {
return Ok(None);
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
}
}
let file_path = self.resolve_in_storage(from)?;
if file_path.exists() && file_path.is_file() {
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
let mut source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
@@ -260,31 +252,31 @@ impl RemoteStorage for LocalFs {
"Failed to open source file '{}' to use in the download",
file_path.display()
)
})?,
})
.map_err(DownloadError::Other)?,
);
source
.seek(io::SeekFrom::Start(start_inclusive))
.await
.context("Failed to seek to the range start in a local storage file")?;
match end_exclusive {
Some(end_exclusive) => {
io::copy(&mut source.take(end_exclusive - start_inclusive), to).await
}
None => io::copy(&mut source, to).await,
}
.with_context(|| {
format!(
"Failed to download file '{}' range from the local storage",
file_path.display()
)
})?;
.context("Failed to seek to the range start in a local storage file")
.map_err(DownloadError::Other)?;
let metadata = self
.read_storage_metadata(&file_path)
.await
.map_err(DownloadError::Other)?;
self.read_storage_metadata(&file_path).await
Ok(match end_exclusive {
Some(end_exclusive) => Download {
metadata,
download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
},
None => Download {
metadata,
download_stream: Box::pin(source),
},
})
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
Err(DownloadError::NotFound)
}
}
@@ -352,6 +344,19 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
Ok(())
}
fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
if file_path.exists() {
ensure!(
file_path.is_file(),
"file path '{}' is not a file",
file_path.display()
);
Ok(true)
} else {
Ok(false)
}
}
#[cfg(test)]
mod pure_tests {
use tempfile::tempdir;
@@ -518,6 +523,31 @@ mod fs_tests {
use std::{collections::HashMap, io::Write};
use tempfile::tempdir;
async fn read_and_assert_remote_file_contents(
storage: &LocalFs,
#[allow(clippy::ptr_arg)]
// have to use &PathBuf due to `storage.local_path` parameter requirements
remote_storage_path: &PathBuf,
expected_metadata: Option<&StorageMetadata>,
) -> anyhow::Result<String> {
let mut download = storage
.download(remote_storage_path)
.await
.map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
ensure!(
download.metadata.as_ref() == expected_metadata,
"Unexpected metadata returned for the downloaded file"
);
let mut contents = String::new();
download
.download_stream
.read_to_string(&mut contents)
.await
.context("Failed to read remote file contents into string")?;
Ok(contents)
}
#[tokio::test]
async fn upload_file() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
@@ -568,15 +598,7 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage.download(&upload_target, &mut content_bytes).await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
assert_eq!(
dummy_contents(upload_name),
contents,
@@ -584,13 +606,9 @@ mod fs_tests {
);
let non_existing_path = PathBuf::from("somewhere").join("else");
match storage.download(&non_existing_path, &mut io::sink()).await {
Ok(_) => panic!("Should not allow downloading non-existing storage files"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
match storage.download(&non_existing_path).await {
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
}
Ok(())
}
@@ -603,58 +621,31 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(&upload_target, 0, None, &mut full_range_bytes)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
full_range_bytes.flush().await?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
assert_eq!(
dummy_contents(upload_name),
String::from_utf8(full_range_bytes.into_inner().into_inner())?,
full_range_download_contents,
"Download full range should return the whole upload"
);
let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let same_byte = 1_000_000_000;
let metadata = storage
.download_byte_range(
&upload_target,
same_byte,
Some(same_byte + 1), // exclusive end
&mut zero_range_bytes,
)
.await?;
assert!(
metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
zero_range_bytes.flush().await?;
assert!(
zero_range_bytes.into_inner().into_inner().is_empty(),
"Zero byte range should not download any part of the file"
);
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
let mut first_part_download = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.await?;
assert!(
metadata.is_none(),
first_part_download.metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut first_part_download.download_stream,
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
@@ -663,20 +654,24 @@ mod fs_tests {
"First part bytes should be returned when requested"
);
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let metadata = storage
let mut second_part_download = storage
.download_byte_range(
&upload_target,
first_part_local.len() as u64,
Some((first_part_local.len() + second_part_local.len()) as u64),
&mut second_part_remote,
)
.await?;
assert!(
metadata.is_none(),
second_part_download.metadata.is_none(),
"No metadata should be returned for no metadata upload"
);
let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut second_part_download.download_stream,
&mut second_part_remote,
)
.await?;
second_part_remote.flush().await?;
let second_part_remote = second_part_remote.into_inner().into_inner();
assert_eq!(
@@ -696,11 +691,30 @@ mod fs_tests {
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let start = 1_000_000_000;
let end = start + 1;
match storage
.download_byte_range(
&upload_target,
start,
Some(end), // exclusive end
)
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("zero bytes"));
assert!(error_string.contains(&start.to_string()));
assert!(error_string.contains(&end.to_string()));
}
}
let start = 10000;
let end = 234;
assert!(start > end, "Should test an incorrect range");
match storage
.download_byte_range(&upload_target, start, Some(end), &mut io::sink())
.download_byte_range(&upload_target, start, Some(end))
.await
{
Ok(_) => panic!("Should not allow downloading wrong ranges"),
@@ -712,18 +726,6 @@ mod fs_tests {
}
}
let non_existing_path = PathBuf::from("somewhere").join("else");
match storage
.download_byte_range(&non_existing_path, 1, Some(3), &mut io::sink())
.await
{
Ok(_) => panic!("Should not allow downloading non-existing storage file ranges"),
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&non_existing_path.display().to_string()));
}
}
Ok(())
}
@@ -762,35 +764,26 @@ mod fs_tests {
let upload_target =
upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
assert_eq!(
dummy_contents(upload_name),
contents,
full_range_download_contents,
"We should upload and download the same contents"
);
assert_eq!(
full_download_metadata.as_ref(),
Some(&metadata),
"We should get the same metadata back for full download"
);
let uploaded_bytes = dummy_contents(upload_name).into_bytes();
let (first_part_local, _) = uploaded_bytes.split_at(3);
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
let partial_download_metadata = storage
.download_byte_range(
&upload_target,
0,
Some(first_part_local.len() as u64),
&mut first_part_remote,
)
let mut partial_download_with_metadata = storage
.download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
.await?;
let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(
&mut partial_download_with_metadata.download_stream,
&mut first_part_remote,
)
.await?;
first_part_remote.flush().await?;
let first_part_remote = first_part_remote.into_inner().into_inner();
assert_eq!(
@@ -800,8 +793,8 @@ mod fs_tests {
);
assert_eq!(
partial_download_metadata.as_ref(),
Some(&metadata),
partial_download_with_metadata.metadata,
Some(metadata),
"We should get the same metadata back for partial download"
);
@@ -843,7 +836,7 @@ mod fs_tests {
}
fn dummy_contents(name: &str) -> String {
format!("contents for {}", name)
format!("contents for {name}")
}
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {

View File

@@ -9,17 +9,17 @@ use std::path::{Path, PathBuf};
use anyhow::Context;
use rusoto_core::{
credential::{InstanceMetadataProvider, StaticProvider},
HttpClient, Region,
HttpClient, Region, RusotoError,
};
use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client,
StreamingBody, S3,
DeleteObjectRequest, GetObjectError, GetObjectRequest, ListObjectsV2Request, PutObjectRequest,
S3Client, StreamingBody, S3,
};
use tokio::{io, sync::Semaphore};
use tokio_util::io::ReaderStream;
use tracing::debug;
use crate::{strip_path_prefix, RemoteStorage, S3Config};
use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};
use super::StorageMetadata;
@@ -187,6 +187,39 @@ impl S3Bucket {
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
})
}
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")
.map_err(DownloadError::Other)?;
metrics::inc_get_object();
match self.client.get_object(request).await {
Ok(object_output) => match object_output.body {
None => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(
"Got no body for the S3 object given"
)))
}
Some(body) => Ok(Download {
metadata: object_output.metadata.map(StorageMetadata),
download_stream: Box::pin(io::BufReader::new(body.into_async_read())),
}),
},
Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Err(DownloadError::NotFound),
Err(e) => {
metrics::inc_get_object_fail();
Err(DownloadError::Other(anyhow::anyhow!(
"Failed to download S3 object: {e}"
)))
}
}
}
}
#[async_trait::async_trait]
@@ -283,38 +316,13 @@ impl RemoteStorage for S3Bucket {
Ok(())
}
async fn download(
&self,
from: &Self::RemoteObjectId,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 download")?;
metrics::inc_get_object();
let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}
Ok(object_output.metadata.map(StorageMetadata))
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
..GetObjectRequest::default()
})
.await
}
async fn download_byte_range(
@@ -322,8 +330,7 @@ impl RemoteStorage for S3Bucket {
from: &Self::RemoteObjectId,
start_inclusive: u64,
end_exclusive: Option<u64>,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<Option<StorageMetadata>> {
) -> Result<Download, DownloadError> {
// S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
// and needs both ends to be exclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
@@ -331,34 +338,14 @@ impl RemoteStorage for S3Bucket {
Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
None => format!("bytes={}-", start_inclusive),
});
let _guard = self
.concurrency_limiter
.acquire()
.await
.context("Concurrency limiter semaphore got closed during S3 range download")?;
metrics::inc_get_object();
let object_output = self
.client
.get_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await
.map_err(|e| {
metrics::inc_get_object_fail();
e
})?;
if let Some(body) = object_output.body {
let mut from = io::BufReader::new(body.into_async_read());
io::copy(&mut from, to).await?;
}
Ok(object_output.metadata.map(StorageMetadata))
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.key().to_owned(),
range,
..GetObjectRequest::default()
})
.await
}
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {

View File

@@ -263,6 +263,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
// start profiler (if enabled)
let profiler_guard = profiling::init_profiler(conf);
pageserver::tenant_tasks::init_tenant_task_pool()?;
// initialize authentication for incoming connections
let auth = match &conf.auth_type {
AuthType::Trust | AuthType::MD5 => None,

View File

@@ -158,6 +158,18 @@ pub struct LayeredRepository {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
// Allows us to gracefully cancel operations that edit the directory
// that backs this layered repository. Usage:
//
// Use `let _guard = file_lock.try_read()` while writing any files.
// Use `let _guard = file_lock.write().unwrap()` to wait for all writes to finish.
//
// TODO try_read this lock during checkpoint as well to prevent race
// between checkpoint and detach/delete.
// TODO try_read this lock for all gc/compaction operations, not just
// ones scheduled by the tenant task manager.
pub file_lock: RwLock<()>,
// Overridden tenant-specific config parameters.
// We keep TenantConfOpt sturct here to preserve the information
// about parameters that are not set.
@@ -220,23 +232,32 @@ impl Repository for LayeredRepository {
fn create_empty_timeline(
&self,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<LayeredTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
let vacant_timeline_entry = match timelines.entry(timeline_id) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant_entry) => vacant_entry,
};
let timeline_path = self.conf.timeline_path(&timeline_id, &self.tenant_id);
if timeline_path.exists() {
bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
}
// Create the timeline directory, and write initial metadata to file.
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenant_id))?;
crashsafe_dir::create_dir_all(timeline_path)?;
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
Self::save_metadata(self.conf, timelineid, self.tenant_id, &metadata, true)?;
Self::save_metadata(self.conf, timeline_id, self.tenant_id, &metadata, true)?;
let timeline = LayeredTimeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
metadata,
None,
timelineid,
timeline_id,
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
@@ -245,12 +266,7 @@ impl Repository for LayeredRepository {
// Insert if not exists
let timeline = Arc::new(timeline);
match timelines.entry(timelineid) {
Entry::Occupied(_) => bail!("Timeline already exists"),
Entry::Vacant(vacant) => {
vacant.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)))
}
};
vacant_timeline_entry.insert(LayeredTimelineEntry::Loaded(Arc::clone(&timeline)));
Ok(timeline)
}
@@ -685,6 +701,7 @@ impl LayeredRepository {
) -> LayeredRepository {
LayeredRepository {
tenant_id,
file_lock: RwLock::new(()),
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
timelines: Mutex::new(HashMap::new()),
@@ -1910,15 +1927,28 @@ impl LayeredTimeline {
} else {
Lsn(0)
};
// Let's consider an example:
//
// delta layer with LSN range 71-81
// delta layer with LSN range 81-91
// delta layer with LSN range 91-101
// image layer at LSN 100
//
// If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
// there's no need to create a new one. We check this case explicitly, to avoid passing
// a bogus range to count_deltas below, with start > end. It's even possible that there
// are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
// after we read last_record_lsn, which is passed here in the 'lsn' argument.
if img_lsn < lsn {
let num_deltas = layers.count_deltas(&img_range, &(img_lsn..lsn))?;
let num_deltas = layers.count_deltas(&img_range, &(img_lsn..lsn))?;
debug!(
"range {}-{}, has {} deltas on this timeline",
img_range.start, img_range.end, num_deltas
);
if num_deltas >= self.get_image_creation_threshold() {
return Ok(true);
debug!(
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
img_range.start, img_range.end, num_deltas, img_lsn, lsn
);
if num_deltas >= self.get_image_creation_threshold() {
return Ok(true);
}
}
}
}

View File

@@ -34,7 +34,7 @@ pub trait BlobCursor {
) -> Result<(), std::io::Error>;
}
impl<'a, R> BlobCursor for BlockCursor<R>
impl<R> BlobCursor for BlockCursor<R>
where
R: BlockReader,
{

View File

@@ -445,7 +445,10 @@ impl ImageLayerWriter {
},
);
info!("new image layer {}", path.display());
let mut file = VirtualFile::create(&path)?;
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64);

View File

@@ -13,7 +13,7 @@ pub mod repository;
pub mod storage_sync;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod tenant_tasks;
pub mod thread_mgr;
pub mod timelines;
pub mod virtual_file;

View File

@@ -81,6 +81,12 @@ mod profiling_impl {
pub struct DummyProfilerGuard;
impl Drop for DummyProfilerGuard {
fn drop(&mut self) {
// do nothing, this exists to calm Clippy down
}
}
pub fn profpoint_start(
_conf: &PageServerConf,
_point: ProfilingConfig,

View File

@@ -225,7 +225,7 @@ pub trait Repository: Send + Sync {
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
fn create_empty_timeline(
&self,
timelineid: ZTimelineId,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<Self::Timeline>>;
@@ -636,6 +636,19 @@ mod tests {
Ok(())
}
#[test]
fn no_duplicate_timelines() -> Result<()> {
let repo = RepoHarness::create("no_duplicate_timelines")?.load();
let _ = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(e.to_string(), "Timeline already exists"),
}
Ok(())
}
/// Convenience function to create a page image with given string as the only content
pub fn test_value(s: &str) -> Value {
let mut buf = BytesMut::new();

View File

@@ -44,13 +44,23 @@ where
index_part_path.display()
)
})?;
let mut index_part_download =
storage
.download(&part_storage_path)
.await
.with_context(|| {
format!("Failed to open download stream for for storage path {part_storage_path:?}")
})?;
let mut index_part_bytes = Vec::new();
storage
.download(&part_storage_path, &mut index_part_bytes)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
@@ -162,15 +172,19 @@ where
temp_file_path.display()
)
})?;
storage
.download(&layer_storage_path, &mut destination_file)
let mut download = storage
.download(&layer_storage_path)
.await
.with_context(|| {
format!(
"Failed to download a layer from storage path '{layer_storage_path:?}'"
"Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
)
})?;
io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
format!(
"Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
)
})?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations

View File

@@ -230,8 +230,6 @@ pub fn shutdown_all_tenants() {
drop(m);
thread_mgr::shutdown_threads(Some(ThreadKind::WalReceiverManager), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), None, None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), None, None);
// Ok, no background threads running anymore. Flush any remaining data in
// memory to disk.
@@ -330,44 +328,12 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
}
(TenantState::Idle, TenantState::Active) => {
info!("activating tenant {tenant_id}");
let compactor_spawn_result = thread_mgr::spawn(
ThreadKind::Compactor,
Some(tenant_id),
None,
"Compactor thread",
false,
move || crate::tenant_threads::compact_loop(tenant_id),
);
if compactor_spawn_result.is_err() {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
}
compactor_spawn_result?;
let gc_spawn_result = thread_mgr::spawn(
ThreadKind::GarbageCollector,
Some(tenant_id),
None,
"GC thread",
false,
move || crate::tenant_threads::gc_loop(tenant_id),
)
.map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
if let Err(e) = &gc_spawn_result {
let mut m = tenants_state::write_tenants();
m.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?
.state = old_state;
drop(m);
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
return gc_spawn_result;
}
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
// TODO maybe use tokio::sync::watch instead?
crate::tenant_tasks::start_compaction_loop(tenant_id)?;
crate::tenant_tasks::start_gc_loop(tenant_id)?;
}
(TenantState::Idle, TenantState::Stopping) => {
info!("stopping idle tenant {tenant_id}");
@@ -379,8 +345,10 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
Some(tenant_id),
None,
);
thread_mgr::shutdown_threads(Some(ThreadKind::GarbageCollector), Some(tenant_id), None);
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
// Wait until all gc/compaction tasks finish
let repo = get_repository_for_tenant(tenant_id)?;
let _guard = repo.file_lock.write().unwrap();
}
}

View File

@@ -0,0 +1,288 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::time::Duration;
use crate::repository::Repository;
use crate::tenant_mgr::TenantState;
use crate::thread_mgr::ThreadKind;
use crate::{tenant_mgr, thread_mgr};
use anyhow::{self, Context};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use metrics::{register_int_counter_vec, IntCounterVec};
use once_cell::sync::{Lazy, OnceCell};
use tokio::sync::mpsc;
use tokio::sync::watch;
use tracing::*;
use utils::zid::ZTenantId;
static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_tenant_task_events",
"Number of task start/stop/fail events.",
&["event"],
)
.expect("Failed to register tenant_task_events metric")
});
///
/// Compaction task's main loop
///
async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
// Run blocking part of the task
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
// Break if tenant is not active
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
return Ok(ControlFlow::Break(()));
}
// Break if we're not allowed to write to disk
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
// TODO do this inside repo.compaction_iteration instead.
let _guard = match repo.file_lock.try_read() {
Ok(g) => g,
Err(_) => return Ok(ControlFlow::Break(())),
};
// Run compaction
let compaction_period = repo.get_compaction_period();
repo.compaction_iteration()?;
Ok(ControlFlow::Continue(compaction_period))
})
.await;
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => {
error!("Compaction failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Compaction join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"compaction loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
}
static START_GC_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
static START_COMPACTION_LOOP: OnceCell<mpsc::Sender<ZTenantId>> = OnceCell::new();
/// Spawn a task that will periodically schedule garbage collection until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_gc_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
START_GC_LOOP
.get()
.context("Failed to get START_GC_LOOP")?
.blocking_send(tenantid)
.context("Failed to send to START_GC_LOOP channel")?;
Ok(())
}
/// Spawn a task that will periodically schedule compaction until
/// the tenant becomes inactive. This should be called on tenant
/// activation.
pub fn start_compaction_loop(tenantid: ZTenantId) -> anyhow::Result<()> {
START_COMPACTION_LOOP
.get()
.context("failed to get START_COMPACTION_LOOP")?
.blocking_send(tenantid)
.context("failed to send to START_COMPACTION_LOOP")?;
Ok(())
}
/// Spawn the TenantTaskManager
/// This needs to be called before start_gc_loop or start_compaction_loop
pub fn init_tenant_task_pool() -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("tenant-task-worker")
.worker_threads(40) // Way more than necessary
.max_blocking_threads(100) // Way more than necessary
.enable_all()
.build()?;
let (gc_send, mut gc_recv) = mpsc::channel::<ZTenantId>(100);
START_GC_LOOP
.set(gc_send)
.expect("Failed to set START_GC_LOOP");
let (compaction_send, mut compaction_recv) = mpsc::channel::<ZTenantId>(100);
START_COMPACTION_LOOP
.set(compaction_send)
.expect("Failed to set START_COMPACTION_LOOP");
// TODO this is getting repetitive
let mut gc_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
let mut compaction_loops = HashMap::<ZTenantId, watch::Sender<()>>::new();
thread_mgr::spawn(
ThreadKind::TenantTaskManager,
None,
None,
"Tenant task manager main thread",
true,
move || {
runtime.block_on(async move {
let mut futures = FuturesUnordered::new();
loop {
tokio::select! {
_ = thread_mgr::shutdown_watcher() => {
// Send cancellation to all tasks
for (_, cancel) in gc_loops.drain() {
cancel.send(()).ok();
}
for (_, cancel) in compaction_loops.drain() {
cancel.send(()).ok();
}
// Exit after all tasks finish
while let Some(result) = futures.next().await {
match result {
Ok(()) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Err(e) => {
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
}
}
break;
},
tenantid = gc_recv.recv() => {
let tenantid = tenantid.expect("Gc task channel closed unexpectedly");
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(gc_loop(tenantid, cancel_recv)
.instrument(info_span!("gc loop", tenant = %tenantid)));
if let Some(old_cancel_send) = gc_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
tenantid = compaction_recv.recv() => {
let tenantid = tenantid.expect("Compaction task channel closed unexpectedly");
// Spawn new task, request cancellation of the old one if exists
let (cancel_send, cancel_recv) = watch::channel(());
let handle = tokio::spawn(compaction_loop(tenantid, cancel_recv)
.instrument(info_span!("compaction loop", tenant = %tenantid)));
if let Some(old_cancel_send) = compaction_loops.insert(tenantid, cancel_send) {
old_cancel_send.send(()).ok();
}
// Update metrics, remember handle
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
futures.push(handle);
},
result = futures.next() => {
// Log and count any unhandled panics
match result {
Some(Ok(())) => {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
},
Some(Err(e)) => {
TENANT_TASK_EVENTS.with_label_values(&["panic"]).inc();
error!("loop join error {}", e)
},
None => {},
};
},
}
}
});
Ok(())
},
)?;
Ok(())
}
///
/// GC task's main loop
///
async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
loop {
trace!("waking up");
// Run blocking part of the task
let period: Result<Result<_, anyhow::Error>, _> = tokio::task::spawn_blocking(move || {
// Break if tenant is not active
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
return Ok(ControlFlow::Break(()));
}
// Break if we're not allowed to write to disk
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
// TODO do this inside repo.gc_iteration instead.
let _guard = match repo.file_lock.try_read() {
Ok(g) => g,
Err(_) => return Ok(ControlFlow::Break(())),
};
// Run gc
let gc_period = repo.get_gc_period();
let gc_horizon = repo.get_gc_horizon();
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
Ok(ControlFlow::Continue(gc_period))
})
.await;
// Decide whether to sleep or break
let sleep_duration = match period {
Ok(Ok(ControlFlow::Continue(period))) => period,
Ok(Ok(ControlFlow::Break(()))) => break,
Ok(Err(e)) => {
error!("Gc failed, retrying: {}", e);
Duration::from_secs(2)
}
Err(e) => {
error!("Gc join error, retrying: {}", e);
Duration::from_secs(2)
}
};
// Sleep
tokio::select! {
_ = cancel.changed() => {
trace!("received cancellation request");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
}
trace!(
"GC loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenantid)
);
}

View File

@@ -1,79 +0,0 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use anyhow::Result;
use std::time::Duration;
use tracing::*;
use utils::zid::ZTenantId;
///
/// Compaction thread's main loop
///
pub fn compact_loop(tenantid: ZTenantId) -> Result<()> {
if let Err(err) = compact_loop_ext(tenantid) {
error!("compact loop terminated with error: {:?}", err);
Err(err)
} else {
Ok(())
}
}
fn compact_loop_ext(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let compaction_period = repo.get_compaction_period();
std::thread::sleep(compaction_period);
trace!("compaction thread for tenant {} waking up", tenantid);
// Compact timelines
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.compaction_iteration()?;
}
trace!(
"compaction thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
///
/// GC thread's main loop
///
pub fn gc_loop(tenantid: ZTenantId) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != Some(TenantState::Active) {
break;
}
trace!("gc thread for tenant {} waking up", tenantid);
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let gc_horizon = repo.get_gc_horizon();
// Garbage collect old files that are not needed for PITR anymore
if gc_horizon > 0 {
repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)?;
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = repo.get_gc_period().as_secs();
while sleep_time > 0 && tenant_mgr::get_tenant_state(tenantid) == Some(TenantState::Active)
{
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
}
trace!(
"GC thread stopped for tenant {} state is {:?}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}

View File

@@ -94,11 +94,8 @@ pub enum ThreadKind {
// Main walreceiver manager thread that ensures that every timeline spawns a connection to safekeeper, to fetch WAL.
WalReceiverManager,
// Thread that handles compaction of all timelines for a tenant.
Compactor,
// Thread that handles GC of a tenant
GarbageCollector,
// Thread that schedules new compaction and gc jobs
TenantTaskManager,
// Thread that flushes frozen in-memory layers to disk
LayerFlushThread,

View File

@@ -115,7 +115,7 @@ mod tests {
Ok(())
});
let () = waiter.await?;
waiter.await?;
notifier.await?
}
}

View File

@@ -2,18 +2,16 @@ use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;
use std::cmp::min;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::xlog_utils::{
XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
};
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;
@@ -452,45 +450,41 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
pub async fn read_object(
file_path: PathBuf,
offset: u64,
) -> (impl AsyncRead, JoinHandle<Result<()>>) {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead>>> {
let download = match REMOTE_STORAGE
.get()
.context("Failed to get remote storage")?
.as_ref()
.context("No remote storage configured")?
{
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;
let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);
let copy_result = tokio::spawn(async move {
let res = match storage.as_ref().unwrap() {
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;
info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None, &mut pipe_writer)
.await
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;
info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage
.download_byte_range(&s3key, offset, None, &mut pipe_writer)
.await
}
};
if let Err(e) = res {
error!("failed to download WAL segment from remote storage: {}", e);
Err(e)
} else {
Ok(())
info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None)
.await
}
});
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;
(pipe_reader, copy_result)
info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage.download_byte_range(&s3key, offset, None).await
}
}
.with_context(|| {
format!(
"Failed to open WAL segment download stream for local storage path {}",
file_path.display()
)
})?;
Ok(download.download_stream)
}

View File

@@ -604,8 +604,7 @@ impl WalReader {
// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
return Ok(Box::pin(reader));
return read_object(wal_file_path, xlogoff as u64).await;
}
bail!("WAL segment is not found")

438
scripts/add_missing_rels.py Normal file
View File

@@ -0,0 +1,438 @@
import os
import shutil
from pathlib import Path
import tempfile
from contextlib import closing
import psycopg2
import subprocess
import argparse
### utils copied from test fixtures
from typing import Any, List
from psycopg2.extensions import connection as PgConnection
import asyncpg
from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, cast, Union, Tuple
Env = Dict[str, str]
_global_counter = 0
def global_counter() -> int:
""" A really dumb global counter.
This is useful for giving output files a unique number, so if we run the
same command multiple times we can keep their output separate.
"""
global _global_counter
_global_counter += 1
return _global_counter
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
""" Run a process and capture its output
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
where "cmd" is the name of the program and NNN is an incrementing
counter.
If those files already exist, we will overwrite them.
Returns basepath for files with captured output.
"""
assert type(cmd) is list
base = os.path.basename(cmd[0]) + '_{}'.format(global_counter())
basepath = os.path.join(capture_dir, base)
stdout_filename = basepath + '.stdout'
stderr_filename = basepath + '.stderr'
with open(stdout_filename, 'w') as stdout_f:
with open(stderr_filename, 'w') as stderr_f:
print('(capturing output to "{}.stdout")'.format(base))
subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f)
return basepath
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: Path, pg_distrib_dir):
self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), 'bin')
self.env = os.environ.copy()
self.env['LD_LIBRARY_PATH'] = os.path.join(str(pg_distrib_dir), 'lib')
def _fixpath(self, command: List[str]):
if '/' not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
return self.env
env = self.env.copy()
env.update(env_add)
return env
def run(self, command: List[str], env: Optional[Env] = None, cwd: Optional[str] = None):
"""
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[str] = None,
**kwargs: Any) -> str:
"""
Run one of the postgres binaries, with stderr and stdout redirected to a file.
This is just like `run`, but for chatty programs. Returns basepath for files
with captured output.
"""
self._fixpath(command)
print('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
return subprocess_capture(str(self.log_dir),
command,
env=env,
cwd=cwd,
check=True,
**kwargs)
class PgProtocol:
""" Reusable connection logic """
def __init__(self, **kwargs):
self.default_options = kwargs
def connstr(self, **kwargs) -> str:
"""
Build a libpq connection string for the Postgres instance.
"""
return str(make_dsn(**self.conn_options(**kwargs)))
def conn_options(self, **kwargs):
conn_options = self.default_options.copy()
if 'dsn' in kwargs:
conn_options.update(parse_dsn(kwargs['dsn']))
conn_options.update(kwargs)
# Individual statement timeout in seconds. 2 minutes should be
# enough for our tests, but if you need a longer, you can
# change it by calling "SET statement_timeout" after
# connecting.
if 'options' in conn_options:
conn_options['options'] = f"-cstatement_timeout=120s " + conn_options['options']
else:
conn_options['options'] = "-cstatement_timeout=120s"
return conn_options
# autocommit=True here by default because that's what we need most of the time
def connect(self, autocommit=True, **kwargs) -> PgConnection:
"""
Connect to the node.
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
return conn
async def connect_async(self, **kwargs) -> asyncpg.Connection:
"""
Connect to the node from async python.
Returns asyncpg's connection object.
"""
# asyncpg takes slightly different options than psycopg2. Try
# to convert the defaults from the psycopg2 format.
# The psycopg2 option 'dbname' is called 'database' is asyncpg
conn_options = self.conn_options(**kwargs)
if 'dbname' in conn_options:
conn_options['database'] = conn_options.pop('dbname')
# Convert options='-c<key>=<val>' to server_settings
if 'options' in conn_options:
options = conn_options.pop('options')
for match in re.finditer('-c(\w*)=(\w*)', options):
key = match.group(1)
val = match.group(2)
if 'server_options' in conn_options:
conn_options['server_settings'].update({key: val})
else:
conn_options['server_settings'] = {key: val}
return await asyncpg.connect(**conn_options)
def safe_psql(self, query: str, **kwargs: Any) -> List[Tuple[Any, ...]]:
"""
Execute query against the node and return all rows.
This method passes all extra params to connstr.
"""
return self.safe_psql_many([query], **kwargs)[0]
def safe_psql_many(self, queries: List[str], **kwargs: Any) -> List[List[Tuple[Any, ...]]]:
"""
Execute queries against the node and return all rows.
This method passes all extra params to connstr.
"""
result: List[List[Any]] = []
with closing(self.connect(**kwargs)) as conn:
with conn.cursor() as cur:
for query in queries:
print(f"Executing query: {query}")
cur.execute(query)
if cur.description is None:
result.append([]) # query didn't return data
else:
result.append(cast(List[Any], cur.fetchall()))
return result
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host='localhost', port=port, dbname='postgres')
self.pgdatadir = pgdatadir
self.pg_bin = pg_bin
self.running = False
if init:
self.pg_bin.run_capture(['initdb', '-D', str(pgdatadir)])
self.configure([f"port = {port}\n"])
def configure(self, options: List[str]):
"""Append lines into postgresql.conf file."""
assert not self.running
with open(os.path.join(self.pgdatadir, 'postgresql.conf'), 'a') as conf_file:
conf_file.write("\n".join(options))
def start(self, log_path: Optional[str] = None):
assert not self.running
self.running = True
if log_path is None:
log_path = os.path.join(self.pgdatadir, "pg.log")
self.pg_bin.run_capture(
['pg_ctl', '-w', '-D', str(self.pgdatadir), '-l', log_path, 'start'])
def stop(self):
assert self.running
self.running = False
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', str(self.pgdatadir), 'stop'])
def get_subdir_size(self, subdir) -> int:
"""Return size of pgdatadir subdirectory in bytes."""
return get_dir_size(os.path.join(self.pgdatadir, subdir))
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
if self.running:
self.stop()
### actual code
def get_rel_paths(log_dir, pg_bin, base_tar):
"""Yeild list of relation paths"""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
port = "55439" # Probably free
with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
vanilla_pg.configure([f"port={port}"])
vanilla_pg.start()
# Create database based on template0 because we can't connect to template0
query = "create database template0copy template template0"
vanilla_pg.safe_psql(query, user="cloud_admin")
vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")
# Get all databases
query = "select oid, datname from pg_database"
oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
template0_oid = [
oid
for (oid, database) in oid_dbname_pairs
if database == "template0"
][0]
# Get rel paths for each database
for oid, database in oid_dbname_pairs:
if database == "template0":
# We can't connect to template0
continue
query = "select relname, pg_relation_filepath(oid) from pg_class"
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
for relname, filepath in result:
if filepath is not None:
if database == "template0copy":
# Add all template0copy paths to template0
prefix = f"base/{oid}/"
if filepath.startswith(prefix):
suffix = filepath[len(prefix):]
yield f"base/{template0_oid}/{suffix}"
elif filepath.startswith("global"):
print(f"skipping {database} global file {filepath}")
else:
raise AssertionError
else:
yield filepath
def pack_base(log_dir, restored_dir, output_tar):
tmp_tar_name = "tmp.tar"
tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
subprocess_capture(log_dir, cmd, cwd=restored_dir)
shutil.move(tmp_tar_path, output_tar)
def get_files_in_tar(log_dir, tar):
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])
# Find empty files
empty_files = []
for root, dirs, files in os.walk(restored_dir):
for name in files:
file_path = os.path.join(root, name)
yield file_path[len(restored_dir) + 1:]
def corrupt(log_dir, base_tar, output_tar):
"""Remove all empty files and repackage. Return paths of files removed."""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
# Find empty files
empty_files = []
for root, dirs, files in os.walk(restored_dir):
for name in files:
file_path = os.path.join(root, name)
file_size = os.path.getsize(file_path)
if file_size == 0:
empty_files.append(file_path)
# Delete empty files (just to see if they get recreated)
for empty_file in empty_files:
os.remove(empty_file)
# Repackage
pack_base(log_dir, restored_dir, output_tar)
# Return relative paths
return {
empty_file[len(restored_dir) + 1:]
for empty_file in empty_files
}
def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])
# Touch files that don't exist
for path in paths:
absolute_path = os.path.join(restored_dir, path)
exists = os.path.exists(absolute_path)
if not exists:
print("File {absolute_path} didn't exist. Creating..")
Path(absolute_path).touch()
# Repackage
pack_base(log_dir, restored_dir, output_tar)
# TODO this test is not currently called. It needs any ordinary base.tar path as input
def test_add_missing_rels(base_tar):
output_tar = base_tar + ".fixed"
# Create new base tar with missing empty files
corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0
# Reconstruct paths from the corrupted tar, assert it covers everything important
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
paths_missed = deleted_files - reconstructed_paths
assert paths_missed.issubset({
"postgresql.auto.conf",
"pg_ident.conf",
})
# Recreate the correct tar by touching files, compare with original tar
touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
set(get_files_in_tar(test_output_dir, output_tar)))
assert paths_missed.issubset({
"postgresql.auto.conf",
"pg_ident.conf",
})
# Example command:
# poetry run python scripts/add_missing_rels.py \
# --base-tar /home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/psql_2.stdout \
# --output-tar output-base.tar \
# --log-dir /home/bojan/tmp
# --pg-distrib-dir /home/bojan/src/neondatabase/neon/tmp_install/
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--base-tar',
dest='base_tar',
required=True,
help='base.tar file to add missing rels to (file will not be modified)',
)
parser.add_argument(
'--output-tar',
dest='output_tar',
required=True,
help='path and name for the output base.tar file',
)
parser.add_argument(
'--log-dir',
dest='log_dir',
required=True,
help='directory to save log files in',
)
parser.add_argument(
'--pg-distrib-dir',
dest='pg_distrib_dir',
required=True,
help='directory where postgres is installed',
)
args = parser.parse_args()
base_tar = args.base_tar
output_tar = args.output_tar
log_dir = args.log_dir
pg_bin = PgBin(log_dir, args.pg_distrib_dir)
reconstructed_paths = set(get_rel_paths(log_dir, pg_bin, base_tar))
touch_missing_rels(log_dir, base_tar, output_tar, reconstructed_paths)

View File

@@ -0,0 +1,232 @@
#
# Simple script to export nodes from one pageserver
# and import them into another page server
#
from os import path
import os
import requests
import uuid
import subprocess
import argparse
from pathlib import Path
# directory to save exported tar files to
basepath = path.dirname(path.abspath(__file__))
class NeonPageserverApiException(Exception):
pass
class NeonPageserverHttpClient(requests.Session):
def __init__(self, host, port):
super().__init__()
self.host = host
self.port = port
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
except requests.RequestException as e:
try:
msg = res.json()['msg']
except:
msg = ''
raise NeonPageserverApiException(msg) from e
def check_status(self):
self.get(f"http://{self.host}:{self.port}/v1/status").raise_for_status()
def tenant_list(self):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: uuid.UUID, ok_if_exists):
res = self.post(
f"http://{self.host}:{self.port}/v1/tenant",
json={
'new_tenant_id': new_tenant_id.hex,
},
)
if res.status_code == 409:
if ok_if_exists:
print(f'could not create tenant: already exists for id {new_tenant_id}')
else:
res.raise_for_status()
elif res.status_code == 201:
print(f'created tenant {new_tenant_id}')
else:
self.verbose_error(res)
return new_tenant_id
def timeline_list(self, tenant_id: uuid.UUID):
res = self.get(f"http://{self.host}:{self.port}/v1/tenant/{tenant_id.hex}/timeline")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
import pytest
import os
def add_missing_empty_rels(base_tar, output_tar):
os.environ['INPUT_BASE_TAR'] = base_tar
os.environ['OUTPUT_BASE_TAR'] = output_tar
pytest.main(["-s", "-k", "test_main_hack"])
def main(args: argparse.Namespace):
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host
tenants = args.tenants
old_http_client = NeonPageserverHttpClient(old_pageserver_host, args.old_pageserver_http_port)
old_http_client.check_status()
old_pageserver_connstr = f"postgresql://{old_pageserver_host}:{args.old_pageserver_pg_port}"
new_http_client = NeonPageserverHttpClient(new_pageserver_host, args.new_pageserver_http_port)
new_http_client.check_status()
new_pageserver_connstr = f"postgresql://{new_pageserver_host}:{args.new_pageserver_pg_port}"
psql_env = {**os.environ, 'LD_LIBRARY_PATH': '/usr/local/lib/'}
for tenant_id in tenants:
print(f"Tenant: {tenant_id}")
timelines = old_http_client.timeline_list(uuid.UUID(tenant_id))
print(f"Timelines: {timelines}")
# Create tenant in new pageserver
if args.only_import is False:
new_http_client.tenant_create(uuid.UUID(tenant_id), args.ok_if_exists)
for timeline in timelines:
# Export timelines from old pageserver
if args.only_import is False:
query = f"fullbackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']}"
cmd = [args.psql_path, "--no-psqlrc", old_pageserver_connstr, "-c", query]
print(f"Running: {cmd}")
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
stderr_filename = path.join(
basepath, f"{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
with open(tar_filename, 'w') as stdout_f:
with open(stderr_filename, 'w') as stderr_f:
print(f"(capturing output to {tar_filename})")
subprocess.run(cmd, stdout=stdout_f, stderr=stderr_f, env=psql_env)
# add_missing_emtpy_rels(incomplete_tar_filename, tar_filename)
print(f"Done export: {tar_filename}")
# Import timelines to new pageserver
psql_path = Path(args.psql_path)
import_cmd = f"import basebackup {timeline['tenant_id']} {timeline['timeline_id']} {timeline['local']['last_record_lsn']} {timeline['local']['last_record_lsn']}"
tar_filename = path.join(basepath,
f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar")
full_cmd = rf"""cat {tar_filename} | {psql_path} {new_pageserver_connstr} -c '{import_cmd}' """
stderr_filename2 = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stderr")
stdout_filename = path.join(
basepath, f"import_{timeline['tenant_id']}_{timeline['timeline_id']}.stdout")
print(f"Running: {full_cmd}")
with open(stdout_filename, 'w') as stdout_f:
with open(stderr_filename2, 'w') as stderr_f:
print(f"(capturing output to {stdout_filename})")
subprocess.run(full_cmd,
stdout=stdout_f,
stderr=stderr_f,
env=psql_env,
shell=True)
print(f"Done import")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--tenant-id',
dest='tenants',
required=True,
nargs='+',
help='Id of the tenant to migrate. You can pass multiple arguments',
)
parser.add_argument(
'--from-host',
dest='old_pageserver_host',
required=True,
help='Host of the pageserver to migrate data from',
)
parser.add_argument(
'--from-http-port',
dest='old_pageserver_http_port',
required=False,
type=int,
default=9898,
help='HTTP port of the pageserver to migrate data from. Default: 9898',
)
parser.add_argument(
'--from-pg-port',
dest='old_pageserver_pg_port',
required=False,
type=int,
default=6400,
help='pg port of the pageserver to migrate data from. Default: 6400',
)
parser.add_argument(
'--to-host',
dest='new_pageserver_host',
required=True,
help='Host of the pageserver to migrate data to',
)
parser.add_argument(
'--to-http-port',
dest='new_pageserver_http_port',
required=False,
default=9898,
type=int,
help='HTTP port of the pageserver to migrate data to. Default: 9898',
)
parser.add_argument(
'--to-pg-port',
dest='new_pageserver_pg_port',
required=False,
default=6400,
type=int,
help='pg port of the pageserver to migrate data to. Default: 6400',
)
parser.add_argument(
'--ignore-tenant-exists',
dest='ok_if_exists',
required=False,
help=
'Ignore error if we are trying to create the tenant that already exists. It can be dangerous if existing tenant already contains some data.',
)
parser.add_argument(
'--psql-path',
dest='psql_path',
required=False,
default='/usr/local/bin/psql',
help='Path to the psql binary. Default: /usr/local/bin/psql',
)
parser.add_argument(
'--only-import',
dest='only_import',
required=False,
default=False,
action='store_true',
help='Skip export and tenant creation part',
)
args = parser.parse_args()
main(args)

View File

@@ -28,6 +28,10 @@ strict = true
# There is some work in progress, though: https://github.com/MagicStack/asyncpg/pull/577
ignore_missing_imports = true
[mypy-pg8000.*]
# Used only in testing clients
ignore_missing_imports = true
[mypy-cached_property.*]
ignore_missing_imports = true

View File

@@ -1,6 +1,3 @@
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException

View File

@@ -1,8 +1,6 @@
from contextlib import closing
from typing import Iterator
from uuid import UUID, uuid4
from uuid import uuid4
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserverApiException
from requests.exceptions import HTTPError
import pytest

View File

@@ -1,11 +1,9 @@
from contextlib import closing, contextmanager
import psycopg2.extras
import pytest
from fixtures.neon_fixtures import PgProtocol, NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.log_helper import log
import os
import time
import asyncpg
from fixtures.neon_fixtures import Postgres
import threading

View File

@@ -1,8 +1,6 @@
import pytest
from contextlib import closing
from fixtures.neon_fixtures import NeonEnv
from fixtures.log_helper import log
#

View File

@@ -1,4 +1,3 @@
import subprocess
from contextlib import closing
import psycopg2.extras

View File

@@ -0,0 +1,167 @@
from fixtures.neon_fixtures import VanillaPostgres
from fixtures.utils import subprocess_capture
import os
import shutil
from pathlib import Path
import tempfile
def get_rel_paths(log_dir, pg_bin, base_tar):
"""Yeild list of relation paths"""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
port = "55439" # Probably free
with VanillaPostgres(restored_dir, pg_bin, port, init=False) as vanilla_pg:
vanilla_pg.configure([f"port={port}"])
vanilla_pg.start()
# Create database based on template0 because we can't connect to template0
query = "create database template0copy template template0"
vanilla_pg.safe_psql(query, user="cloud_admin")
vanilla_pg.safe_psql("CHECKPOINT", user="cloud_admin")
# Get all databases
query = "select oid, datname from pg_database"
oid_dbname_pairs = vanilla_pg.safe_psql(query, user="cloud_admin")
template0_oid = [
oid
for (oid, database) in oid_dbname_pairs
if database == "template0"
][0]
# Get rel paths for each database
for oid, database in oid_dbname_pairs:
if database == "template0":
# We can't connect to template0
continue
query = "select relname, pg_relation_filepath(oid) from pg_class"
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
for relname, filepath in result:
if filepath is not None:
if database == "template0copy":
# Add all template0copy paths to template0
prefix = f"base/{oid}/"
if filepath.startswith(prefix):
suffix = filepath[len(prefix):]
yield f"base/{template0_oid}/{suffix}"
elif filepath.startswith("global"):
print(f"skipping {database} global file {filepath}")
else:
raise AssertionError
else:
yield filepath
def pack_base(log_dir, restored_dir, output_tar):
tmp_tar_name = "tmp.tar"
tmp_tar_path = os.path.join(restored_dir, tmp_tar_name)
cmd = ["tar", "-cf", tmp_tar_name] + os.listdir(restored_dir)
subprocess_capture(log_dir, cmd, cwd=restored_dir)
shutil.move(tmp_tar_path, output_tar)
def get_files_in_tar(log_dir, tar):
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", tar, "-C", restored_dir])
# Find empty files
empty_files = []
for root, dirs, files in os.walk(restored_dir):
for name in files:
file_path = os.path.join(root, name)
yield file_path[len(restored_dir) + 1:]
def corrupt(log_dir, base_tar, output_tar):
"""Remove all empty files and repackage. Return paths of files removed."""
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", base_tar, "-C", restored_dir])
# Find empty files
empty_files = []
for root, dirs, files in os.walk(restored_dir):
for name in files:
file_path = os.path.join(root, name)
file_size = os.path.getsize(file_path)
if file_size == 0:
empty_files.append(file_path)
# Delete empty files (just to see if they get recreated)
for empty_file in empty_files:
os.remove(empty_file)
# Repackage
pack_base(log_dir, restored_dir, output_tar)
# Return relative paths
return {
empty_file[len(restored_dir) + 1:]
for empty_file in empty_files
}
def touch_missing_rels(log_dir, corrupt_tar, output_tar, paths):
with tempfile.TemporaryDirectory() as restored_dir:
# Unpack the base tar
subprocess_capture(log_dir, ["tar", "-xf", corrupt_tar, "-C", restored_dir])
# Touch files that don't exist
for path in paths:
absolute_path = os.path.join(restored_dir, path)
exists = os.path.exists(absolute_path)
if not exists:
print("File {absolute_path} didn't exist. Creating..")
Path(absolute_path).touch()
# Repackage
pack_base(log_dir, restored_dir, output_tar)
def test_complete(test_output_dir, pg_bin):
# Specify directories
# TODO make a basebackup instead of using one from another test
work_dir = "/home/bojan/src/neondatabase/neon/test_output/test_import_from_pageserver/"
base_tar = os.path.join(work_dir, "psql_2.stdout")
output_tar = os.path.join(work_dir, "psql_2-completed.stdout")
# Create new base tar with missing empty files
corrupt_tar = os.path.join(test_output_dir, "psql_2-corrupted.stdout")
deleted_files = corrupt(test_output_dir, base_tar, corrupt_tar)
assert len(set(get_files_in_tar(test_output_dir, base_tar)) -
set(get_files_in_tar(test_output_dir, corrupt_tar))) > 0
# Reconstruct paths from the corrupted tar, assert it covers everything important
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, corrupt_tar))
paths_missed = deleted_files - reconstructed_paths
assert paths_missed.issubset({
"postgresql.auto.conf",
"pg_ident.conf",
})
# Recreate the correct tar by touching files, compare with original tar
touch_missing_rels(test_output_dir, corrupt_tar, output_tar, reconstructed_paths)
paths_missed = (set(get_files_in_tar(test_output_dir, base_tar)) -
set(get_files_in_tar(test_output_dir, output_tar)))
assert paths_missed.issubset({
"postgresql.auto.conf",
"pg_ident.conf",
})
# HACK this script relies on test fixtures, but you can run it with
# poetry run pytest -k test_main_hack and pass inputs via envvars
#
# The script takes a base tar, infers what empty rel files might be missing
# and creates a new base tar with those files included. It does not modify
# the original file.
def test_main_hack(test_output_dir, pg_bin, pytestconfig):
base_tar = os.environ['INPUT_BASE_TAR']
output_tar = os.environ['OUTPUT_BASE_TAR']
reconstructed_paths = set(get_rel_paths(test_output_dir, pg_bin, base_tar))
touch_missing_rels(test_output_dir, base_tar, output_tar, reconstructed_paths)

View File

@@ -1,16 +1,10 @@
import subprocess
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.neon_fixtures import pg_distrib_dir
import os
from fixtures.utils import mkdir_if_needed, subprocess_capture
import shutil
import getpass
import pwd
from fixtures.utils import subprocess_capture
num_rows = 1000
@@ -46,19 +40,20 @@ def test_fullbackup(neon_env_builder: NeonEnvBuilder,
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
# Get and unpack fullbackup from pageserver
restored_dir_path = os.path.join(env.repo_dir, "restored_datadir")
restored_dir_path = env.repo_dir / "restored_datadir"
os.mkdir(restored_dir_path, 0o750)
query = f"fullbackup {env.initial_tenant.hex} {timeline} {lsn}"
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
tar_output_file = result_basepath + ".stdout"
subprocess_capture(str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", restored_dir_path])
subprocess_capture(str(env.repo_dir),
["tar", "-xf", tar_output_file, "-C", str(restored_dir_path)])
# HACK
# fullbackup returns neon specific pg_control and first WAL segment
# use resetwal to overwrite it
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, 'pg_resetwal')
cmd = [pg_resetwal_path, "-D", restored_dir_path]
cmd = [pg_resetwal_path, "-D", str(restored_dir_path)]
pg_bin.run_capture(cmd, env=psql_env)
# Restore from the backup and find the data we inserted

View File

@@ -11,7 +11,7 @@ import signal
import pytest
from fixtures.neon_fixtures import PgProtocol, PortDistributor, Postgres, NeonEnvBuilder, Etcd, NeonPageserverHttpClient, assert_local, wait_until, wait_for_last_record_lsn, wait_for_upload, neon_binpath, pg_distrib_dir
from fixtures.utils import lsn_from_hex
from fixtures.utils import lsn_from_hex, subprocess_capture
def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@@ -101,10 +101,6 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve
log.info('load thread stopped')
@pytest.mark.skip(
reason=
"needs to replace callmemaybe call with better idea how to migrate timelines between pageservers"
)
@pytest.mark.parametrize('with_load', ['with_load', 'without_load'])
def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
port_distributor: PortDistributor,
@@ -188,30 +184,38 @@ def test_tenant_relocation(neon_env_builder: NeonEnvBuilder,
new_pageserver_http_port,
neon_env_builder.broker):
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# Migrate either by attacking from s3 or import/export basebackup
relocation_method = "import"
if relocation_method == "import":
scripts_dir = "/home/bojan/src/neondatabase/neon/scripts/"
cmd = [
"python",
os.path.join(scripts_dir, "export_import_betwen_pageservers.py"),
"--tenant-id", tenant.hex,
"--from-host", "localhost",
"--from-http-port", str(pageserver_http.port),
"--from-pg-port", str(env.pageserver.service_port.pg),
"--to-host", "localhost",
"--to-http-port", str(new_pageserver_http_port),
"--to-pg-port", str(new_pageserver_pg_port),
"--psql-path", os.path.join(pg_distrib_dir, "bin", "psql"),
]
subprocess_capture(env.repo_dir, cmd, check=True)
elif relocation_method == "attach":
# call to attach timeline to new pageserver
new_pageserver_http.timeline_attach(tenant, timeline)
# when load is active these checks can break because lsns are not static
# so lets check with some margin
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = wait_until(
number_of_iterations=5,
interval=1,
func=lambda: assert_local(new_pageserver_http, tenant, timeline))
# callmemaybe to start replication from safekeeper to the new pageserver
# when there is no load there is a clean checkpoint and no wal delta
# needs to be streamed to the new pageserver
# TODO (rodionov) use attach to start replication
with pg_cur(PgProtocol(host='localhost', port=new_pageserver_pg_port)) as cur:
# "callmemaybe {} {} host={} port={} options='-c ztimelineid={} ztenantid={}'"
safekeeper_connstring = f"host=localhost port={env.safekeepers[0].port.pg} options='-c ztimelineid={timeline} ztenantid={tenant} pageserver_connstr=postgresql://no_user:@localhost:{new_pageserver_pg_port}'"
cur.execute("callmemaybe {} {} {}".format(tenant.hex,
timeline.hex,
safekeeper_connstring))
# when load is active these checks can break because lsns are not static
# so lets check with some margin
assert_abs_margin_ratio(lsn_from_hex(new_timeline_detail['local']['disk_consistent_lsn']),
lsn_from_hex(timeline_detail['local']['disk_consistent_lsn']),
0.03)
tenant_pg.stop()

View File

@@ -0,0 +1,70 @@
from fixtures.neon_fixtures import NeonEnvBuilder, wait_until
from uuid import UUID
import time
def get_only_element(l):
assert len(l) == 1
return l[0]
# Test that gc and compaction tenant tasks start and stop correctly
def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
# The gc and compaction loops don't bother to watch for tenant state
# changes while sleeping, so we use small periods to make this test
# run faster. With default settings we'd have to wait longer for tasks
# to notice state changes and shut down.
# TODO fix this behavior in the pageserver
tenant_config = "{gc_period = '1 s', compaction_period = '1 s'}"
neon_env_builder.pageserver_config_override = f"tenant_config={tenant_config}"
name = "test_tenant_tasks"
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
def get_state(tenant):
all_states = client.tenant_list()
matching = [t for t in all_states if t["id"] == tenant.hex]
return get_only_element(matching)["state"]
def get_metric_value(name):
metrics = client.get_metrics()
relevant = [line for line in metrics.splitlines() if line.startswith(name)]
if len(relevant) == 0:
return 0
line = get_only_element(relevant)
value = line.lstrip(name).strip()
return int(value)
def detach_all_timelines(tenant):
timelines = [UUID(t["timeline_id"]) for t in client.timeline_list(tenant)]
for t in timelines:
client.timeline_detach(tenant, t)
def assert_idle(tenant):
assert get_state(tenant) == "Idle"
# Create tenant, start compute
tenant, _ = env.neon_cli.create_tenant()
timeline = env.neon_cli.create_timeline(name, tenant_id=tenant)
pg = env.postgres.create_start(name, tenant_id=tenant)
assert (get_state(tenant) == "Active")
# Stop compute
pg.stop()
# Detach all tenants and wait for them to go idle
# TODO they should be already idle since there are no active computes
for tenant_info in client.tenant_list():
tenant_id = UUID(tenant_info["id"])
detach_all_timelines(tenant_id)
wait_until(10, 0.2, lambda: assert_idle(tenant_id))
# Assert that all tasks finish quickly after tenants go idle
def assert_tasks_finish():
tasks_started = get_metric_value('pageserver_tenant_task_events{event="start"}')
tasks_ended = get_metric_value('pageserver_tenant_task_events{event="stop"}')
tasks_panicked = get_metric_value('pageserver_tenant_task_events{event="panic"}')
assert tasks_started == tasks_ended
assert tasks_panicked == 0
wait_until(10, 0.2, assert_tasks_finish)

View File

@@ -1,3 +1,4 @@
import pathlib
import pytest
import random
import time
@@ -14,7 +15,7 @@ from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.neon_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol
from fixtures.utils import get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
from uuid import uuid4
@@ -645,7 +646,7 @@ class ProposerPostgres(PgProtocol):
def create_dir_config(self, safekeepers: str):
""" Create dir and config for running --sync-safekeepers """
mkdir_if_needed(self.pg_data_dir_path())
pathlib.Path(self.pg_data_dir_path()).mkdir(exist_ok=True)
with open(self.config_file_path(), "w") as f:
cfg = [
"synchronous_standby_names = 'walproposer'\n",
@@ -828,7 +829,7 @@ class SafekeeperEnv:
self.timeline_id = uuid.uuid4()
self.tenant_id = uuid.uuid4()
mkdir_if_needed(str(self.repo_dir))
self.repo_dir.mkdir(exist_ok=True)
# Create config and a Safekeeper object for each safekeeper
self.safekeepers = []
@@ -847,8 +848,8 @@ class SafekeeperEnv:
http=self.port_distributor.get_port(),
)
safekeeper_dir = os.path.join(self.repo_dir, f"sk{i}")
mkdir_if_needed(safekeeper_dir)
safekeeper_dir = self.repo_dir / f"sk{i}"
safekeeper_dir.mkdir(exist_ok=True)
args = [
self.bin_safekeeper,
@@ -857,7 +858,7 @@ class SafekeeperEnv:
"--listen-http",
f"127.0.0.1:{port.http}",
"-D",
safekeeper_dir,
str(safekeeper_dir),
"--id",
str(i),
"--broker-endpoints",

View File

@@ -1,19 +1,17 @@
import os
import subprocess
from pathlib import Path
from fixtures.neon_fixtures import (NeonEnvBuilder,
VanillaPostgres,
PortDistributor,
PgBin,
base_dir,
vanilla_pg,
pg_distrib_dir)
from fixtures.log_helper import log
def test_wal_restore(neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
test_output_dir,
test_output_dir: Path,
port_distributor: PortDistributor):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_wal_restore")
@@ -22,13 +20,13 @@ def test_wal_restore(neon_env_builder: NeonEnvBuilder,
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
env.neon_cli.pageserver_stop()
port = port_distributor.get_port()
data_dir = os.path.join(test_output_dir, 'pgsql.restored')
data_dir = test_output_dir / 'pgsql.restored'
with VanillaPostgres(data_dir, PgBin(test_output_dir), port) as restored:
pg_bin.run_capture([
os.path.join(base_dir, 'libs/utils/scripts/restore_from_wal.sh'),
os.path.join(pg_distrib_dir, 'bin'),
os.path.join(test_output_dir, 'repo/safekeepers/sk1/{}/*'.format(tenant_id)),
data_dir,
str(test_output_dir / 'repo' / 'safekeepers' / 'sk1' / str(tenant_id) / '*'),
str(data_dir),
str(port)
])
restored.start()

View File

@@ -1,13 +1,13 @@
import os
from pathlib import Path
import pytest
from fixtures.utils import mkdir_if_needed
from fixtures.neon_fixtures import NeonEnv, base_dir, pg_distrib_dir
# The isolation tests run for a long time, especially in debug mode,
# so use a larger-than-default timeout.
@pytest.mark.timeout(1800)
def test_isolation(neon_simple_env: NeonEnv, test_output_dir, pg_bin, capsys):
def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_isolation", "empty")
@@ -17,9 +17,8 @@ def test_isolation(neon_simple_env: NeonEnv, test_output_dir, pg_bin, capsys):
pg.safe_psql('CREATE DATABASE isolation_regression')
# Create some local directories for pg_isolation_regress to run in.
runpath = os.path.join(test_output_dir, 'regress')
mkdir_if_needed(runpath)
mkdir_if_needed(os.path.join(runpath, 'testtablespace'))
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Compute all the file locations that pg_isolation_regress will need.
build_path = os.path.join(pg_distrib_dir, 'build/src/test/isolation')

View File

@@ -1,6 +1,6 @@
import os
from pathlib import Path
from fixtures.utils import mkdir_if_needed
from fixtures.neon_fixtures import (NeonEnv,
check_restored_datadir_content,
base_dir,
@@ -8,7 +8,7 @@ from fixtures.neon_fixtures import (NeonEnv,
from fixtures.log_helper import log
def test_neon_regress(neon_simple_env: NeonEnv, test_output_dir, pg_bin, capsys):
def test_neon_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_neon_regress", "empty")
@@ -17,9 +17,8 @@ def test_neon_regress(neon_simple_env: NeonEnv, test_output_dir, pg_bin, capsys)
pg.safe_psql('CREATE DATABASE regression')
# Create some local directories for pg_regress to run in.
runpath = os.path.join(test_output_dir, 'regress')
mkdir_if_needed(runpath)
mkdir_if_needed(os.path.join(runpath, 'testtablespace'))
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Compute all the file locations that pg_regress will need.
# This test runs neon specific tests

View File

@@ -1,13 +1,13 @@
import os
import pathlib
import pytest
from fixtures.utils import mkdir_if_needed
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content, base_dir, pg_distrib_dir
# The pg_regress tests run for a long time, especially in debug mode,
# so use a larger-than-default timeout.
@pytest.mark.timeout(1800)
def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: str, pg_bin, capsys):
def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: pathlib.Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_pg_regress", "empty")
@@ -16,9 +16,8 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: str, pg_bin, caps
pg.safe_psql('CREATE DATABASE regression')
# Create some local directories for pg_regress to run in.
runpath = os.path.join(test_output_dir, 'regress')
mkdir_if_needed(runpath)
mkdir_if_needed(os.path.join(runpath, 'testtablespace'))
runpath = test_output_dir / 'regress'
(runpath / 'testtablespace').mkdir(parents=True)
# Compute all the file locations that pg_regress will need.
build_path = os.path.join(pg_distrib_dir, 'build/src/test/regress')
@@ -51,7 +50,7 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: str, pg_bin, caps
# checkpoint one more time to ensure that the lsn we get is the latest one
pg.safe_psql('CHECKPOINT')
lsn = pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0]
pg.safe_psql('select pg_current_wal_insert_lsn()')[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, pg)

View File

@@ -35,12 +35,7 @@ from typing_extensions import Literal
import requests
import backoff # type: ignore
from .utils import (etcd_path,
get_self_dir,
mkdir_if_needed,
subprocess_capture,
lsn_from_hex,
lsn_to_hex)
from .utils import (etcd_path, get_self_dir, subprocess_capture, lsn_from_hex, lsn_to_hex)
from fixtures.log_helper import log
"""
This file contains pytest fixtures. A fixture is a test resource that can be
@@ -127,7 +122,7 @@ def pytest_configure(config):
top_output_dir = env_test_output
else:
top_output_dir = os.path.join(base_dir, DEFAULT_OUTPUT_DIR)
mkdir_if_needed(top_output_dir)
pathlib.Path(top_output_dir).mkdir(exist_ok=True)
# Find the postgres installation.
global pg_distrib_dir
@@ -1316,7 +1311,7 @@ def append_pageserver_param_overrides(
class PgBin:
""" A helper class for executing postgres binaries """
def __init__(self, log_dir: str):
def __init__(self, log_dir: Path):
self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), 'bin')
self.env = os.environ.copy()
@@ -1367,22 +1362,27 @@ class PgBin:
self._fixpath(command)
log.info('Running command "{}"'.format(' '.join(command)))
env = self._build_env(env)
return subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs)
return subprocess_capture(str(self.log_dir),
command,
env=env,
cwd=cwd,
check=True,
**kwargs)
@pytest.fixture(scope='function')
def pg_bin(test_output_dir: str) -> PgBin:
def pg_bin(test_output_dir: Path) -> PgBin:
return PgBin(test_output_dir)
class VanillaPostgres(PgProtocol):
def __init__(self, pgdatadir: str, pg_bin: PgBin, port: int, init=True):
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init=True):
super().__init__(host='localhost', port=port, dbname='postgres')
self.pgdatadir = pgdatadir
self.pg_bin = pg_bin
self.running = False
if init:
self.pg_bin.run_capture(['initdb', '-D', pgdatadir])
self.pg_bin.run_capture(['initdb', '-D', str(pgdatadir)])
self.configure([f"port = {port}\n"])
def configure(self, options: List[str]):
@@ -1398,12 +1398,13 @@ class VanillaPostgres(PgProtocol):
if log_path is None:
log_path = os.path.join(self.pgdatadir, "pg.log")
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, '-l', log_path, 'start'])
self.pg_bin.run_capture(
['pg_ctl', '-w', '-D', str(self.pgdatadir), '-l', log_path, 'start'])
def stop(self):
assert self.running
self.running = False
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', self.pgdatadir, 'stop'])
self.pg_bin.run_capture(['pg_ctl', '-w', '-D', str(self.pgdatadir), 'stop'])
def get_subdir_size(self, subdir) -> int:
"""Return size of pgdatadir subdirectory in bytes."""
@@ -1418,9 +1419,9 @@ class VanillaPostgres(PgProtocol):
@pytest.fixture(scope='function')
def vanilla_pg(test_output_dir: str,
def vanilla_pg(test_output_dir: Path,
port_distributor: PortDistributor) -> Iterator[VanillaPostgres]:
pgdatadir = os.path.join(test_output_dir, "pgdata-vanilla")
pgdatadir = test_output_dir / "pgdata-vanilla"
pg_bin = PgBin(test_output_dir)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
@@ -1457,7 +1458,7 @@ class RemotePostgres(PgProtocol):
@pytest.fixture(scope='function')
def remote_pg(test_output_dir: str) -> Iterator[RemotePostgres]:
def remote_pg(test_output_dir: Path) -> Iterator[RemotePostgres]:
pg_bin = PgBin(test_output_dir)
connstr = os.getenv("BENCHMARK_CONNSTR")
@@ -1924,9 +1925,12 @@ class Etcd:
datadir: str
port: int
peer_port: int
binary_path: Path = etcd_path()
binary_path: Path = field(init=False)
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
def __post_init__(self):
self.binary_path = etcd_path()
def client_url(self):
return f'http://127.0.0.1:{self.port}'
@@ -1980,11 +1984,13 @@ class Etcd:
self.handle.wait()
def get_test_output_dir(request: Any) -> str:
def get_test_output_dir(request: Any) -> pathlib.Path:
""" Compute the working directory for an individual test. """
test_name = request.node.name
test_dir = os.path.join(str(top_output_dir), test_name)
test_dir = pathlib.Path(top_output_dir) / test_name.replace("/", "-")
log.info(f'get_test_output_dir is {test_dir}')
# make mypy happy
assert isinstance(test_dir, pathlib.Path)
return test_dir
@@ -1998,14 +2004,14 @@ def get_test_output_dir(request: Any) -> str:
# this fixture ensures that the directory exists. That works because
# 'autouse' fixtures are run before other fixtures.
@pytest.fixture(scope='function', autouse=True)
def test_output_dir(request: Any) -> str:
def test_output_dir(request: Any) -> pathlib.Path:
""" Create the working directory for an individual test. """
# one directory per test
test_dir = get_test_output_dir(request)
log.info(f'test_output_dir is {test_dir}')
shutil.rmtree(test_dir, ignore_errors=True)
mkdir_if_needed(test_dir)
test_dir.mkdir()
return test_dir
@@ -2051,7 +2057,7 @@ def should_skip_file(filename: str) -> bool:
#
# Test helpers
#
def list_files_to_compare(pgdata_dir: str):
def list_files_to_compare(pgdata_dir: pathlib.Path):
pgdata_files = []
for root, _file, filenames in os.walk(pgdata_dir):
for filename in filenames:
@@ -2068,7 +2074,7 @@ def list_files_to_compare(pgdata_dir: str):
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(test_output_dir: str, env: NeonEnv, pg: Postgres):
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Postgres):
# Get the timeline ID. We need it for the 'basebackup' command
with closing(pg.connect()) as conn:
@@ -2080,8 +2086,8 @@ def check_restored_datadir_content(test_output_dir: str, env: NeonEnv, pg: Postg
pg.stop()
# Take a basebackup from pageserver
restored_dir_path = os.path.join(env.repo_dir, f"{pg.node_name}_restored_datadir")
mkdir_if_needed(restored_dir_path)
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir)
psql_path = os.path.join(pg_bin.pg_bin_path, 'psql')
@@ -2108,7 +2114,7 @@ def check_restored_datadir_content(test_output_dir: str, env: NeonEnv, pg: Postg
# list files we're going to compare
assert pg.pgdata_dir
pgdata_files = list_files_to_compare(pg.pgdata_dir)
pgdata_files = list_files_to_compare(pathlib.Path(pg.pgdata_dir))
restored_files = list_files_to_compare(restored_dir_path)
# check that file sets are equal
@@ -2140,7 +2146,7 @@ def check_restored_datadir_content(test_output_dir: str, env: NeonEnv, pg: Postg
assert (mismatch, error) == ([], [])
def wait_until(number_of_iterations: int, interval: int, func):
def wait_until(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns successfully, without exception. Returns the last return value
from the the function.

View File

@@ -12,18 +12,6 @@ def get_self_dir() -> str:
return os.path.dirname(os.path.abspath(__file__))
def mkdir_if_needed(path: str) -> None:
""" Create a directory if it doesn't already exist
Note this won't try to create intermediate directories.
"""
try:
os.mkdir(path)
except FileExistsError:
pass
assert os.path.isdir(path)
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
""" Run a process and capture its output

View File

@@ -0,0 +1,2 @@
bin/
obj/

View File

@@ -0,0 +1,2 @@
bin/
obj/

View File

@@ -0,0 +1,14 @@
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
WORKDIR /source
COPY *.csproj .
RUN dotnet restore
COPY . .
RUN dotnet publish -c release -o /app --no-restore
FROM mcr.microsoft.com/dotnet/runtime:6.0
WORKDIR /app
COPY --from=build /app .
ENTRYPOINT ["dotnet", "csharp-npgsql.dll"]

View File

@@ -0,0 +1,19 @@
using Npgsql;
var host = Environment.GetEnvironmentVariable("NEON_HOST");
var database = Environment.GetEnvironmentVariable("NEON_DATABASE");
var user = Environment.GetEnvironmentVariable("NEON_USER");
var password = Environment.GetEnvironmentVariable("NEON_PASSWORD");
var connString = $"Host={host};Username={user};Password={password};Database={database}";
await using var conn = new NpgsqlConnection(connString);
await conn.OpenAsync();
await using (var cmd = new NpgsqlCommand("SELECT 1", conn))
await using (var reader = await cmd.ExecuteReaderAsync())
{
while (await reader.ReadAsync())
Console.WriteLine(reader.GetInt32(0));
}
await conn.CloseAsync();

View File

@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Npgsql" Version="6.0.5" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,10 @@
FROM openjdk:17
WORKDIR /source
COPY . .
WORKDIR /app
RUN curl --output postgresql.jar https://jdbc.postgresql.org/download/postgresql-42.4.0.jar && \
javac -d /app /source/Example.java
CMD ["java", "-cp", "/app/postgresql.jar:.", "Example"]

View File

@@ -0,0 +1,31 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;
public class Example
{
public static void main( String[] args ) throws Exception
{
String host = System.getenv("NEON_HOST");
String database = System.getenv("NEON_DATABASE");
String user = System.getenv("NEON_USER");
String password = System.getenv("NEON_PASSWORD");
String url = "jdbc:postgresql://%s/%s".formatted(host, database);
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);
Connection conn = DriverManager.getConnection(url, props);
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery("SELECT 1");
while (rs.next())
{
System.out.println(rs.getString(1));
}
rs.close();
st.close();
}
}

View File

@@ -0,0 +1,8 @@
FROM python:3.10
WORKDIR /source
COPY . .
RUN python3 -m pip install --no-cache-dir -r requirements.txt
CMD ["python3", "asyncpg_example.py"]

View File

@@ -0,0 +1,30 @@
#! /usr/bin/env python3
import asyncio
import os
import asyncpg
async def run(**kwargs) -> asyncpg.Record:
conn = await asyncpg.connect(
**kwargs,
statement_cache_size=0, # Prepared statements doesn't work pgbouncer
)
rv = await conn.fetchrow("SELECT 1")
await conn.close()
return rv
if __name__ == "__main__":
kwargs = {
k.lstrip("NEON_").lower(): v
for k in ("NEON_HOST", "NEON_DATABASE", "NEON_USER", "NEON_PASSWORD")
if (v := os.environ.get(k, None)) is not None
}
loop = asyncio.new_event_loop()
row = loop.run_until_complete(run(**kwargs))
print(row[0])

View File

@@ -0,0 +1 @@
asyncpg==0.25.0

View File

@@ -0,0 +1,8 @@
FROM python:3.10
WORKDIR /source
COPY . .
RUN python3 -m pip install --no-cache-dir -r requirements.txt
CMD ["python3", "pg8000_example.py"]

View File

@@ -0,0 +1,23 @@
#! /usr/bin/env python3
import os
import ssl
import pg8000.dbapi
if __name__ == "__main__":
kwargs = {
k.lstrip("NEON_").lower(): v
for k in ("NEON_HOST", "NEON_DATABASE", "NEON_USER", "NEON_PASSWORD")
if (v := os.environ.get(k, None)) is not None
}
conn = pg8000.dbapi.connect(
**kwargs,
ssl_context=True,
)
cursor = conn.cursor()
cursor.execute("SELECT 1")
row = cursor.fetchone()
print(row[0])
conn.close()

View File

@@ -0,0 +1 @@
pg8000==1.29.1

View File

@@ -0,0 +1 @@
.build/

View File

@@ -0,0 +1 @@
.build/

View File

@@ -0,0 +1,11 @@
FROM swift:5.6 AS build
RUN apt-get -q update && apt-get -q install -y libssl-dev
WORKDIR /source
COPY . .
RUN swift build --configuration release
FROM swift:5.6
WORKDIR /app
COPY --from=build /source/.build/release/release .
CMD ["/app/PostgresClientKitExample"]

View File

@@ -0,0 +1,41 @@
{
"pins" : [
{
"identity" : "bluesocket",
"kind" : "remoteSourceControl",
"location" : "https://github.com/IBM-Swift/BlueSocket.git",
"state" : {
"revision" : "dd924c3bc2c1c144c42b8dda3896f1a03115ded4",
"version" : "2.0.2"
}
},
{
"identity" : "bluesslservice",
"kind" : "remoteSourceControl",
"location" : "https://github.com/IBM-Swift/BlueSSLService",
"state" : {
"revision" : "c249988fb748749739144e7f554710552acdc0bd",
"version" : "2.0.1"
}
},
{
"identity" : "postgresclientkit",
"kind" : "remoteSourceControl",
"location" : "https://github.com/codewinsdotcom/PostgresClientKit.git",
"state" : {
"branch" : "v1.4.3",
"revision" : "beafedaea6dc9f04712e9a8547b77f47c406a47e"
}
},
{
"identity" : "swift-argument-parser",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-argument-parser",
"state" : {
"revision" : "6b2aa2748a7881eebb9f84fb10c01293e15b52ca",
"version" : "0.5.0"
}
}
],
"version" : 2
}

View File

@@ -0,0 +1,17 @@
// swift-tools-version:5.6
import PackageDescription
let package = Package(
name: "PostgresClientKitExample",
dependencies: [
.package(
url: "https://github.com/codewinsdotcom/PostgresClientKit.git",
revision: "v1.4.3"
)
],
targets: [
.target(
name: "PostgresClientKitExample",
dependencies: [ "PostgresClientKit" ])
]
)

View File

@@ -0,0 +1,38 @@
import Foundation
import PostgresClientKit
do {
var configuration = PostgresClientKit.ConnectionConfiguration()
let env = ProcessInfo.processInfo.environment
if let host = env["NEON_HOST"] {
configuration.host = host
}
if let database = env["NEON_DATABASE"] {
configuration.database = database
}
if let user = env["NEON_USER"] {
configuration.user = user
}
if let password = env["NEON_PASSWORD"] {
configuration.credential = .scramSHA256(password: password)
}
let connection = try PostgresClientKit.Connection(configuration: configuration)
defer { connection.close() }
let text = "SELECT 1;"
let statement = try connection.prepareStatement(text: text)
defer { statement.close() }
let cursor = try statement.execute(parameterValues: [ ])
defer { cursor.close() }
for row in cursor {
let columns = try row.get().columns
print(columns[0])
}
} catch {
print(error)
}

View File

@@ -0,0 +1,54 @@
import os
import shutil
import subprocess
from pathlib import Path
from tempfile import NamedTemporaryFile
from urllib.parse import urlparse
import pytest
from fixtures.neon_fixtures import RemotePostgres
@pytest.mark.remote_cluster
@pytest.mark.parametrize(
"client",
[
"csharp/npgsql",
"java/jdbc",
"python/asyncpg",
pytest.param(
"python/pg8000", # See https://github.com/neondatabase/neon/pull/2008#discussion_r912264281
marks=pytest.mark.xfail(reason="Handles SSL in incompatible with Neon way")),
pytest.param(
"swift/PostgresClientKit", # See https://github.com/neondatabase/neon/pull/2008#discussion_r911896592
marks=pytest.mark.xfail(reason="Neither SNI nor parameters is supported")),
"typescript/postgresql-client",
],
)
def test_pg_clients(remote_pg: RemotePostgres, client: str):
conn_options = remote_pg.conn_options()
env_file = None
with NamedTemporaryFile(mode="w", delete=False) as f:
env_file = f.name
f.write(f"""
NEON_HOST={conn_options["host"]}
NEON_DATABASE={conn_options["dbname"]}
NEON_USER={conn_options["user"]}
NEON_PASSWORD={conn_options["password"]}
""")
image_tag = client.lower()
docker_bin = shutil.which("docker")
if docker_bin is None:
raise RuntimeError("docker is required for running this test")
build_cmd = [
docker_bin, "build", "--quiet", "--tag", image_tag, f"{Path(__file__).parent / client}"
]
run_cmd = [docker_bin, "run", "--rm", "--env-file", env_file, image_tag]
subprocess.run(build_cmd, check=True)
result = subprocess.run(run_cmd, check=True, capture_output=True, text=True)
assert result.stdout.strip() == "1"

View File

@@ -0,0 +1 @@
node_modules/

View File

@@ -0,0 +1 @@
node_modules/

View File

@@ -0,0 +1,7 @@
FROM node:16
WORKDIR /source
COPY . .
RUN npm clean-install
CMD ["/source/index.js"]

View File

@@ -0,0 +1,25 @@
#! /usr/bin/env node
import {Connection} from 'postgresql-client';
const params = {
"host": process.env.NEON_HOST,
"database": process.env.NEON_DATABASE,
"user": process.env.NEON_USER,
"password": process.env.NEON_PASSWORD,
"ssl": true,
}
for (const key in params) {
if (params[key] === undefined) {
delete params[key];
}
}
const connection = new Connection(params);
await connection.connect();
const result = await connection.query(
'select 1'
);
const rows = result.rows;
await connection.close();
console.log(rows[0][0]);

View File

@@ -0,0 +1,262 @@
{
"name": "typescript",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"dependencies": {
"postgresql-client": "^2.1.3"
}
},
"node_modules/debug": {
"version": "4.3.4",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz",
"integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==",
"dependencies": {
"ms": "2.1.2"
},
"engines": {
"node": ">=6.0"
},
"peerDependenciesMeta": {
"supports-color": {
"optional": true
}
}
},
"node_modules/doublylinked": {
"version": "2.5.1",
"resolved": "https://registry.npmjs.org/doublylinked/-/doublylinked-2.5.1.tgz",
"integrity": "sha512-Lpqb+qyHpR5Bew8xfKsxVYdjXEYAQ7HLp1IX47kHKmVCZeXErInytonjkL+kE+L4yaKSYEmDNR9MJYr5zwuAKA==",
"engines": {
"node": ">= 10.0"
}
},
"node_modules/lightning-pool": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/lightning-pool/-/lightning-pool-3.1.3.tgz",
"integrity": "sha512-OgWuoh0BBrikWx/mc/XwIKwC9HHTe/GU3XODLMBPibv7jv8u0o2gQFS7KVEg5U8Oufg6N7mkm8Y1RoiLER0zeQ==",
"dependencies": {
"doublylinked": "^2.4.3",
"putil-promisify": "^1.8.2"
},
"engines": {
"node": ">= 10.0"
}
},
"node_modules/ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"node_modules/obuf": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
"integrity": "sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg=="
},
"node_modules/postgres-bytea": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-3.0.0.tgz",
"integrity": "sha512-CNd4jim9RFPkObHSjVHlVrxoVQXz7quwNFpz7RY1okNNme49+sVyiTvTRobiLV548Hx/hb1BG+iE7h9493WzFw==",
"dependencies": {
"obuf": "~1.1.2"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/postgresql-client": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.1.3.tgz",
"integrity": "sha512-36Ga6JzhydsRzcCRcA/Y2hrX9C9sI0wS6sgRNBlOGkOwACXQVybmhDM7mAUbi9cT00N39Ee7btR0eMCyD//5Xg==",
"dependencies": {
"debug": "^4.3.4",
"doublylinked": "^2.5.1",
"lightning-pool": "^3.1.3",
"postgres-bytea": "^3.0.0",
"power-tasks": "^0.8.0",
"putil-merge": "^3.8.0",
"putil-promisify": "^1.8.5",
"putil-varhelpers": "^1.6.4"
},
"engines": {
"node": ">=14.0",
"npm": ">=7.0.0"
}
},
"node_modules/power-tasks": {
"version": "0.8.0",
"resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-0.8.0.tgz",
"integrity": "sha512-HhMcx+y5UkzlEmKslruz8uAU2Yq8CODJsFEMFsYMrGp5EzKpkNHGu0RNvBqyewKJDZHPNKtBSULsEAxMqQIBVQ==",
"dependencies": {
"debug": "^4.3.4",
"doublylinked": "^2.5.1",
"strict-typed-events": "^2.2.0"
},
"engines": {
"node": ">=14.0",
"npm": ">=7.0.0"
}
},
"node_modules/putil-merge": {
"version": "3.8.0",
"resolved": "https://registry.npmjs.org/putil-merge/-/putil-merge-3.8.0.tgz",
"integrity": "sha512-5tXPafJawWFoYZWLhkYXZ7IC/qkI45HgJsgv36lJBeq3qjFZfUITZE01CmWUBIlIn9f1yDiikqgYERARhVmgrg==",
"engines": {
"node": ">= 10.0"
}
},
"node_modules/putil-promisify": {
"version": "1.8.5",
"resolved": "https://registry.npmjs.org/putil-promisify/-/putil-promisify-1.8.5.tgz",
"integrity": "sha512-DItclasWWZokvpq3Aiaq0iV7WC8isP/0o/8mhC0yV6CQ781N/7NQHA1VyOm6hfpeFEwIQoo1C4Yjc5eH0q6Jbw==",
"engines": {
"node": ">= 6.0"
}
},
"node_modules/putil-varhelpers": {
"version": "1.6.4",
"resolved": "https://registry.npmjs.org/putil-varhelpers/-/putil-varhelpers-1.6.4.tgz",
"integrity": "sha512-nM2nO1HS2yJUyPgz0grd2XZAM0Spr6/tt6F4xXeNDjByV00BV2mq6lZ+sDff8WIfQBI9Hn1Czh93H1xBvKESxw==",
"engines": {
"node": ">= 6.0"
}
},
"node_modules/strict-typed-events": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/strict-typed-events/-/strict-typed-events-2.2.0.tgz",
"integrity": "sha512-yvHRtEfRRV7TJWi9cLhMt4Mb12JtAwXXONltUlLCA3fRB0LRy94B4E4e2gIlXzT5nZHTZVpOjJNOshri3LZ5bw==",
"dependencies": {
"putil-promisify": "^1.8.5",
"ts-gems": "^2.0.0"
},
"engines": {
"node": ">=14.0"
}
},
"node_modules/ts-gems": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.1.0.tgz",
"integrity": "sha512-5IqiG4nq1tsOhYPc4CwxA6bsE+TfU6uAABzf6bH4sdElgXpt/mlStvIYedvvtU7BM1+RRJxCaTLaaVFcCqNaiA==",
"peerDependencies": {
"typescript": ">=4.0.0"
}
},
"node_modules/typescript": {
"version": "4.7.4",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz",
"integrity": "sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==",
"peer": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
},
"engines": {
"node": ">=4.2.0"
}
}
},
"dependencies": {
"debug": {
"version": "4.3.4",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz",
"integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==",
"requires": {
"ms": "2.1.2"
}
},
"doublylinked": {
"version": "2.5.1",
"resolved": "https://registry.npmjs.org/doublylinked/-/doublylinked-2.5.1.tgz",
"integrity": "sha512-Lpqb+qyHpR5Bew8xfKsxVYdjXEYAQ7HLp1IX47kHKmVCZeXErInytonjkL+kE+L4yaKSYEmDNR9MJYr5zwuAKA=="
},
"lightning-pool": {
"version": "3.1.3",
"resolved": "https://registry.npmjs.org/lightning-pool/-/lightning-pool-3.1.3.tgz",
"integrity": "sha512-OgWuoh0BBrikWx/mc/XwIKwC9HHTe/GU3XODLMBPibv7jv8u0o2gQFS7KVEg5U8Oufg6N7mkm8Y1RoiLER0zeQ==",
"requires": {
"doublylinked": "^2.4.3",
"putil-promisify": "^1.8.2"
}
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
},
"obuf": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
"integrity": "sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg=="
},
"postgres-bytea": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-3.0.0.tgz",
"integrity": "sha512-CNd4jim9RFPkObHSjVHlVrxoVQXz7quwNFpz7RY1okNNme49+sVyiTvTRobiLV548Hx/hb1BG+iE7h9493WzFw==",
"requires": {
"obuf": "~1.1.2"
}
},
"postgresql-client": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.1.3.tgz",
"integrity": "sha512-36Ga6JzhydsRzcCRcA/Y2hrX9C9sI0wS6sgRNBlOGkOwACXQVybmhDM7mAUbi9cT00N39Ee7btR0eMCyD//5Xg==",
"requires": {
"debug": "^4.3.4",
"doublylinked": "^2.5.1",
"lightning-pool": "^3.1.3",
"postgres-bytea": "^3.0.0",
"power-tasks": "^0.8.0",
"putil-merge": "^3.8.0",
"putil-promisify": "^1.8.5",
"putil-varhelpers": "^1.6.4"
}
},
"power-tasks": {
"version": "0.8.0",
"resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-0.8.0.tgz",
"integrity": "sha512-HhMcx+y5UkzlEmKslruz8uAU2Yq8CODJsFEMFsYMrGp5EzKpkNHGu0RNvBqyewKJDZHPNKtBSULsEAxMqQIBVQ==",
"requires": {
"debug": "^4.3.4",
"doublylinked": "^2.5.1",
"strict-typed-events": "^2.2.0"
}
},
"putil-merge": {
"version": "3.8.0",
"resolved": "https://registry.npmjs.org/putil-merge/-/putil-merge-3.8.0.tgz",
"integrity": "sha512-5tXPafJawWFoYZWLhkYXZ7IC/qkI45HgJsgv36lJBeq3qjFZfUITZE01CmWUBIlIn9f1yDiikqgYERARhVmgrg=="
},
"putil-promisify": {
"version": "1.8.5",
"resolved": "https://registry.npmjs.org/putil-promisify/-/putil-promisify-1.8.5.tgz",
"integrity": "sha512-DItclasWWZokvpq3Aiaq0iV7WC8isP/0o/8mhC0yV6CQ781N/7NQHA1VyOm6hfpeFEwIQoo1C4Yjc5eH0q6Jbw=="
},
"putil-varhelpers": {
"version": "1.6.4",
"resolved": "https://registry.npmjs.org/putil-varhelpers/-/putil-varhelpers-1.6.4.tgz",
"integrity": "sha512-nM2nO1HS2yJUyPgz0grd2XZAM0Spr6/tt6F4xXeNDjByV00BV2mq6lZ+sDff8WIfQBI9Hn1Czh93H1xBvKESxw=="
},
"strict-typed-events": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/strict-typed-events/-/strict-typed-events-2.2.0.tgz",
"integrity": "sha512-yvHRtEfRRV7TJWi9cLhMt4Mb12JtAwXXONltUlLCA3fRB0LRy94B4E4e2gIlXzT5nZHTZVpOjJNOshri3LZ5bw==",
"requires": {
"putil-promisify": "^1.8.5",
"ts-gems": "^2.0.0"
}
},
"ts-gems": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.1.0.tgz",
"integrity": "sha512-5IqiG4nq1tsOhYPc4CwxA6bsE+TfU6uAABzf6bH4sdElgXpt/mlStvIYedvvtU7BM1+RRJxCaTLaaVFcCqNaiA==",
"requires": {}
},
"typescript": {
"version": "4.7.4",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.7.4.tgz",
"integrity": "sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==",
"peer": true
}
}
}

View File

@@ -0,0 +1,6 @@
{
"type": "module",
"dependencies": {
"postgresql-client": "^2.1.3"
}
}