Compare commits


14 Commits

Author  SHA1  Message  Date
Vadim Kharitonov  8f49af45a7  Try to fix test  2023-05-24 15:39:39 +02:00
Vadim Kharitonov  c7872d3f6d  Make clippy happy  2023-05-24 14:18:32 +02:00
Vadim Kharitonov  c82b00110e  Fix SQL over HTTP endpoint  2023-05-24 14:05:03 +02:00
Vadim Kharitonov  d425f2cb14  Fix ruff after rebase  2023-05-24 13:56:57 +02:00
Dmitry Ivanov  2b0066f67c  Minor edits  2023-05-24 12:54:27 +02:00
Dmitry Ivanov  01be823084  Tune console notifications handler  2023-05-24 12:54:27 +02:00
Dmitry Ivanov  69bb1eee18  Update workspace_hack  2023-05-24 12:54:27 +02:00
Dmitry Ivanov  0c040aca63  Fix mypy warnings  2023-05-24 12:54:25 +02:00
Dmitry Ivanov  d6c7b4d994  Fix formatting  2023-05-24 12:53:23 +02:00
Dmitry Ivanov  251e410add  Implement console notifications listener  2023-05-24 12:51:04 +02:00
Dmitry Ivanov  ce416c8160  Implement a cache for get_auth_info method  2023-05-24 12:51:04 +02:00
Dmitry Ivanov  f7e9ec49be  Add stub for auth info cache  2023-05-24 12:51:04 +02:00
Dmitry Ivanov  2b60ad0285  Test compute node cache invalidation  2023-05-24 12:51:00 +02:00
Dmitry Ivanov  9b99d4caa9  New cache impl  2023-05-24 12:45:57 +02:00
81 changed files with 2203 additions and 3333 deletions

View File

@@ -57,14 +57,14 @@ runs:
if ! which allure; then
ALLURE_ZIP=allure-${ALLURE_VERSION}.zip
wget -q https://github.com/allure-framework/allure2/releases/download/${ALLURE_VERSION}/${ALLURE_ZIP}
echo "${ALLURE_ZIP_SHA256} ${ALLURE_ZIP}" | sha256sum --check
echo "${ALLURE_ZIP_MD5} ${ALLURE_ZIP}" | md5sum -c
unzip -q ${ALLURE_ZIP}
echo "$(pwd)/allure-${ALLURE_VERSION}/bin" >> $GITHUB_PATH
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.22.1
ALLURE_ZIP_SHA256: fdc7a62d94b14c5e0bf25198ae1feded6b005fdbed864b4d3cb4e5e901720b0b
ALLURE_VERSION: 2.22.0
ALLURE_ZIP_MD5: d5c9f0989b896482536956340a7d5ec9
# Potentially we could have several running builds for the same key (for example, for the main branch), so we use an improvised lock for this
- name: Acquire lock

View File

@@ -36,6 +36,14 @@ inputs:
description: 'Region name for real s3 tests'
required: false
default: ''
real_s3_access_key_id:
description: 'Access key id'
required: false
default: ''
real_s3_secret_access_key:
description: 'Secret access key'
required: false
default: ''
rerun_flaky:
description: 'Whether to rerun flaky tests'
required: false
@@ -96,6 +104,8 @@ runs:
COMPATIBILITY_POSTGRES_DISTRIB_DIR: /tmp/neon-previous/pg_install
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: ${{ inputs.build_type }}
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')

View File

@@ -5,7 +5,6 @@ on:
branches:
- main
- release
- bayandin/code-coverage
pull_request:
defaults:
@@ -347,8 +346,10 @@ jobs:
test_selection: regress
needs_postgres_source: true
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
real_s3_bucket: ci-tests-s3
real_s3_region: us-west-2
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
rerun_flaky: true
pg_version: ${{ matrix.pg_version }}
env:
@@ -495,29 +496,19 @@ jobs:
env:
COMMIT_URL: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.event.pull_request.head.sha || github.sha }}
run: |
scripts/coverage --dir=/tmp/coverage \
report \
scripts/coverage \
--dir=/tmp/coverage report \
--input-objects=/tmp/coverage/binaries.list \
--commit-url=${COMMIT_URL} \
--format=github
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--format=lcov
scripts/coverage --dir=/tmp/coverage \
report \
--input-objects=/tmp/coverage/binaries.list \
--format=json
- name: Upload coverage report
id: upload-coverage-report
env:
BUCKET: neon-github-public-dev
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA}
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://neon-github-public-dev/code-coverage/${COMMIT_SHA}
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
@@ -672,9 +663,6 @@ jobs:
project: nrdv0s4kcs
push: true
tags: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:depot-${{needs.tag.outputs.build-tag}}
build-args: |
GIT_VERSION=${{ github.sha }}
REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
compute-tools-image:
runs-on: [ self-hosted, gen3, large ]

Cargo.lock (generated), 697 changed lines: diff suppressed because it is too large.

View File

@@ -3,7 +3,6 @@ members = [
"compute_tools",
"control_plane",
"pageserver",
"pageserver/ctl",
"proxy",
"safekeeper",
"storage_broker",
@@ -23,7 +22,7 @@ async-stream = "0.3"
async-trait = "0.1"
atty = "0.2.14"
aws-config = { version = "0.55", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.27"
aws-sdk-s3 = "0.25"
aws-smithy-http = "0.55"
aws-credential-types = "0.55"
aws-types = "0.55"
@@ -78,10 +77,12 @@ pin-project-lite = "0.2"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"
ref-cast = "1.0"
redis = { version = "0.23.0", features = ["tokio-rustls-comp"] }
regex = "1.4"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
reqwest-middleware = "0.2.0"
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_18"] }
routerify = "3"
rpds = "0.13"
rustls = "0.20"

View File

@@ -47,7 +47,8 @@ RUN set -e \
&& mold -run cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \
--bin pageserver_binutils \
--bin draw_timeline_dir \
--bin safekeeper \
--bin storage_broker \
--bin proxy \
@@ -72,7 +73,8 @@ RUN set -e \
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pagectl /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin

View File

@@ -632,7 +632,6 @@ RUN apt update && \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4-openssl-dev \
procps && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8

View File

@@ -362,8 +362,6 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note that the order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
client.simple_query("SET neon.forward_ddl = false")?;
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
@@ -405,9 +403,7 @@ impl ComputeNode {
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note that the order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;

View File

@@ -121,8 +121,9 @@ impl RoleExt for Role {
/// string of arguments.
fn to_pg_options(&self) -> String {
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
let mut params: String = self.options.as_pg_options();
params.push_str(" LOGIN");
// For now, we do not use generic `options` for roles. Once used, add
// `self.options.as_pg_options()` somewhere here.
let mut params: String = "LOGIN".to_string();
if let Some(pass) = &self.encrypted_password {
// Some time ago we supported only md5 and treated all encrypted_password as md5.

View File

@@ -62,7 +62,7 @@ fn do_control_plane_request(
}
}
/// Request spec from the control-plane by compute_id. If `NEON_CONTROL_PLANE_TOKEN`
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(
base_uri: &str,

View File

@@ -16,7 +16,7 @@ mod pg_helpers_tests {
);
assert_eq!(
spec.cluster.roles.first().unwrap().to_pg_options(),
" LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
"LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
);
}

View File

@@ -41,7 +41,7 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "15";
const DEFAULT_PG_VERSION: &str = "14";
fn default_conf() -> String {
format!(

View File

@@ -24,7 +24,7 @@ use utils::{
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 15;
pub const DEFAULT_PG_VERSION: u32 = 14;
//
// This data structure represents the neon_local CLI config

View File

@@ -234,28 +234,6 @@ impl TenantConfigRequest {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantAttachRequest {
pub config: TenantAttachConfig,
}
/// Newtype to enforce deny_unknown_fields on TenantConfig for
/// its usage inside `TenantAttachRequest`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TenantAttachConfig {
#[serde(flatten)]
allowing_unknown_fields: TenantConfig,
}
impl std::ops::Deref for TenantAttachConfig {
type Target = TenantConfig;
fn deref(&self) -> &Self::Target {
&self.allowing_unknown_fields
}
}
/// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
@@ -818,17 +796,5 @@ mod tests {
"expect unknown field `unknown_field` error, got: {}",
err
);
let attach_request = json!({
"config": {
"unknown_field": "unknown_value".to_string(),
},
});
let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
);
}
}
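
The `TenantAttachRequest` test shown above relies on serde's `deny_unknown_fields` rejecting unexpected JSON keys. A self-contained sketch of that behavior, using an illustrative struct rather than the crate's own types:

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    #[serde(deny_unknown_fields)]
    struct StrictConfig {
        // Field name picked purely for illustration.
        checkpoint_distance: Option<u64>,
    }

    fn main() {
        let err = serde_json::from_value::<StrictConfig>(
            serde_json::json!({ "unknown_field": "unknown_value" }),
        )
        .unwrap_err();
        // Same assertion style as the test above.
        assert!(err.to_string().contains("unknown field `unknown_field`"));
        println!("rejected as expected: {err}");
    }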

View File

@@ -8,26 +8,12 @@ use super::error::ApiError;
pub async fn json_request<T: for<'de> Deserialize<'de>>(
request: &mut Request<Body>,
) -> Result<T, ApiError> {
json_request_or_empty_body(request)
.await?
.context("missing request body")
.map_err(ApiError::BadRequest)
}
/// Will be removed as part of https://github.com/neondatabase/neon/issues/4282
pub async fn json_request_or_empty_body<T: for<'de> Deserialize<'de>>(
request: &mut Request<Body>,
) -> Result<Option<T>, ApiError> {
let body = hyper::body::aggregate(request.body_mut())
let whole_body = hyper::body::aggregate(request.body_mut())
.await
.context("Failed to read request body")
.map_err(ApiError::BadRequest)?;
if body.remaining() == 0 {
return Ok(None);
}
serde_json::from_reader(body.reader())
serde_json::from_reader(whole_body.reader())
.context("Failed to parse json request")
.map(Some)
.map_err(ApiError::BadRequest)
}
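
For orientation, a hedged sketch of how a handler could call the `json_request` helper whose signature appears above; the body type and module paths are assumptions chosen for illustration:

    use hyper::{Body, Request};
    use serde::Deserialize;
    use utils::http::{error::ApiError, json::json_request};

    // Body type invented for the example; any `Deserialize` type works.
    #[derive(Deserialize)]
    struct CreateTimelineBody {
        pg_version: Option<u32>,
    }

    // Handler-shaped helper: parse a typed JSON body or fail with a BadRequest-style ApiError.
    async fn parse_body(mut request: Request<Body>) -> Result<CreateTimelineBody, ApiError> {
        let body: CreateTimelineBody = json_request(&mut request).await?;
        Ok(body)
    }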

View File

@@ -1,18 +0,0 @@
[package]
name = "pagectl"
version = "0.1.0"
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow.workspace = true
bytes.workspace = true
clap = { workspace = true, features = ["string"] }
git-version.workspace = true
pageserver = { path = ".." }
postgres_ffi.workspace = true
utils.workspace = true
svg_fmt.workspace = true
workspace_hack.workspace = true

View File

@@ -1,169 +0,0 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use clap::Subcommand;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::{page_cache, virtual_file};
use pageserver::{
repository::{Key, KEY_SIZE},
tenant::{
block_io::FileBlockReader, disk_btree::VisitDirection,
storage_layer::delta_layer::DELTA_KEY_SIZE,
},
virtual_file::VirtualFile,
};
use std::fs;
use utils::bin_ser::BeSer;
use crate::layer_map_analyzer::parse_filename;
#[derive(Subcommand)]
pub(crate) enum LayerCmd {
/// List all tenants and timelines under the pageserver path
///
/// Example: `cargo run --bin pagectl layer list .neon/`
List { path: PathBuf },
/// List all layers of a given tenant and timeline
///
/// Example: `cargo run --bin pagectl layer list .neon/`
ListLayer {
path: PathBuf,
tenant: String,
timeline: String,
},
/// Dump all information of a layer file
DumpLayer {
path: PathBuf,
tenant: String,
timeline: String,
/// The id from list-layer command
id: usize,
},
}
fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
use pageserver::tenant::blob_io::BlobCursor;
use pageserver::tenant::block_io::BlockReader;
let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path)?);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,
actual_summary.index_root_blk,
&file,
);
// TODO(chi): dedup w/ `delta_layer.rs` by exposing the API.
let mut all = vec![];
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|key, value_offset| {
let curr = Key::from_slice(&key[..KEY_SIZE]);
all.push((curr, BlobRef(value_offset)));
true
},
)?;
let mut cursor = BlockCursor::new(&file);
for (k, v) in all {
let value = cursor.read_blob(v.pos())?;
println!("key:{} value_len:{}", k, value.len());
}
// TODO(chi): special handling for last key?
Ok(())
}
pub(crate) fn main(cmd: &LayerCmd) -> Result<()> {
match cmd {
LayerCmd::List { path } => {
for tenant in fs::read_dir(path.join("tenants"))? {
let tenant = tenant?;
if !tenant.file_type()?.is_dir() {
continue;
}
println!("tenant {}", tenant.file_name().to_string_lossy());
for timeline in fs::read_dir(tenant.path().join("timelines"))? {
let timeline = timeline?;
if !timeline.file_type()?.is_dir() {
continue;
}
println!("- timeline {}", timeline.file_name().to_string_lossy());
}
}
}
LayerCmd::ListLayer {
path,
tenant,
timeline,
} => {
let timeline_path = path
.join("tenants")
.join(tenant)
.join("timelines")
.join(timeline);
let mut idx = 0;
for layer in fs::read_dir(timeline_path)? {
let layer = layer?;
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
{
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
idx += 1;
}
}
}
LayerCmd::DumpLayer {
path,
tenant,
timeline,
id,
} => {
let timeline_path = path
.join("tenants")
.join(tenant)
.join("timelines")
.join(timeline);
let mut idx = 0;
for layer in fs::read_dir(timeline_path)? {
let layer = layer?;
if let Some(layer_file) = parse_filename(&layer.file_name().into_string().unwrap())
{
if *id == idx {
// TODO(chi): dedup code
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
if layer_file.is_delta {
read_delta_file(layer.path())?;
} else {
anyhow::bail!("not supported yet :(");
}
break;
}
idx += 1;
}
}
}
}
Ok(())
}

View File

@@ -1,179 +0,0 @@
//! A helper tool to manage pageserver binary files.
//! Accepts a file as an argument, attempts to parse it in all possible ways,
//! and prints its interpreted contents.
//!
//! A separate `metadata` subcommand allows printing and updating the pageserver's metadata file.
mod draw_timeline_dir;
mod layer_map_analyzer;
mod layers;
use clap::{Parser, Subcommand};
use layers::LayerCmd;
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use std::path::{Path, PathBuf};
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
#[derive(Parser)]
#[command(
version = GIT_VERSION,
about = "Neon Pageserver binutils",
long_about = "Reads pageserver (and related) binary files management utility"
)]
#[command(propagate_version = true)]
struct CliOpts {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
Metadata(MetadataCmd),
PrintLayerFile(PrintLayerFileCmd),
DrawTimeline {},
AnalyzeLayerMap(AnalyzeLayerMapCmd),
#[command(subcommand)]
Layer(LayerCmd),
}
/// Read and update pageserver metadata file
#[derive(Parser)]
struct MetadataCmd {
/// Input metadata file path
metadata_path: PathBuf,
/// Replace disk consistent Lsn
disk_consistent_lsn: Option<Lsn>,
/// Replace previous record Lsn
prev_record_lsn: Option<Lsn>,
/// Replace latest gc cuttoff
latest_gc_cuttoff: Option<Lsn>,
}
#[derive(Parser)]
struct PrintLayerFileCmd {
/// Pageserver data path
path: PathBuf,
}
#[derive(Parser)]
struct AnalyzeLayerMapCmd {
/// Pageserver data path
path: PathBuf,
/// Max holes
max_holes: Option<usize>,
}
fn main() -> anyhow::Result<()> {
let cli = CliOpts::parse();
match cli.command {
Commands::Layer(cmd) => {
layers::main(&cmd)?;
}
Commands::Metadata(cmd) => {
handle_metadata(&cmd)?;
}
Commands::DrawTimeline {} => {
draw_timeline_dir::main()?;
}
Commands::AnalyzeLayerMap(cmd) => {
layer_map_analyzer::main(&cmd)?;
}
Commands::PrintLayerFile(cmd) => {
if let Err(e) = read_pg_control_file(&cmd.path) {
println!(
"Failed to read input file as a pg control one: {e:#}\n\
Attempting to read it as layer file"
);
print_layerfile(&cmd.path)?;
}
}
};
Ok(())
}
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
println!("{control_file:?}");
let control_file_initdb = Lsn(control_file.checkPoint);
println!(
"pg_initdb_lsn: {}, aligned: {}",
control_file_initdb,
control_file_initdb.align()
);
Ok(())
}
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx)
}
fn handle_metadata(
MetadataCmd {
metadata_path: path,
disk_consistent_lsn,
prev_record_lsn,
latest_gc_cuttoff,
}: &MetadataCmd,
) -> Result<(), anyhow::Error> {
let metadata_bytes = std::fs::read(path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
if let Some(disk_consistent_lsn) = disk_consistent_lsn {
meta = TimelineMetadata::new(
*disk_consistent_lsn,
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(prev_record_lsn) = prev_record_lsn {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
Some(*prev_record_lsn),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(latest_gc_cuttoff) = latest_gc_cuttoff {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
*latest_gc_cuttoff,
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if update_meta {
let metadata_bytes = meta.to_bytes()?;
std::fs::write(path, metadata_bytes)?;
}
Ok(())
}

View File

@@ -12,7 +12,7 @@
//! Example use:
//! ```
//! $ ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! $ grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//! $ grep "__" | cargo run --release --bin draw_timeline_dir > out.svg
//! $ firefox out.svg
//! ```
//!
@@ -62,7 +62,7 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
(keys, lsns)
}
pub fn main() -> Result<()> {
fn main() -> Result<()> {
// Parse layer filenames from stdin
let mut ranges: Vec<(Range<Key>, Range<Lsn>)> = vec![];
let stdin = io::stdin();

View File

@@ -6,7 +6,7 @@ use anyhow::Result;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;
use std::{fs, path::Path, str};
use std::{env, fs, path::Path, path::PathBuf, str, str::FromStr};
use pageserver::page_cache::PAGE_SZ;
use pageserver::repository::{Key, KEY_SIZE};
@@ -18,14 +18,12 @@ use pageserver::virtual_file::VirtualFile;
use utils::{bin_ser::BeSer, lsn::Lsn};
use crate::AnalyzeLayerMapCmd;
const MIN_HOLE_LENGTH: i128 = (128 * 1024 * 1024 / PAGE_SZ) as i128;
const DEFAULT_MAX_HOLES: usize = 10;
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
#[derive(PartialEq, Eq)]
pub struct Hole(Range<Key>);
struct Hole(Range<Key>);
impl Ord for Hole {
fn cmp(&self, other: &Self) -> Ordering {
@@ -41,11 +39,11 @@ impl PartialOrd for Hole {
}
}
pub(crate) struct LayerFile {
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
pub is_delta: bool,
pub holes: Vec<Hole>,
struct LayerFile {
key_range: Range<Key>,
lsn_range: Range<Lsn>,
is_delta: bool,
holes: Vec<Hole>,
}
impl LayerFile {
@@ -69,7 +67,7 @@ impl LayerFile {
}
}
pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
fn parse_filename(name: &str) -> Option<LayerFile> {
let split: Vec<&str> = name.split("__").collect();
if split.len() != 2 {
return None;
@@ -129,9 +127,18 @@ fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
Ok(holes)
}
pub(crate) fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let storage_path = &cmd.path;
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
println!("Usage: layer_map_analyzer PAGESERVER_DATA_DIR [MAX_HOLES]");
return Ok(());
}
let storage_path = PathBuf::from_str(&args[1])?;
let max_holes = if args.len() > 2 {
args[2].parse::<usize>().unwrap()
} else {
DEFAULT_MAX_HOLES
};
// Initialize virtual_file (file descriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10);

View File

@@ -9,7 +9,6 @@ use clap::{Arg, ArgAction, Command};
use fail::FailScenario;
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use remote_storage::GenericRemoteStorage;
use tracing::*;
@@ -19,7 +18,9 @@ use pageserver::{
context::{DownloadBehavior, RequestContext},
http, page_cache, page_service, task_mgr,
task_mgr::TaskKind,
task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME},
task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
},
tenant::mgr,
virtual_file,
};
@@ -275,18 +276,7 @@ fn start_pageserver(
let pageserver_listener = tcp_listener::bind(pg_addr)?;
// Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME
.block_on(async {
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
})
.with_context(|| {
format!(
"create broker client for uri={:?} keepalive_interval={:?}",
&conf.broker_endpoint, conf.broker_keepalive_interval,
)
})?;
WALRECEIVER_RUNTIME.block_on(pageserver::broker_client::init_broker_client(conf))?;
// Initialize authentication for incoming connections
let http_auth;
@@ -336,11 +326,7 @@ fn start_pageserver(
let remote_storage = create_remote_storage_client(conf)?;
// Scan the local 'tenants/' directory and start loading the tenants
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
))?;
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(conf, remote_storage.clone()))?;
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
@@ -365,7 +351,6 @@ fn start_pageserver(
conf,
launch_ts,
http_auth,
broker_client.clone(),
remote_storage,
disk_usage_eviction_state,
)?
@@ -442,7 +427,6 @@ fn start_pageserver(
async move {
page_service::libpq_listener_main(
conf,
broker_client,
pg_auth,
pageserver_listener,
conf.pg_auth_type,

View File

@@ -0,0 +1,174 @@
//! A helper tool to manage pageserver binary files.
//! Accepts a file as an argument, attempts to parse it in all possible ways,
//! and prints its interpreted contents.
//!
//! A separate `metadata` subcommand allows printing and updating the pageserver's metadata file.
use std::{
path::{Path, PathBuf},
str::FromStr,
};
use anyhow::Context;
use clap::{value_parser, Arg, Command};
use pageserver::{
context::{DownloadBehavior, RequestContext},
page_cache,
task_mgr::TaskKind,
tenant::{dump_layerfile_from_path, metadata::TimelineMetadata},
virtual_file,
};
use postgres_ffi::ControlFileData;
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
const METADATA_SUBCOMMAND: &str = "metadata";
fn main() -> anyhow::Result<()> {
let arg_matches = cli().get_matches();
match arg_matches.subcommand() {
Some((subcommand_name, subcommand_matches)) => {
let path = subcommand_matches
.get_one::<PathBuf>("metadata_path")
.context("'metadata_path' argument is missing")?
.to_path_buf();
anyhow::ensure!(
subcommand_name == METADATA_SUBCOMMAND,
"Unknown subcommand {subcommand_name}"
);
handle_metadata(&path, subcommand_matches)?;
}
None => {
let path = arg_matches
.get_one::<PathBuf>("path")
.context("'path' argument is missing")?
.to_path_buf();
println!(
"No subcommand specified, attempting to guess the format for file {}",
path.display()
);
if let Err(e) = read_pg_control_file(&path) {
println!(
"Failed to read input file as a pg control one: {e:#}\n\
Attempting to read it as layer file"
);
print_layerfile(&path)?;
}
}
};
Ok(())
}
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
println!("{control_file:?}");
let control_file_initdb = Lsn(control_file.checkPoint);
println!(
"pg_initdb_lsn: {}, aligned: {}",
control_file_initdb,
control_file_initdb.align()
);
Ok(())
}
fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx)
}
fn handle_metadata(path: &Path, arg_matches: &clap::ArgMatches) -> Result<(), anyhow::Error> {
let metadata_bytes = std::fs::read(path)?;
let mut meta = TimelineMetadata::from_bytes(&metadata_bytes)?;
println!("Current metadata:\n{meta:?}");
let mut update_meta = false;
if let Some(disk_consistent_lsn) = arg_matches.get_one::<String>("disk_consistent_lsn") {
meta = TimelineMetadata::new(
Lsn::from_str(disk_consistent_lsn)?,
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(prev_record_lsn) = arg_matches.get_one::<String>("prev_record_lsn") {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
Some(Lsn::from_str(prev_record_lsn)?),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if let Some(latest_gc_cuttoff) = arg_matches.get_one::<String>("latest_gc_cuttoff") {
meta = TimelineMetadata::new(
meta.disk_consistent_lsn(),
meta.prev_record_lsn(),
meta.ancestor_timeline(),
meta.ancestor_lsn(),
Lsn::from_str(latest_gc_cuttoff)?,
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
if update_meta {
let metadata_bytes = meta.to_bytes()?;
std::fs::write(path, metadata_bytes)?;
}
Ok(())
}
fn cli() -> Command {
Command::new("Neon Pageserver binutils")
.about("Reads pageserver (and related) binary files management utility")
.version(GIT_VERSION)
.arg(
Arg::new("path")
.help("Input file path")
.value_parser(value_parser!(PathBuf))
.required(false),
)
.subcommand(
Command::new(METADATA_SUBCOMMAND)
.about("Read and update pageserver metadata file")
.arg(
Arg::new("metadata_path")
.help("Input metadata file path")
.value_parser(value_parser!(PathBuf))
.required(false),
)
.arg(
Arg::new("disk_consistent_lsn")
.long("disk_consistent_lsn")
.help("Replace disk consistent Lsn"),
)
.arg(
Arg::new("prev_record_lsn")
.long("prev_record_lsn")
.help("Replace previous record Lsn"),
)
.arg(
Arg::new("latest_gc_cuttoff")
.long("latest_gc_cuttoff")
.help("Replace latest gc cuttoff"),
),
)
}
#[test]
fn verify_cli() {
cli().debug_assert();
}

View File

@@ -0,0 +1,48 @@
//! The broker client instance of the pageserver, created during pageserver startup.
//! Used by each timelines' [`walreceiver`].
use crate::config::PageServerConf;
use anyhow::Context;
use once_cell::sync::OnceCell;
use storage_broker::BrokerClientChannel;
use tracing::*;
static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let broker_endpoint = conf.broker_endpoint.clone();
// Note: we do not attempt connecting here (but validate endpoints sanity).
let broker_client =
storage_broker::connect(broker_endpoint.clone(), conf.broker_keepalive_interval).context(
format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
),
)?;
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
}
info!(
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(())
}
///
/// Get a handle to the broker client
///
pub fn get_broker_client() -> &'static BrokerClientChannel {
BROKER_CLIENT.get().expect("broker client not initialized")
}
pub fn is_broker_client_initialized() -> bool {
BROKER_CLIENT.get().is_some()
}
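
The module above is built on the `once_cell` write-once, read-many pattern. A minimal standalone sketch of that pattern with generic names (not the pageserver's actual types):

    use once_cell::sync::OnceCell;

    static GLOBAL: OnceCell<String> = OnceCell::new();

    fn init_global(value: String) {
        // `set` returns Err if a value is already stored; panicking mirrors the
        // "broker already initialized" behavior in the module above.
        if GLOBAL.set(value).is_err() {
            panic!("already initialized");
        }
    }

    fn get_global() -> &'static String {
        GLOBAL.get().expect("not initialized")
    }

    fn main() {
        // Placeholder endpoint string, used only to exercise the pattern.
        init_global("grpc://broker.example:50051".to_string());
        println!("{}", get_global());
    }

The pageserver code applies the same idea to a `BrokerClientChannel` instead of a `String`, initializing it once at startup and handing out a shared reference afterwards.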

View File

@@ -363,29 +363,11 @@ paths:
* MUST NOT ASSUME that the request has been lost, based on the observation
that a subsequent tenant status request returns 404. The request may
still be in flight. It must be retried.
The client SHOULD supply a `TenantConfig` for the tenant in the request body.
Settings specified in the config override the pageserver's defaults.
It is guaranteed that the config settings are applied before the pageserver
starts operating on the tenant. E.g., if the config specifies a specific
PITR interval for a tenant, then that setting will be in effect before the
pageserver starts the garbage collection loop. This enables a client to
guarantee a specific PITR setting across detach/attach cycles.
The pageserver will reject the request if it cannot parse the config, or
if there are any unknown fields in it.
If the client does not supply a config, the pageserver will use its defaults.
This behavior is deprecated: https://github.com/neondatabase/neon/issues/4282
requestBody:
required: false
content:
application/json:
schema:
$ref: "#/components/schemas/TenantAttachRequest"
responses:
"202":
description: Tenant attaching scheduled
"400":
description: Error when no tenant id found in path parameters
content:
application/json:
schema:
@@ -940,13 +922,6 @@ components:
new_tenant_id:
type: string
format: hex
TenantAttachRequest:
type: object
required:
- config
properties:
config:
$ref: '#/components/schemas/TenantConfig'
TenantConfigRequest:
allOf:
- $ref: '#/components/schemas/TenantConfig'
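
To make the attach-request description above concrete, a hedged sketch of a request body with the `config` wrapper; the setting names inside `config` are assumptions chosen only to show the shape:

    fn main() {
        // Shape per the TenantAttachRequest schema above: one required `config` object
        // whose settings override the pageserver defaults when the tenant is attached.
        // The two keys below are assumed setting names, used only for illustration.
        let attach_body = serde_json::json!({
            "config": {
                "pitr_interval": "7 days",
                "checkpoint_distance": 268435456u64
            }
        });
        println!("{}", serde_json::to_string_pretty(&attach_body).unwrap());
    }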

View File

@@ -5,14 +5,12 @@ use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::{DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest};
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::endpoint::RequestSpan;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use super::models::{
@@ -25,9 +23,7 @@ use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
};
use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
@@ -54,7 +50,6 @@ struct State {
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
}
@@ -63,7 +58,6 @@ impl State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
@@ -75,7 +69,6 @@ impl State {
auth,
allowlist_routes,
remote_storage,
broker_client,
disk_usage_eviction_state,
})
}
@@ -146,36 +139,6 @@ impl From<TenantStateError> for ApiError {
}
}
impl From<GetTenantError> for ApiError {
fn from(tse: GetTenantError) -> ApiError {
match tse {
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
e @ GetTenantError::NotActive(_) => {
// Why is this not `ApiError::NotFound`?
// Because we must be careful to never return 404 for a tenant if it does
// in fact exist locally. If we did, the caller could draw the conclusion
// that it can attach the tenant to another PS and we'd be in split-brain.
//
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}
impl From<SetNewTenantConfigError> for ApiError {
fn from(e: SetNewTenantConfigError) -> ApiError {
match e {
SetNewTenantConfigError::GetTenant(tid) => {
ApiError::NotFound(anyhow!("tenant {}", tid))
}
e @ SetNewTenantConfigError::Persist(_) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}
impl From<crate::tenant::DeleteTimelineError> for ApiError {
fn from(value: crate::tenant::DeleteTimelineError) -> Self {
use crate::tenant::DeleteTimelineError::*;
@@ -195,7 +158,7 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
match value {
// Report Precondition failed so client can distinguish between
// "tenant is missing" case from "timeline is missing"
Tenant(GetTenantError::NotFound(..)) => {
Tenant(TenantStateError::NotFound(..)) => {
ApiError::PreconditionFailed("Requested tenant is missing")
}
Tenant(t) => ApiError::from(t),
@@ -307,8 +270,6 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
let state = get_state(&request);
async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
match tenant.create_timeline(
@@ -316,7 +277,6 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION),
state.broker_client.clone(),
&ctx,
)
.await {
@@ -426,16 +386,11 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
json_response(StatusCode::OK, result)
}
async fn tenant_attach_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
// TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let maybe_body: Option<TenantAttachRequest> = json_request_or_empty_body(&mut request).await?;
let tenant_conf = match maybe_body {
Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?,
None => TenantConfOpt::default(),
};
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
info!("Handling tenant attach {tenant_id}");
@@ -446,8 +401,9 @@ async fn tenant_attach_handler(mut request: Request<Body>) -> Result<Response<Bo
mgr::attach_tenant(
state.conf,
tenant_id,
tenant_conf,
state.broker_client.clone(),
// XXX: Attach should provide the config, especially during tenant migration.
// See https://github.com/neondatabase/neon/issues/1555
TenantConfOpt::default(),
remote_storage.clone(),
&ctx,
)
@@ -497,15 +453,9 @@ async fn tenant_load_handler(request: Request<Body>) -> Result<Response<Body>, A
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
mgr::load_tenant(
state.conf,
tenant_id,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,
)
.instrument(info_span!("load", tenant = %tenant_id))
.await?;
mgr::load_tenant(state.conf, tenant_id, state.remote_storage.clone(), &ctx)
.instrument(info_span!("load", tenant = %tenant_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
}
@@ -557,7 +507,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
}
let state = tenant.current_state();
Result::<_, ApiError>::Ok(TenantInfo {
Ok(TenantInfo {
id: tenant_id,
state: state.clone(),
current_physical_size: Some(current_physical_size),
@@ -565,7 +515,8 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
})
}
.instrument(info_span!("tenant_status_handler", tenant = %tenant_id))
.await?;
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, tenant_info)
}
@@ -789,7 +740,6 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
state.conf,
tenant_conf,
target_tenant_id,
state.broker_client.clone(),
state.remote_storage.clone(),
&ctx,
)
@@ -1145,7 +1095,6 @@ pub fn make_router(
conf: &'static PageServerConf,
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<JwtAuth>>,
broker_client: BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
@@ -1192,14 +1141,8 @@ pub fn make_router(
Ok(router
.data(Arc::new(
State::new(
conf,
auth,
remote_storage,
broker_client,
disk_usage_eviction_state,
)
.context("Failed to initialize router state")?,
State::new(conf, auth, remote_storage, disk_usage_eviction_state)
.context("Failed to initialize router state")?,
))
.get("/v1/status", |r| RequestSpan(status_handler).handle(r))
.put(

View File

@@ -1,5 +1,6 @@
mod auth;
pub mod basebackup;
pub mod broker_client;
pub mod config;
pub mod consumption_metrics;
pub mod context;
@@ -35,7 +36,7 @@ use tracing::info;
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 15;
pub const DEFAULT_PG_VERSION: u32 = 14;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;

View File

@@ -50,9 +50,7 @@ use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant;
use crate::tenant::mgr;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::{Tenant, Timeline};
use crate::trace::Tracer;
@@ -174,7 +172,6 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
///
pub async fn libpq_listener_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
@@ -216,14 +213,7 @@ pub async fn libpq_listener_main(
None,
"serving compute connection task",
false,
page_service_conn_main(
conf,
broker_client.clone(),
local_auth,
socket,
auth_type,
connection_ctx,
),
page_service_conn_main(conf, local_auth, socket, auth_type, connection_ctx),
);
}
Err(err) => {
@@ -240,7 +230,6 @@ pub async fn libpq_listener_main(
async fn page_service_conn_main(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
@@ -277,7 +266,7 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
@@ -335,7 +324,6 @@ impl PageRequestMetrics {
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
claims: Option<Claims>,
@@ -349,13 +337,11 @@ struct PageServerHandler {
impl PageServerHandler {
pub fn new(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
) -> Self {
PageServerHandler {
_conf: conf,
broker_client,
auth,
claims: None,
connection_ctx,
@@ -508,12 +494,7 @@ impl PageServerHandler {
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(
&mut copyin_reader,
base_lsn,
self.broker_client.clone(),
&ctx,
)
.import_basebackup_from_tar(&mut copyin_reader, base_lsn, &ctx)
.await?;
// Read the end of the tar archive.
@@ -1150,9 +1131,7 @@ enum GetActiveTenantError {
wait_time: Duration,
},
#[error(transparent)]
NotFound(GetTenantError),
#[error(transparent)]
WaitTenantActive(tenant::WaitToBecomeActiveError),
Other(#[from] anyhow::Error),
}
impl From<GetActiveTenantError> for QueryError {
@@ -1161,8 +1140,7 @@ impl From<GetActiveTenantError> for QueryError {
GetActiveTenantError::WaitForActiveTimeout { .. } => QueryError::Disconnected(
ConnectionError::Io(io::Error::new(io::ErrorKind::TimedOut, e.to_string())),
),
GetActiveTenantError::WaitTenantActive(e) => QueryError::Other(anyhow::Error::new(e)),
GetActiveTenantError::NotFound(e) => QueryError::Other(anyhow::Error::new(e)),
GetActiveTenantError::Other(e) => QueryError::Other(e),
}
}
}
@@ -1178,16 +1156,13 @@ async fn get_active_tenant_with_timeout(
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active=false")
}
Err(e) => return Err(GetActiveTenantError::Other(e.into())),
};
let wait_time = Duration::from_secs(30);
match tokio::time::timeout(wait_time, tenant.wait_to_become_active()).await {
Ok(Ok(())) => Ok(tenant),
// no .context(), the error message is good enough and some tests depend on it
Ok(Err(e)) => Err(GetActiveTenantError::WaitTenantActive(e)),
Ok(Err(wait_error)) => Err(GetActiveTenantError::Other(wait_error)),
Err(_) => {
let latest_state = tenant.current_state();
if latest_state == TenantState::Active {
@@ -1202,34 +1177,13 @@ async fn get_active_tenant_with_timeout(
}
}
#[derive(Debug, thiserror::Error)]
enum GetActiveTimelineError {
#[error(transparent)]
Tenant(GetActiveTenantError),
#[error(transparent)]
Timeline(anyhow::Error),
}
impl From<GetActiveTimelineError> for QueryError {
fn from(e: GetActiveTimelineError) -> Self {
match e {
GetActiveTimelineError::Tenant(e) => e.into(),
GetActiveTimelineError::Timeline(e) => QueryError::Other(e),
}
}
}
/// Shorthand for getting a reference to a Timeline of an Active tenant.
async fn get_active_tenant_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(GetActiveTimelineError::Timeline)?;
) -> Result<Arc<Timeline>, GetActiveTenantError> {
let tenant = get_active_tenant_with_timeout(tenant_id, ctx).await?;
let timeline = tenant.get_timeline(timeline_id, true)?;
Ok(timeline)
}
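
The 30-second tenant-activation wait above is a plain `tokio::time::timeout` wrapped around an awaited future. A minimal sketch of that pattern, with illustrative names:

    use std::time::Duration;
    use tokio::time::timeout;

    // Wait on `fut` for at most 30 seconds (the same budget as above).
    // `None` signals that the deadline elapsed and the caller should surface a timeout error.
    async fn wait_with_deadline<F, T>(fut: F) -> Option<T>
    where
        F: std::future::Future<Output = T>,
    {
        match timeout(Duration::from_secs(30), fut).await {
            Ok(value) => Some(value),
            Err(_elapsed) => None,
        }
    }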

View File

@@ -1600,7 +1600,9 @@ pub fn create_test_timeline(
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<std::sync::Arc<Timeline>> {
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
let tline = tenant
.create_empty_timeline(timeline_id, Lsn(8), pg_version, ctx)?
.initialize(ctx)?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;

View File

@@ -16,7 +16,6 @@ use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio::task::JoinSet;
use tracing::*;
@@ -78,7 +77,7 @@ use utils::{
lsn::{Lsn, RecordLsn},
};
pub mod blob_io;
mod blob_io;
pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
@@ -185,14 +184,24 @@ impl UninitializedTimeline<'_> {
/// Ensures timeline data is valid, loads it into pageserver's memory and removes
/// uninit mark file on success.
///
/// This function launches the flush loop if not already done.
///
/// The caller is responsible for activating the timeline (function `.activate()`).
/// The new timeline is initialized in Active state, and its background jobs are
/// started
pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(ctx, &mut timelines, true, true)
}
/// Like `initialize`, but the caller is already holding lock on Tenant::timelines.
/// If `launch_wal_receiver` is false, the WAL receiver is not launched, even though
/// timeline is initialized in Active state. This is used during tenant load and
/// attach, where the WAL receivers are launched only after all the timelines have
/// been initialized.
fn initialize_with_lock(
mut self,
_ctx: &RequestContext,
ctx: &RequestContext,
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
load_layer_map: bool,
activate: bool,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_id = self.timeline_id;
let tenant_id = self.owning_tenant.tenant_id;
@@ -228,6 +237,12 @@ impl UninitializedTimeline<'_> {
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
if activate {
new_timeline
.activate(ctx)
.context("initializing timeline activation")?;
}
}
}
@@ -239,7 +254,6 @@ impl UninitializedTimeline<'_> {
self,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let raw_timeline = self.raw_timeline()?;
@@ -265,9 +279,7 @@ impl UninitializedTimeline<'_> {
// Initialize without loading the layer map. We started with an empty layer map, and already
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
tl.activate(broker_client, ctx)?;
Ok(tl)
self.initialize_with_lock(ctx, &mut timelines, false, true)
}
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
@@ -452,34 +464,6 @@ struct RemoteStartupData {
remote_metadata: TimelineMetadata,
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitToBecomeActiveError {
WillNotBecomeActive {
tenant_id: TenantId,
state: TenantState,
},
TenantDropped {
tenant_id: TenantId,
},
}
impl std::fmt::Display for WaitToBecomeActiveError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => {
write!(
f,
"Tenant {} will not become active. Current state: {:?}",
tenant_id, state
)
}
WaitToBecomeActiveError::TenantDropped { tenant_id } => {
write!(f, "Tenant {tenant_id} will not become active (dropped)")
}
}
}
}
impl Tenant {
/// Yet another helper for timeline initialization.
/// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
@@ -535,7 +519,7 @@ impl Tenant {
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
// But we shouldn't start walreceiver before we have all the data locally, because working walreceiver
// will ingest data which may require looking at the layers which are not yet available locally
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) {
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) {
Ok(new_timeline) => new_timeline,
Err(e) => {
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
@@ -615,7 +599,6 @@ impl Tenant {
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -645,12 +628,7 @@ impl Tenant {
"attach tenant",
false,
async move {
let doit = async {
tenant_clone.attach(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
match tenant_clone.attach(ctx).await {
Ok(_) => {}
Err(e) => {
tenant_clone.set_broken(e.to_string());
@@ -658,12 +636,7 @@ impl Tenant {
}
}
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_id);
span.follows_from(Span::current());
span
}),
},
);
Ok(tenant)
}
@@ -671,9 +644,8 @@ impl Tenant {
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
async fn attach(self: &Arc<Tenant>, ctx: RequestContext) -> anyhow::Result<()> {
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
if !tokio::fs::try_exists(&marker_file)
.await
@@ -763,14 +735,20 @@ impl Tenant {
.expect("just put it in above");
// TODO again handle early failure
self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
self.load_remote_timeline(
timeline_id,
index_part,
remote_metadata,
remote_client,
&ctx,
)
.await
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_id
)
})?;
}
std::fs::remove_file(&marker_file)
@@ -780,6 +758,10 @@ impl Tenant {
utils::failpoint_sleep_millis_async!("attach-before-activate");
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(&ctx)?;
info!("Done");
Ok(())
@@ -885,7 +867,6 @@ impl Tenant {
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Arc<Tenant> {
@@ -920,12 +901,7 @@ impl Tenant {
"initial tenant load",
false,
async move {
let doit = async {
tenant_clone.load(&ctx).await?;
tenant_clone.activate(broker_client, &ctx)?;
anyhow::Ok(())
};
match doit.await {
match tenant_clone.load(&ctx).await {
Ok(()) => {}
Err(err) => {
tenant_clone.set_broken(err.to_string());
@@ -934,12 +910,7 @@ impl Tenant {
}
info!("initial load for tenant {tenant_id} finished!");
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
span.follows_from(Span::current());
span
}),
},
);
info!("spawned load into background");
@@ -951,9 +922,8 @@ impl Tenant {
/// Background task to load in-memory data structures for this tenant, from
/// files on disk. Used at pageserver startup.
///
#[instrument(skip(self, ctx), fields(tenant_id=%self.tenant_id))]
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
info!("loading tenant task");
utils::failpoint_sleep_millis_async!("before-loading-tenant");
@@ -1069,6 +1039,10 @@ impl Tenant {
.with_context(|| format!("load local timeline {timeline_id}"))?;
}
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(ctx)?;
info!("Done");
Ok(())
@@ -1232,27 +1206,6 @@ impl Tenant {
)
}
/// Helper for unit tests to create an empty timeline.
///
/// The timeline has state value `Active` but its background loops are not running.
// This makes the various functions which anyhow::ensure! for Active state work in tests.
// Our current tests don't need the background loops.
#[cfg(test)]
pub fn create_test_timeline(
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
let mut timelines = self.timelines.lock().unwrap();
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
// The non-test code would call tl.activate() here.
tl.set_state(TimelineState::Active);
Ok(tl)
}
/// Create a new timeline.
///
/// Returns the new timeline ID and reference to its Timeline object.
@@ -1266,7 +1219,6 @@ impl Tenant {
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<Option<Arc<Timeline>>> {
anyhow::ensure!(
@@ -1333,8 +1285,6 @@ impl Tenant {
}
};
loaded_timeline.activate(broker_client, ctx)?;
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
@@ -1638,11 +1588,7 @@ impl Tenant {
}
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<()> {
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
let mut result = Ok(());
@@ -1675,14 +1621,14 @@ impl Tenant {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self);
tasks::start_background_loops(self.tenant_id);
let mut activated_timelines = 0;
let mut timelines_broken_during_activation = 0;
for timeline in not_broken_timelines {
match timeline
.activate(broker_client.clone(), ctx)
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {
@@ -1790,30 +1736,25 @@ impl Tenant {
self.state.subscribe()
}
pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
let mut receiver = self.state.subscribe();
loop {
let current_state = receiver.borrow_and_update().clone();
match current_state {
TenantState::Loading | TenantState::Attaching => {
// in these states, there's a chance that we can reach ::Active
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError| {
WaitToBecomeActiveError::TenantDropped {
tenant_id: self.tenant_id,
}
},
)?;
receiver.changed().await?;
}
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken { .. } | TenantState::Stopping => {
// There's no chance the tenant can transition back into ::Active
return Err(WaitToBecomeActiveError::WillNotBecomeActive {
tenant_id: self.tenant_id,
state: current_state,
});
anyhow::bail!(
"Tenant {} will not become active. Current state: {:?}",
self.tenant_id,
&current_state,
);
}
}
}
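For illustration, a hypothetical caller (identifiers assumed to be in scope) looks the tenant up and waits for it to become active before doing any tenant-scoped work:
// sketch only; error conversion at the call site is elided
let tenant = mgr::get_tenant(tenant_id, false).await?;
tenant.wait_to_become_active().await?;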
@@ -2337,45 +2278,13 @@ impl Tenant {
Ok(gc_timelines)
}
/// A substitute for `branch_timeline` for use in unit tests.
/// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
/// calls pass, but we do not actually call `.activate()` under the hood. So none of the
/// timeline background tasks are launched, except the flush loop.
#[cfg(test)]
async fn branch_timeline_test(
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let tl = self
.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
.await?;
tl.set_state(TimelineState::Active);
Ok(tl)
}
/// Branch an existing timeline.
///
/// The caller is responsible for activating the returned timeline.
/// Branch an existing timeline
async fn branch_timeline(
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
.await
}
async fn branch_timeline_impl(
&self,
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let src_id = src_timeline.timeline_id;
@@ -2469,7 +2378,7 @@ impl Tenant {
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(ctx, &mut timelines, true)?
.initialize_with_lock(ctx, &mut timelines, true, true)?
};
// Root timeline gets its layers during creation and uploads them along with the metadata.
@@ -2490,8 +2399,6 @@ impl Tenant {
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization complete, remove the temp dir.
///
/// The caller is responsible for activating the returned timeline.
async fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
@@ -2586,7 +2493,7 @@ impl Tenant {
// map above, when we imported the datadir.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(ctx, &mut timelines, false)?
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
};
info!(
@@ -3227,14 +3134,8 @@ pub mod harness {
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
timelines_to_load.insert(timeline_id, timeline_metadata);
}
tenant
.load(ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
.await?;
tenant.state.send_replace(TenantState::Active);
for timeline in tenant.timelines.lock().unwrap().values() {
timeline.set_state(TimelineState::Active);
}
// FIXME starts background jobs
tenant.load(ctx).await?;
Ok(tenant)
}
@@ -3292,7 +3193,8 @@ mod tests {
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -3325,7 +3227,9 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
.load()
.await;
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = timeline.initialize(&ctx)?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
Ok(_) => panic!("duplicate timeline creation should fail"),
@@ -3356,7 +3260,8 @@ mod tests {
use std::str::from_utf8;
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let writer = tline.writer();
#[allow(non_snake_case)]
@@ -3378,7 +3283,7 @@ mod tests {
// Branch the history, modify relation differently on the new timeline
tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3453,7 +3358,8 @@ mod tests {
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load()
.await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -3466,7 +3372,7 @@ mod tests {
// try to branch at lsn 25, should fail because we already garbage collected the data
match tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
.await
{
Ok(_) => panic!("branching should have failed"),
@@ -3490,11 +3396,12 @@ mod tests {
.load()
.await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
match tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
.await
{
Ok(_) => panic!("branching should have failed"),
@@ -3540,11 +3447,13 @@ mod tests {
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
.load()
.await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3588,11 +3497,12 @@ mod tests {
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
.load()
.await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3611,11 +3521,12 @@ mod tests {
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
.load()
.await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3644,7 +3555,8 @@ mod tests {
{
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
}
@@ -3664,14 +3576,14 @@ mod tests {
{
let (tenant, ctx) = harness.load().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
let child_tline = tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
tenant
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
child_tline.set_state(TimelineState::Active);
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -3701,8 +3613,9 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
let (tenant, ctx) = harness.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
drop(tline);
tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
drop(tenant);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
@@ -3739,7 +3652,8 @@ mod tests {
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -3804,7 +3718,8 @@ mod tests {
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
let mut lsn = Lsn(0x10);
@@ -3846,7 +3761,8 @@ mod tests {
#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let tline = tline.initialize(&ctx)?;
const NUM_KEYS: usize = 1000;
@@ -3919,8 +3835,9 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
.load()
.await;
let mut tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
const NUM_KEYS: usize = 1000;
@@ -3953,7 +3870,7 @@ mod tests {
for _ in 0..50 {
let new_tline_id = TimelineId::generate();
tenant
.branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
.branch_timeline(&tline, new_tline_id, Some(lsn), &ctx)
.await?;
tline = tenant
.get_timeline(new_tline_id, true)
@@ -4002,8 +3919,9 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
.load()
.await;
let mut tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let mut tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
.initialize(&ctx)?;
const NUM_KEYS: usize = 100;
const NUM_TLINES: usize = 50;
@@ -4018,7 +3936,7 @@ mod tests {
for idx in 0..NUM_TLINES {
let new_tline_id = TimelineId::generate();
tenant
.branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
.branch_timeline(&tline, new_tline_id, Some(lsn), &ctx)
.await?;
tline = tenant
.get_timeline(new_tline_id, true)

View File

@@ -58,10 +58,9 @@ static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::
/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once the download completes.
#[instrument(skip_all)]
#[instrument(skip(conf, remote_storage))]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
@@ -117,7 +116,6 @@ pub async fn init_tenant_mgr(
match schedule_local_tenant_processing(
conf,
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
&ctx,
) {
@@ -152,7 +150,6 @@ pub async fn init_tenant_mgr(
pub fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -189,7 +186,7 @@ pub fn schedule_local_tenant_processing(
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if let Some(remote_storage) = remote_storage {
match Tenant::spawn_attach(conf, tenant_id, broker_client, remote_storage, ctx) {
match Tenant::spawn_attach(conf, tenant_id, remote_storage, ctx) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -207,7 +204,7 @@ pub fn schedule_local_tenant_processing(
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(conf, tenant_id, broker_client, remote_storage, ctx)
Tenant::spawn_load(conf, tenant_id, remote_storage, ctx)
};
Ok(tenant)
}
@@ -278,7 +275,6 @@ pub async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
@@ -291,7 +287,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, remote_storage, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -304,19 +300,11 @@ pub async fn create_tenant(
}).await
}
#[derive(Debug, thiserror::Error)]
pub enum SetNewTenantConfigError {
#[error(transparent)]
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
}
pub async fn set_new_tenant_config(
conf: &'static PageServerConf,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
) -> Result<(), SetNewTenantConfigError> {
) -> Result<(), TenantStateError> {
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_id, true).await?;
@@ -326,32 +314,23 @@ pub async fn set_new_tenant_config(
&tenant_config_path,
new_tenant_conf,
false,
)
.map_err(SetNewTenantConfigError::Persist)?;
)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
}
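A hypothetical call site, with the construction of `TenantConfOpt` and error handling elided:
// sketch only; `conf`, `new_tenant_conf` and `tenant_id` are assumed to be in scope
set_new_tenant_config(conf, new_tenant_conf, tenant_id).await?;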
#[derive(Debug, thiserror::Error)]
pub enum GetTenantError {
#[error("Tenant {0} not found")]
NotFound(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
}
/// Gets the tenant from the in-memory data, erroring if it's absent or does not fit the query.
/// `active_only = true` allows querying only tenants that are ready for operations, erroring on other kinds of tenants.
pub async fn get_tenant(
tenant_id: TenantId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
) -> Result<Arc<Tenant>, TenantStateError> {
let m = TENANTS.read().await;
let tenant = m
.get(&tenant_id)
.ok_or(GetTenantError::NotFound(tenant_id))?;
.ok_or(TenantStateError::NotFound(tenant_id))?;
if active_only && !tenant.is_active() {
Err(GetTenantError::NotActive(tenant_id))
Err(TenantStateError::NotActive(tenant_id))
} else {
Ok(Arc::clone(tenant))
}
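For illustration, a hypothetical handler that refuses to proceed unless the tenant is active:
// sketch only; conversion of the error into the handler's error type is elided
let tenant = get_tenant(tenant_id, true).await?;
// ...proceed with tenant-scoped work, e.g. tenant.get_timeline(...)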
@@ -360,7 +339,7 @@ pub async fn get_tenant(
#[derive(Debug, thiserror::Error)]
pub enum DeleteTimelineError {
#[error("Tenant {0}")]
Tenant(#[from] GetTenantError),
Tenant(#[from] TenantStateError),
#[error("Timeline {0}")]
Timeline(#[from] crate::tenant::DeleteTimelineError),
@@ -425,7 +404,6 @@ pub async fn detach_tenant(
pub async fn load_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -437,7 +415,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -494,7 +472,6 @@ pub async fn attach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_conf: TenantConfOpt,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -510,7 +487,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, Some(remote_storage), ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

View File

@@ -1264,7 +1264,9 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let timeline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
let _ = timeline.initialize(&ctx).unwrap();
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;

View File

@@ -542,7 +542,7 @@ impl From<LayerFileName> for LayerDescriptor {
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'pagectl' binary, we need to construct a Layer
/// path from the config. But in the 'pageserver_binutils' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
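A minimal sketch of the Path-variant use case described here, using `DeltaLayer::new_for_path` shown further below (the surrounding pagectl plumbing is assumed):
// sketch only: open a layer file directly, without a running pageserver
let file = std::fs::File::open(&path)?;
let layer = DeltaLayer::new_for_path(&path, file)?;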

View File

@@ -110,7 +110,7 @@ const WILL_INIT: u64 = 1;
/// reading/deserializing records themselves.
///
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);
struct BlobRef(u64);
impl BlobRef {
pub fn will_init(&self) -> bool {
@@ -619,7 +619,7 @@ impl DeltaLayer {
/// Create a DeltaLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
pub fn new_for_path(path: &Path, file: File) -> Result<Self> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);

View File

@@ -422,7 +422,7 @@ impl ImageLayer {
/// Create an ImageLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
/// This variant is only used for debugging purposes, by the 'pageserver_binutils' binary.
pub fn new_for_path(path: &Path, file: File) -> Result<ImageLayer> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);

View File

@@ -9,12 +9,13 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantId;
pub fn start_background_loops(tenant: &Arc<Tenant>) {
let tenant_id = tenant.tenant_id;
pub fn start_background_loops(tenant_id: TenantId) {
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
@@ -22,14 +23,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
None,
&format!("compactor for tenant {tenant_id}"),
false,
{
let tenant = Arc::clone(tenant);
async move {
compaction_loop(tenant)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
}
async move {
compaction_loop(tenant_id)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
},
);
task_mgr::spawn(
@@ -39,14 +37,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
None,
&format!("garbage collector for tenant {tenant_id}"),
false,
{
let tenant = Arc::clone(tenant);
async move {
gc_loop(tenant)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
}
async move {
gc_loop(tenant_id)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
},
);
}
@@ -54,7 +49,7 @@ pub fn start_background_loops(tenant: &Arc<Tenant>) {
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>) {
async fn compaction_loop(tenant_id: TenantId) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
@@ -65,16 +60,16 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
loop {
trace!("waking up");
tokio::select! {
let tenant = tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
ControlFlow::Continue(tenant) => tenant,
},
}
};
let period = tenant.get_compaction_period();
@@ -124,7 +119,7 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>) {
async fn gc_loop(tenant_id: TenantId) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
@@ -132,22 +127,21 @@ async fn gc_loop(tenant: Arc<Tenant>) {
let cancel = task_mgr::shutdown_token();
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
tokio::select! {
let tenant = tokio::select! {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
ControlFlow::Continue(tenant) => tenant,
},
}
};
let period = tenant.get_gc_period();
@@ -167,9 +161,7 @@ async fn gc_loop(tenant: Arc<Tenant>) {
Duration::from_secs(10)
} else {
// Run gc
let res = tenant
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
.await;
let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
if let Err(e) = res {
error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
wait_duration
@@ -195,10 +187,23 @@ async fn gc_loop(tenant: Arc<Tenant>) {
trace!("GC loop stopped.");
}
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
async fn wait_for_active_tenant(
tenant_id: TenantId,
wait: Duration,
) -> ControlFlow<(), Arc<Tenant>> {
let tenant = loop {
match mgr::get_tenant(tenant_id, false).await {
Ok(tenant) => break tenant,
Err(e) => {
error!("Failed to get a tenant {tenant_id}: {e:#}");
tokio::time::sleep(wait).await;
}
}
};
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(())
ControlFlow::Continue(tenant)
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
loop {
@@ -208,7 +213,7 @@ async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
return ControlFlow::Continue(tenant);
}
state => {
debug!("Not running the task loop, tenant is not active: {state:?}");

View File

@@ -31,6 +31,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::{get_broker_client, is_broker_client_initialized};
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
@@ -906,12 +907,15 @@ impl Timeline {
Ok(())
}
pub fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.launch_wal_receiver(ctx, broker_client)?;
pub fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
if is_broker_client_initialized() {
self.launch_wal_receiver(ctx, get_broker_client().clone())?;
} else if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
} else {
anyhow::bail!("broker client not initialized");
}
self.set_state(TimelineState::Active);
self.launch_eviction_task();
Ok(())

View File

@@ -1309,8 +1309,9 @@ mod tests {
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.expect("Failed to create an empty timeline for dummy wal connection manager");
let timeline = timeline.initialize(&ctx).unwrap();
ConnectionManagerState {
id: TenantTimelineId {

View File

@@ -11,12 +11,10 @@ OBJS = \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \
walproposer_utils.o \
control_plane_connector.o
walproposer_utils.o
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql

View File

@@ -1,830 +0,0 @@
/*-------------------------------------------------------------------------
*
* control_plane_connector.c
* Captures updates to roles/databases using ProcessUtility_hook and
* sends them to the control plane. The changes are sent
* via HTTP to the URL specified by the GUC neon.console_url when the
* transaction commits. Forwarding may be disabled temporarily by
* setting neon.forward_ddl to false.
*
* Currently, the transaction may abort AFTER
* changes have already been forwarded, and that case is not handled.
* Subtransactions are handled using a stack of hash tables, which
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* IDENTIFICATION
* contrib/neon/control_plane_connector.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "access/xact.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include <curl/curl.h>
#include "utils/jsonb.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
/* GUCs */
static char *ConsoleURL = NULL;
static bool ForwardDDL = true;
/* Curl structures for sending the HTTP requests */
static CURL * CurlHandle;
static struct curl_slist *ContentHeader = NULL;
/*
* CURL docs say that this buffer must exist until we call curl_easy_cleanup
* (which we never do), so we make this a static
*/
static char CurlErrorBuf[CURL_ERROR_SIZE];
typedef enum
{
Op_Set, /* An upsert: either a creation or an alter */
Op_Delete,
} OpType;
typedef struct
{
char name[NAMEDATALEN];
Oid owner;
char old_name[NAMEDATALEN];
OpType type;
} DbEntry;
typedef struct
{
char name[NAMEDATALEN];
char old_name[NAMEDATALEN];
const char *password;
OpType type;
} RoleEntry;
/*
* We keep one of these for each subtransaction in a stack. When a subtransaction
* commits, we merge the top of the stack into the table below it. It is allocated in the
* subtransaction's context.
*/
typedef struct DdlHashTable
{
struct DdlHashTable *prev_table;
HTAB *db_table;
HTAB *role_table;
} DdlHashTable;
static DdlHashTable RootTable;
static DdlHashTable * CurrentDdlTable = &RootTable;
static void
PushKeyValue(JsonbParseState **state, char *key, char *value)
{
JsonbValue k,
v;
k.type = jbvString;
k.val.string.len = strlen(key);
k.val.string.val = key;
v.type = jbvString;
v.val.string.len = strlen(value);
v.val.string.val = value;
pushJsonbValue(state, WJB_KEY, &k);
pushJsonbValue(state, WJB_VALUE, &v);
}
static char *
ConstructDeltaMessage()
{
JsonbParseState *state = NULL;
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
if (RootTable.db_table)
{
JsonbValue dbs;
dbs.type = jbvString;
dbs.val.string.val = "dbs";
dbs.val.string.len = strlen(dbs.val.string.val);
pushJsonbValue(&state, WJB_KEY, &dbs);
pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
HASH_SEQ_STATUS status;
DbEntry *entry;
hash_seq_init(&status, RootTable.db_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
PushKeyValue(&state, "op", entry->type == Op_Set ? "set" : "del");
PushKeyValue(&state, "name", entry->name);
if (entry->owner != InvalidOid)
{
PushKeyValue(&state, "owner", GetUserNameFromId(entry->owner, false));
}
if (entry->old_name[0] != '\0')
{
PushKeyValue(&state, "old_name", entry->old_name);
}
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
}
if (RootTable.role_table)
{
JsonbValue roles;
roles.type = jbvString;
roles.val.string.val = "roles";
roles.val.string.len = strlen(roles.val.string.val);
pushJsonbValue(&state, WJB_KEY, &roles);
pushJsonbValue(&state, WJB_BEGIN_ARRAY, NULL);
HASH_SEQ_STATUS status;
RoleEntry *entry;
hash_seq_init(&status, RootTable.role_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
pushJsonbValue(&state, WJB_BEGIN_OBJECT, NULL);
PushKeyValue(&state, "op", entry->type == Op_Set ? "set" : "del");
PushKeyValue(&state, "name", entry->name);
if (entry->password)
{
PushKeyValue(&state, "password", (char *) entry->password);
}
if (entry->old_name[0] != '\0')
{
PushKeyValue(&state, "old_name", entry->old_name);
}
pushJsonbValue(&state, WJB_END_OBJECT, NULL);
}
pushJsonbValue(&state, WJB_END_ARRAY, NULL);
}
JsonbValue *result = pushJsonbValue(&state, WJB_END_OBJECT, NULL);
Jsonb *jsonb = JsonbValueToJsonb(result);
return JsonbToCString(NULL, &jsonb->root, 0 /* estimated_len */ );
}
#define ERROR_SIZE 1024
typedef struct
{
char str[ERROR_SIZE];
size_t size;
} ErrorString;
static size_t
ErrorWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
/* Docs say size is always 1 */
ErrorString *str = userdata;
size_t to_write = nmemb;
/* +1 for null terminator */
if (str->size + nmemb + 1 >= ERROR_SIZE)
to_write = ERROR_SIZE - str->size - 1;
/* Ignore everything past the first ERROR_SIZE bytes */
if (to_write == 0)
return nmemb;
memcpy(str->str + str->size, ptr, to_write);
str->size += to_write;
str->str[str->size] = '\0';
return nmemb;
}
static void
SendDeltasToControlPlane()
{
if (!RootTable.db_table && !RootTable.role_table)
return;
if (!ConsoleURL)
{
elog(LOG, "ConsoleURL not set, skipping forwarding");
return;
}
if (!ForwardDDL)
return;
char *message = ConstructDeltaMessage();
ErrorString str = {};
curl_easy_setopt(CurlHandle, CURLOPT_CUSTOMREQUEST, "PATCH");
curl_easy_setopt(CurlHandle, CURLOPT_HTTPHEADER, ContentHeader);
curl_easy_setopt(CurlHandle, CURLOPT_POSTFIELDS, message);
curl_easy_setopt(CurlHandle, CURLOPT_URL, ConsoleURL);
curl_easy_setopt(CurlHandle, CURLOPT_ERRORBUFFER, CurlErrorBuf);
curl_easy_setopt(CurlHandle, CURLOPT_TIMEOUT, 3L /* seconds */ );
curl_easy_setopt(CurlHandle, CURLOPT_WRITEDATA, &str);
curl_easy_setopt(CurlHandle, CURLOPT_WRITEFUNCTION, ErrorWriteCallback);
const int num_retries = 5;
int curl_status;
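/* Retry the request a few times, sleeping one second between attempts. */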
for (int i = 0; i < num_retries; i++)
{
if ((curl_status = curl_easy_perform(CurlHandle)) == 0)
break;
elog(LOG, "Curl request failed on attempt %d: %s", i, CurlErrorBuf);
pg_usleep(1000 * 1000);
}
if (curl_status != 0)
{
elog(ERROR, "Failed to perform curl request: %s", CurlErrorBuf);
}
else
{
long response_code;
if (curl_easy_getinfo(CurlHandle, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_UNKNOWN_OPTION)
{
bool error_exists = str.size != 0;
if (response_code != 200)
{
if (error_exists)
{
elog(ERROR,
"Received HTTP code %ld from control plane: %s",
response_code,
str.str);
}
else
{
elog(ERROR,
"Received HTTP code %ld from control plane",
response_code);
}
}
}
}
}
static void
InitDbTableIfNeeded()
{
if (!CurrentDdlTable->db_table)
{
HASHCTL db_ctl = {};
db_ctl.keysize = NAMEDATALEN;
db_ctl.entrysize = sizeof(DbEntry);
db_ctl.hcxt = CurTransactionContext;
CurrentDdlTable->db_table = hash_create(
"Dbs Created",
4,
&db_ctl,
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}
}
static void
InitRoleTableIfNeeded()
{
if (!CurrentDdlTable->role_table)
{
HASHCTL role_ctl = {};
role_ctl.keysize = NAMEDATALEN;
role_ctl.entrysize = sizeof(RoleEntry);
role_ctl.hcxt = CurTransactionContext;
CurrentDdlTable->role_table = hash_create(
"Roles Created",
4,
&role_ctl,
HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
}
}
static void
PushTable()
{
DdlHashTable *new_table = MemoryContextAlloc(CurTransactionContext, sizeof(DdlHashTable));
new_table->prev_table = CurrentDdlTable;
new_table->role_table = NULL;
new_table->db_table = NULL;
CurrentDdlTable = new_table;
}
static void
MergeTable()
{
DdlHashTable *old_table = CurrentDdlTable;
CurrentDdlTable = old_table->prev_table;
if (old_table->db_table)
{
InitDbTableIfNeeded();
DbEntry *entry;
HASH_SEQ_STATUS status;
hash_seq_init(&status, old_table->db_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
DbEntry *to_write = hash_search(
CurrentDdlTable->db_table,
entry->name,
HASH_ENTER,
NULL);
to_write->type = entry->type;
if (entry->owner != InvalidOid)
to_write->owner = entry->owner;
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
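/*
 * If this entry was itself produced by a rename inside the subtransaction,
 * collapse the chain: keep the oldest known name in old_name and drop the
 * parent table's now-stale entry for the intermediate name.
 */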
if (entry->old_name[0] != '\0')
{
bool found_old = false;
DbEntry *old = hash_search(
CurrentDdlTable->db_table,
entry->old_name,
HASH_FIND,
&found_old);
if (found_old)
{
if (old->old_name[0] != '\0')
strlcpy(to_write->old_name, old->old_name, NAMEDATALEN);
else
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
hash_search(
CurrentDdlTable->db_table,
entry->old_name,
HASH_REMOVE,
NULL);
}
}
}
hash_destroy(old_table->db_table);
}
if (old_table->role_table)
{
InitRoleTableIfNeeded();
RoleEntry *entry;
HASH_SEQ_STATUS status;
hash_seq_init(&status, old_table->role_table);
while ((entry = hash_seq_search(&status)) != NULL)
{
RoleEntry *to_write = hash_search(
CurrentDdlTable->role_table,
entry->name,
HASH_ENTER,
NULL);
to_write->type = entry->type;
if (entry->password)
to_write->password = entry->password;
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
if (entry->old_name[0] != '\0')
{
bool found_old = false;
RoleEntry *old = hash_search(
CurrentDdlTable->role_table,
entry->old_name,
HASH_FIND,
&found_old);
if (found_old)
{
if (old->old_name[0] != '\0')
strlcpy(to_write->old_name, old->old_name, NAMEDATALEN);
else
strlcpy(to_write->old_name, entry->old_name, NAMEDATALEN);
hash_search(CurrentDdlTable->role_table,
entry->old_name,
HASH_REMOVE,
NULL);
}
}
}
hash_destroy(old_table->role_table);
}
}
static void
PopTable()
{
/*
* The current table gets freed because it is allocated in the aborted
* subtransaction's memory context.
*/
CurrentDdlTable = CurrentDdlTable->prev_table;
}
static void
NeonSubXactCallback(
SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid,
void *arg)
{
switch (event)
{
case SUBXACT_EVENT_START_SUB:
return PushTable();
case SUBXACT_EVENT_COMMIT_SUB:
return MergeTable();
case SUBXACT_EVENT_ABORT_SUB:
return PopTable();
default:
return;
}
}
static void
NeonXactCallback(XactEvent event, void *arg)
{
if (event == XACT_EVENT_PRE_COMMIT || event == XACT_EVENT_PARALLEL_PRE_COMMIT)
{
SendDeltasToControlPlane();
}
RootTable.role_table = NULL;
RootTable.db_table = NULL;
Assert(CurrentDdlTable == &RootTable);
}
static void
HandleCreateDb(CreatedbStmt *stmt)
{
InitDbTableIfNeeded();
DefElem *downer = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "owner") == 0)
downer = defel;
}
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->dbname,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
entry->type = Op_Set;
if (downer && downer->arg)
entry->owner = get_role_oid(defGetString(downer), false);
else
entry->owner = GetUserId();
}
static void
HandleAlterOwner(AlterOwnerStmt *stmt)
{
if (stmt->objectType != OBJECT_DATABASE)
return;
InitDbTableIfNeeded();
const char *name = strVal(stmt->object);
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
name,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
entry->owner = get_role_oid(get_rolespec_name(stmt->newowner), false);
entry->type = Op_Set;
}
static void
HandleDbRename(RenameStmt *stmt)
{
Assert(stmt->renameType == OBJECT_DATABASE);
InitDbTableIfNeeded();
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->subname,
HASH_FIND,
&found);
DbEntry *entry_for_new_name = hash_search(
CurrentDdlTable->db_table,
stmt->newname,
HASH_ENTER,
NULL);
entry_for_new_name->type = Op_Set;
if (found)
{
if (entry->old_name[0] != '\0')
strlcpy(entry_for_new_name->old_name, entry->old_name, NAMEDATALEN);
else
strlcpy(entry_for_new_name->old_name, entry->name, NAMEDATALEN);
entry_for_new_name->owner = entry->owner;
hash_search(
CurrentDdlTable->db_table,
stmt->subname,
HASH_REMOVE,
NULL);
}
else
{
strlcpy(entry_for_new_name->old_name, stmt->subname, NAMEDATALEN);
entry_for_new_name->owner = InvalidOid;
}
}
static void
HandleDropDb(DropdbStmt *stmt)
{
InitDbTableIfNeeded();
bool found = false;
DbEntry *entry = hash_search(
CurrentDdlTable->db_table,
stmt->dbname,
HASH_ENTER,
&found);
entry->type = Op_Delete;
entry->owner = InvalidOid;
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
}
static void
HandleCreateRole(CreateRoleStmt *stmt)
{
InitRoleTableIfNeeded();
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->role,
HASH_ENTER,
&found);
DefElem *dpass = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "password") == 0)
dpass = defel;
}
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
if (dpass && dpass->arg)
entry->password = MemoryContextStrdup(CurTransactionContext, strVal(dpass->arg));
else
entry->password = NULL;
entry->type = Op_Set;
}
static void
HandleAlterRole(AlterRoleStmt *stmt)
{
InitRoleTableIfNeeded();
DefElem *dpass = NULL;
ListCell *option;
foreach(option, stmt->options)
{
DefElem *defel = lfirst(option);
if (strcmp(defel->defname, "password") == 0)
dpass = defel;
}
/* We only care about updates to the password */
if (!dpass)
return;
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->role->rolename,
HASH_ENTER,
&found);
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
if (dpass->arg)
entry->password = MemoryContextStrdup(CurTransactionContext, strVal(dpass->arg));
else
entry->password = NULL;
entry->type = Op_Set;
}
static void
HandleRoleRename(RenameStmt *stmt)
{
InitRoleTableIfNeeded();
Assert(stmt->renameType == OBJECT_ROLE);
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->subname,
HASH_FIND,
&found);
RoleEntry *entry_for_new_name = hash_search(
CurrentDdlTable->role_table,
stmt->newname,
HASH_ENTER,
NULL);
entry_for_new_name->type = Op_Set;
if (found)
{
if (entry->old_name[0] != '\0')
strlcpy(entry_for_new_name->old_name, entry->old_name, NAMEDATALEN);
else
strlcpy(entry_for_new_name->old_name, entry->name, NAMEDATALEN);
entry_for_new_name->password = entry->password;
hash_search(
CurrentDdlTable->role_table,
entry->name,
HASH_REMOVE,
NULL);
}
else
{
strlcpy(entry_for_new_name->old_name, stmt->subname, NAMEDATALEN);
entry_for_new_name->password = NULL;
}
}
static void
HandleDropRole(DropRoleStmt *stmt)
{
InitRoleTableIfNeeded();
ListCell *item;
foreach(item, stmt->roles)
{
RoleSpec *spec = lfirst(item);
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
spec->rolename,
HASH_ENTER,
&found);
entry->type = Op_Delete;
entry->password = NULL;
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
}
}
static void
HandleRename(RenameStmt *stmt)
{
if (stmt->renameType == OBJECT_DATABASE)
return HandleDbRename(stmt);
else if (stmt->renameType == OBJECT_ROLE)
return HandleRoleRename(stmt);
}
static void
NeonProcessUtility(
PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc)
{
Node *parseTree = pstmt->utilityStmt;
switch (nodeTag(parseTree))
{
case T_CreatedbStmt:
HandleCreateDb(castNode(CreatedbStmt, parseTree));
break;
case T_AlterOwnerStmt:
HandleAlterOwner(castNode(AlterOwnerStmt, parseTree));
break;
case T_RenameStmt:
HandleRename(castNode(RenameStmt, parseTree));
break;
case T_DropdbStmt:
HandleDropDb(castNode(DropdbStmt, parseTree));
break;
case T_CreateRoleStmt:
HandleCreateRole(castNode(CreateRoleStmt, parseTree));
break;
case T_AlterRoleStmt:
HandleAlterRole(castNode(AlterRoleStmt, parseTree));
break;
case T_DropRoleStmt:
HandleDropRole(castNode(DropRoleStmt, parseTree));
break;
default:
break;
}
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
}
extern void
InitControlPlaneConnector()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = NeonProcessUtility;
RegisterXactCallback(NeonXactCallback, NULL);
RegisterSubXactCallback(NeonSubXactCallback, NULL);
DefineCustomStringVariable(
"neon.console_url",
"URL of the Neon Console, which will be forwarded changes to dbs and roles",
NULL,
&ConsoleURL,
NULL,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
DefineCustomBoolVariable(
"neon.forward_ddl",
"Controls whether to forward DDL to the control plane",
NULL,
&ForwardDDL,
true,
PGC_SUSET,
0,
NULL,
NULL,
NULL);
const char *jwt_token = getenv("NEON_CONTROL_PLANE_TOKEN");
if (!jwt_token)
{
elog(LOG, "Missing NEON_CONTROL_PLANE_TOKEN environment variable, forwarding will not be authenticated");
}
if (curl_global_init(CURL_GLOBAL_DEFAULT))
{
elog(ERROR, "Failed to initialize curl");
}
if ((CurlHandle = curl_easy_init()) == NULL)
{
elog(ERROR, "Failed to initialize curl handle");
}
if ((ContentHeader = curl_slist_append(ContentHeader, "Content-Type: application/json")) == NULL)
{
elog(ERROR, "Failed to initialize content header");
}
if (jwt_token)
{
char auth_header[8192];
snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", jwt_token);
if ((ContentHeader = curl_slist_append(ContentHeader, auth_header)) == NULL)
{
elog(ERROR, "Failed to initialize authorization header");
}
}
}

View File

@@ -1,6 +0,0 @@
#ifndef CONTROL_PLANE_CONNECTOR_H
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector();
#endif

View File

@@ -25,7 +25,6 @@
#include "neon.h"
#include "walproposer.h"
#include "pagestore_client.h"
#include "control_plane_connector.h"
PG_MODULE_MAGIC;
void _PG_init(void);
@@ -35,11 +34,7 @@ _PG_init(void)
{
pg_init_libpagestore();
pg_init_walproposer();
InitControlPlaneConnector();
// Important: This must happen after other parts of the extension
// are loaded, otherwise any settings to GUCs that were set before
// the extension was loaded will be removed.
EmitWarningsOnPlaceholders("neon");
}

301
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand.
# This file is automatically @generated by Poetry and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -79,30 +79,30 @@ sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
[[package]]
name = "allure-pytest"
version = "2.13.2"
version = "2.13.1"
description = "Allure pytest integration"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "allure-pytest-2.13.2.tar.gz", hash = "sha256:22243159e8ec81ce2b5254b4013802198821b1b42f118f69d4a289396607c7b3"},
{file = "allure_pytest-2.13.2-py3-none-any.whl", hash = "sha256:17de9dbee7f61c8e66a5b5e818b00e419dbcea44cb55c24319401ba813220690"},
{file = "allure-pytest-2.13.1.tar.gz", hash = "sha256:68d69456eeb65af4061ec06a80bc941163b0616e8216554d36b070a6bf070e08"},
{file = "allure_pytest-2.13.1-py3-none-any.whl", hash = "sha256:a8de2fc3b3effe2d8f98801646920de3f055b779710f4c806dbee7c613c24633"},
]
[package.dependencies]
allure-python-commons = "2.13.2"
allure-python-commons = "2.13.1"
pytest = ">=4.5.0"
[[package]]
name = "allure-python-commons"
version = "2.13.2"
version = "2.13.1"
description = "Common module for integrate allure with python-based frameworks"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "allure-python-commons-2.13.2.tar.gz", hash = "sha256:8a03681330231b1deadd86b97ff68841c6591320114ae638570f1ed60d7a2033"},
{file = "allure_python_commons-2.13.2-py3-none-any.whl", hash = "sha256:2bb3646ec3fbf5b36d178a5e735002bc130ae9f9ba80f080af97d368ba375051"},
{file = "allure-python-commons-2.13.1.tar.gz", hash = "sha256:3fc13e1da8ebb23f9ab5c9c72ad04595023cdd5078dbb8604939997faebed5cb"},
{file = "allure_python_commons-2.13.1-py3-none-any.whl", hash = "sha256:d08e04867bddf44fef55def3d67f4bc25af58a1bf9fcffcf4ec3331f7f2ef0d0"},
]
[package.dependencies]
@@ -172,6 +172,17 @@ dev = ["Cython (>=0.29.24,<0.30.0)", "Sphinx (>=4.1.2,<4.2.0)", "flake8 (>=5.0.4
docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"]
test = ["flake8 (>=5.0.4,<5.1.0)", "uvloop (>=0.15.3)"]
[[package]]
name = "atomicwrites"
version = "1.4.1"
description = "Atomic file writes."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
{file = "atomicwrites-1.4.1.tar.gz", hash = "sha256:81b2c9071a49367a7f770170e5eec8cb66567cfbbc8c73d20ce5ca4a8d71cf11"},
]
[[package]]
name = "attrs"
version = "21.4.0"
@@ -228,49 +239,49 @@ wrapt = "*"
[[package]]
name = "backoff"
version = "2.2.1"
version = "1.11.1"
description = "Function decoration for backoff and retry"
category = "main"
optional = false
python-versions = ">=3.7,<4.0"
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
{file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"},
{file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"},
{file = "backoff-1.11.1-py2.py3-none-any.whl", hash = "sha256:61928f8fa48d52e4faa81875eecf308eccfb1016b018bb6bd21e05b5d90a96c5"},
{file = "backoff-1.11.1.tar.gz", hash = "sha256:ccb962a2378418c667b3c979b504fdeb7d9e0d29c0579e3b13b86467177728cb"},
]
[[package]]
name = "black"
version = "23.3.0"
version = "23.1.0"
description = "The uncompromising code formatter."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "black-23.3.0-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:0945e13506be58bf7db93ee5853243eb368ace1c08a24c65ce108986eac65915"},
{file = "black-23.3.0-cp310-cp310-macosx_10_16_universal2.whl", hash = "sha256:67de8d0c209eb5b330cce2469503de11bca4085880d62f1628bd9972cc3366b9"},
{file = "black-23.3.0-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:7c3eb7cea23904399866c55826b31c1f55bbcd3890ce22ff70466b907b6775c2"},
{file = "black-23.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:32daa9783106c28815d05b724238e30718f34155653d4d6e125dc7daec8e260c"},
{file = "black-23.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:35d1381d7a22cc5b2be2f72c7dfdae4072a3336060635718cc7e1ede24221d6c"},
{file = "black-23.3.0-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:a8a968125d0a6a404842fa1bf0b349a568634f856aa08ffaff40ae0dfa52e7c6"},
{file = "black-23.3.0-cp311-cp311-macosx_10_16_universal2.whl", hash = "sha256:c7ab5790333c448903c4b721b59c0d80b11fe5e9803d8703e84dcb8da56fec1b"},
{file = "black-23.3.0-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:a6f6886c9869d4daae2d1715ce34a19bbc4b95006d20ed785ca00fa03cba312d"},
{file = "black-23.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6f3c333ea1dd6771b2d3777482429864f8e258899f6ff05826c3a4fcc5ce3f70"},
{file = "black-23.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:11c410f71b876f961d1de77b9699ad19f939094c3a677323f43d7a29855fe326"},
{file = "black-23.3.0-cp37-cp37m-macosx_10_16_x86_64.whl", hash = "sha256:1d06691f1eb8de91cd1b322f21e3bfc9efe0c7ca1f0e1eb1db44ea367dff656b"},
{file = "black-23.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:50cb33cac881766a5cd9913e10ff75b1e8eb71babf4c7104f2e9c52da1fb7de2"},
{file = "black-23.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:e114420bf26b90d4b9daa597351337762b63039752bdf72bf361364c1aa05925"},
{file = "black-23.3.0-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:48f9d345675bb7fbc3dd85821b12487e1b9a75242028adad0333ce36ed2a6d27"},
{file = "black-23.3.0-cp38-cp38-macosx_10_16_universal2.whl", hash = "sha256:714290490c18fb0126baa0fca0a54ee795f7502b44177e1ce7624ba1c00f2331"},
{file = "black-23.3.0-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:064101748afa12ad2291c2b91c960be28b817c0c7eaa35bec09cc63aa56493c5"},
{file = "black-23.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:562bd3a70495facf56814293149e51aa1be9931567474993c7942ff7d3533961"},
{file = "black-23.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:e198cf27888ad6f4ff331ca1c48ffc038848ea9f031a3b40ba36aced7e22f2c8"},
{file = "black-23.3.0-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:3238f2aacf827d18d26db07524e44741233ae09a584273aa059066d644ca7b30"},
{file = "black-23.3.0-cp39-cp39-macosx_10_16_universal2.whl", hash = "sha256:f0bd2f4a58d6666500542b26354978218a9babcdc972722f4bf90779524515f3"},
{file = "black-23.3.0-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:92c543f6854c28a3c7f39f4d9b7694f9a6eb9d3c5e2ece488c327b6e7ea9b266"},
{file = "black-23.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3a150542a204124ed00683f0db1f5cf1c2aaaa9cc3495b7a3b5976fb136090ab"},
{file = "black-23.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:6b39abdfb402002b8a7d030ccc85cf5afff64ee90fa4c5aebc531e3ad0175ddb"},
{file = "black-23.3.0-py3-none-any.whl", hash = "sha256:ec751418022185b0c1bb7d7736e6933d40bbb14c14a0abcf9123d1b159f98dd4"},
{file = "black-23.3.0.tar.gz", hash = "sha256:1c7b8d606e728a41ea1ccbd7264677e494e87cf630e399262ced92d4a8dac940"},
{file = "black-23.1.0-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:b6a92a41ee34b883b359998f0c8e6eb8e99803aa8bf3123bf2b2e6fec505a221"},
{file = "black-23.1.0-cp310-cp310-macosx_10_16_universal2.whl", hash = "sha256:57c18c5165c1dbe291d5306e53fb3988122890e57bd9b3dcb75f967f13411a26"},
{file = "black-23.1.0-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:9880d7d419bb7e709b37e28deb5e68a49227713b623c72b2b931028ea65f619b"},
{file = "black-23.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e6663f91b6feca5d06f2ccd49a10f254f9298cc1f7f49c46e498a0771b507104"},
{file = "black-23.1.0-cp310-cp310-win_amd64.whl", hash = "sha256:9afd3f493666a0cd8f8df9a0200c6359ac53940cbde049dcb1a7eb6ee2dd7074"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:bfffba28dc52a58f04492181392ee380e95262af14ee01d4bc7bb1b1c6ca8d27"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_universal2.whl", hash = "sha256:c1c476bc7b7d021321e7d93dc2cbd78ce103b84d5a4cf97ed535fbc0d6660648"},
{file = "black-23.1.0-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:382998821f58e5c8238d3166c492139573325287820963d2f7de4d518bd76958"},
{file = "black-23.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2bf649fda611c8550ca9d7592b69f0637218c2369b7744694c5e4902873b2f3a"},
{file = "black-23.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:121ca7f10b4a01fd99951234abdbd97728e1240be89fde18480ffac16503d481"},
{file = "black-23.1.0-cp37-cp37m-macosx_10_16_x86_64.whl", hash = "sha256:a8471939da5e824b891b25751955be52ee7f8a30a916d570a5ba8e0f2eb2ecad"},
{file = "black-23.1.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8178318cb74f98bc571eef19068f6ab5613b3e59d4f47771582f04e175570ed8"},
{file = "black-23.1.0-cp37-cp37m-win_amd64.whl", hash = "sha256:a436e7881d33acaf2536c46a454bb964a50eff59b21b51c6ccf5a40601fbef24"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:a59db0a2094d2259c554676403fa2fac3473ccf1354c1c63eccf7ae65aac8ab6"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_universal2.whl", hash = "sha256:0052dba51dec07ed029ed61b18183942043e00008ec65d5028814afaab9a22fd"},
{file = "black-23.1.0-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:49f7b39e30f326a34b5c9a4213213a6b221d7ae9d58ec70df1c4a307cf2a1580"},
{file = "black-23.1.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:162e37d49e93bd6eb6f1afc3e17a3d23a823042530c37c3c42eeeaf026f38468"},
{file = "black-23.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:8b70eb40a78dfac24842458476135f9b99ab952dd3f2dab738c1881a9b38b753"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:a29650759a6a0944e7cca036674655c2f0f63806ddecc45ed40b7b8aa314b651"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_universal2.whl", hash = "sha256:bb460c8561c8c1bec7824ecbc3ce085eb50005883a6203dcfb0122e95797ee06"},
{file = "black-23.1.0-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:c91dfc2c2a4e50df0026f88d2215e166616e0c80e86004d0003ece0488db2739"},
{file = "black-23.1.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2a951cc83ab535d248c89f300eccbd625e80ab880fbcfb5ac8afb5f01a258ac9"},
{file = "black-23.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:0680d4380db3719ebcfb2613f34e86c8e6d15ffeabcf8ec59355c5e7b85bb555"},
{file = "black-23.1.0-py3-none-any.whl", hash = "sha256:7a0f701d314cfa0896b9001df70a530eb2472babb76086344e688829efd97d32"},
{file = "black-23.1.0.tar.gz", hash = "sha256:b0bd97bea8903f5a2ba7219257a44e3f1f9d00073d6cc1add68f0beec69692ac"},
]
[package.dependencies]
@@ -940,21 +951,6 @@ six = ">=1.9.0"
gmpy = ["gmpy"]
gmpy2 = ["gmpy2"]
[[package]]
name = "exceptiongroup"
version = "1.1.1"
description = "Backport of PEP 654 (exception groups)"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "exceptiongroup-1.1.1-py3-none-any.whl", hash = "sha256:232c37c63e4f682982c8b6459f33a8981039e5fb8756b2074364e5055c498c9e"},
{file = "exceptiongroup-1.1.1.tar.gz", hash = "sha256:d484c3090ba2889ae2928419117447a14daf3c1231d5e30d0aae34f354f01785"},
]
[package.extras]
test = ["pytest (>=6)"]
[[package]]
name = "execnet"
version = "1.9.0"
@@ -1414,38 +1410,38 @@ files = [
[[package]]
name = "mypy"
version = "1.3.0"
version = "1.1.1"
description = "Optional static typing for Python"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "mypy-1.3.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c1eb485cea53f4f5284e5baf92902cd0088b24984f4209e25981cc359d64448d"},
{file = "mypy-1.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:4c99c3ecf223cf2952638da9cd82793d8f3c0c5fa8b6ae2b2d9ed1e1ff51ba85"},
{file = "mypy-1.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:550a8b3a19bb6589679a7c3c31f64312e7ff482a816c96e0cecec9ad3a7564dd"},
{file = "mypy-1.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:cbc07246253b9e3d7d74c9ff948cd0fd7a71afcc2b77c7f0a59c26e9395cb152"},
{file = "mypy-1.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:a22435632710a4fcf8acf86cbd0d69f68ac389a3892cb23fbad176d1cddaf228"},
{file = "mypy-1.3.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6e33bb8b2613614a33dff70565f4c803f889ebd2f859466e42b46e1df76018dd"},
{file = "mypy-1.3.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7d23370d2a6b7a71dc65d1266f9a34e4cde9e8e21511322415db4b26f46f6b8c"},
{file = "mypy-1.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:658fe7b674769a0770d4b26cb4d6f005e88a442fe82446f020be8e5f5efb2fae"},
{file = "mypy-1.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6e42d29e324cdda61daaec2336c42512e59c7c375340bd202efa1fe0f7b8f8ca"},
{file = "mypy-1.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:d0b6c62206e04061e27009481cb0ec966f7d6172b5b936f3ead3d74f29fe3dcf"},
{file = "mypy-1.3.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:76ec771e2342f1b558c36d49900dfe81d140361dd0d2df6cd71b3db1be155409"},
{file = "mypy-1.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ebc95f8386314272bbc817026f8ce8f4f0d2ef7ae44f947c4664efac9adec929"},
{file = "mypy-1.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:faff86aa10c1aa4a10e1a301de160f3d8fc8703b88c7e98de46b531ff1276a9a"},
{file = "mypy-1.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:8c5979d0deb27e0f4479bee18ea0f83732a893e81b78e62e2dda3e7e518c92ee"},
{file = "mypy-1.3.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c5d2cc54175bab47011b09688b418db71403aefad07cbcd62d44010543fc143f"},
{file = "mypy-1.3.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:87df44954c31d86df96c8bd6e80dfcd773473e877ac6176a8e29898bfb3501cb"},
{file = "mypy-1.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:473117e310febe632ddf10e745a355714e771ffe534f06db40702775056614c4"},
{file = "mypy-1.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:74bc9b6e0e79808bf8678d7678b2ae3736ea72d56eede3820bd3849823e7f305"},
{file = "mypy-1.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:44797d031a41516fcf5cbfa652265bb994e53e51994c1bd649ffcd0c3a7eccbf"},
{file = "mypy-1.3.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:ddae0f39ca146972ff6bb4399f3b2943884a774b8771ea0a8f50e971f5ea5ba8"},
{file = "mypy-1.3.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:1c4c42c60a8103ead4c1c060ac3cdd3ff01e18fddce6f1016e08939647a0e703"},
{file = "mypy-1.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e86c2c6852f62f8f2b24cb7a613ebe8e0c7dc1402c61d36a609174f63e0ff017"},
{file = "mypy-1.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:f9dca1e257d4cc129517779226753dbefb4f2266c4eaad610fc15c6a7e14283e"},
{file = "mypy-1.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:95d8d31a7713510685b05fbb18d6ac287a56c8f6554d88c19e73f724a445448a"},
{file = "mypy-1.3.0-py3-none-any.whl", hash = "sha256:a8763e72d5d9574d45ce5881962bc8e9046bf7b375b0abf031f3e6811732a897"},
{file = "mypy-1.3.0.tar.gz", hash = "sha256:e1f4d16e296f5135624b34e8fb741eb0eadedca90862405b1f1fde2040b9bd11"},
{file = "mypy-1.1.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:39c7119335be05630611ee798cc982623b9e8f0cff04a0b48dfc26100e0b97af"},
{file = "mypy-1.1.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:61bf08362e93b6b12fad3eab68c4ea903a077b87c90ac06c11e3d7a09b56b9c1"},
{file = "mypy-1.1.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dbb19c9f662e41e474e0cff502b7064a7edc6764f5262b6cd91d698163196799"},
{file = "mypy-1.1.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:315ac73cc1cce4771c27d426b7ea558fb4e2836f89cb0296cbe056894e3a1f78"},
{file = "mypy-1.1.1-cp310-cp310-win_amd64.whl", hash = "sha256:5cb14ff9919b7df3538590fc4d4c49a0f84392237cbf5f7a816b4161c061829e"},
{file = "mypy-1.1.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:26cdd6a22b9b40b2fd71881a8a4f34b4d7914c679f154f43385ca878a8297389"},
{file = "mypy-1.1.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:5b5f81b40d94c785f288948c16e1f2da37203c6006546c5d947aab6f90aefef2"},
{file = "mypy-1.1.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21b437be1c02712a605591e1ed1d858aba681757a1e55fe678a15c2244cd68a5"},
{file = "mypy-1.1.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d809f88734f44a0d44959d795b1e6f64b2bbe0ea4d9cc4776aa588bb4229fc1c"},
{file = "mypy-1.1.1-cp311-cp311-win_amd64.whl", hash = "sha256:a380c041db500e1410bb5b16b3c1c35e61e773a5c3517926b81dfdab7582be54"},
{file = "mypy-1.1.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b7c7b708fe9a871a96626d61912e3f4ddd365bf7f39128362bc50cbd74a634d5"},
{file = "mypy-1.1.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c1c10fa12df1232c936830839e2e935d090fc9ee315744ac33b8a32216b93707"},
{file = "mypy-1.1.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:0a28a76785bf57655a8ea5eb0540a15b0e781c807b5aa798bd463779988fa1d5"},
{file = "mypy-1.1.1-cp37-cp37m-win_amd64.whl", hash = "sha256:ef6a01e563ec6a4940784c574d33f6ac1943864634517984471642908b30b6f7"},
{file = "mypy-1.1.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:d64c28e03ce40d5303450f547e07418c64c241669ab20610f273c9e6290b4b0b"},
{file = "mypy-1.1.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:64cc3afb3e9e71a79d06e3ed24bb508a6d66f782aff7e56f628bf35ba2e0ba51"},
{file = "mypy-1.1.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ce61663faf7a8e5ec6f456857bfbcec2901fbdb3ad958b778403f63b9e606a1b"},
{file = "mypy-1.1.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:2b0c373d071593deefbcdd87ec8db91ea13bd8f1328d44947e88beae21e8d5e9"},
{file = "mypy-1.1.1-cp38-cp38-win_amd64.whl", hash = "sha256:2888ce4fe5aae5a673386fa232473014056967f3904f5abfcf6367b5af1f612a"},
{file = "mypy-1.1.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:19ba15f9627a5723e522d007fe708007bae52b93faab00f95d72f03e1afa9598"},
{file = "mypy-1.1.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:59bbd71e5c58eed2e992ce6523180e03c221dcd92b52f0e792f291d67b15a71c"},
{file = "mypy-1.1.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9401e33814cec6aec8c03a9548e9385e0e228fc1b8b0a37b9ea21038e64cdd8a"},
{file = "mypy-1.1.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:4b398d8b1f4fba0e3c6463e02f8ad3346f71956b92287af22c9b12c3ec965a9f"},
{file = "mypy-1.1.1-cp39-cp39-win_amd64.whl", hash = "sha256:69b35d1dcb5707382810765ed34da9db47e7f95b3528334a3c999b0c90fe523f"},
{file = "mypy-1.1.1-py3-none-any.whl", hash = "sha256:4e4e8b362cdf99ba00c2b218036002bdcdf1e0de085cdb296a49df03fb31dfc4"},
{file = "mypy-1.1.1.tar.gz", hash = "sha256:ae9ceae0f5b9059f33dbc62dea087e942c0ccab4b7a003719cb70f9b8abfa32f"},
]
[package.dependencies]
@@ -1725,6 +1721,18 @@ files = [
{file = "psycopg2_binary-2.9.3-cp39-cp39-win_amd64.whl", hash = "sha256:accfe7e982411da3178ec690baaceaad3c278652998b2c45828aaac66cd8285f"},
]
[[package]]
name = "py"
version = "1.11.0"
description = "library with cross-python path, ini-parsing, io, code, log facilities"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
{file = "py-1.11.0-py2.py3-none-any.whl", hash = "sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378"},
{file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
]
[[package]]
name = "pyasn1"
version = "0.4.8"
@@ -1833,56 +1841,57 @@ files = [
[[package]]
name = "pytest"
version = "7.3.1"
version = "6.2.5"
description = "pytest: simple powerful testing with Python"
category = "main"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.6"
files = [
{file = "pytest-7.3.1-py3-none-any.whl", hash = "sha256:3799fa815351fea3a5e96ac7e503a96fa51cc9942c3753cda7651b93c1cfa362"},
{file = "pytest-7.3.1.tar.gz", hash = "sha256:434afafd78b1d78ed0addf160ad2b77a30d35d4bdf8af234fe621919d9ed15e3"},
{file = "pytest-6.2.5-py3-none-any.whl", hash = "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134"},
{file = "pytest-6.2.5.tar.gz", hash = "sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89"},
]
[package.dependencies]
atomicwrites = {version = ">=1.0", markers = "sys_platform == \"win32\""}
attrs = ">=19.2.0"
colorama = {version = "*", markers = "sys_platform == \"win32\""}
exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""}
iniconfig = "*"
packaging = "*"
pluggy = ">=0.12,<2.0"
tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""}
py = ">=1.8.2"
toml = "*"
[package.extras]
testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "xmlschema"]
testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xmlschema"]
[[package]]
name = "pytest-asyncio"
version = "0.21.0"
version = "0.19.0"
description = "Pytest support for asyncio"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "pytest-asyncio-0.21.0.tar.gz", hash = "sha256:2b38a496aef56f56b0e87557ec313e11e1ab9276fc3863f6a7be0f1d0e415e1b"},
{file = "pytest_asyncio-0.21.0-py3-none-any.whl", hash = "sha256:f2b3366b7cd501a4056858bd39349d5af19742aed2d81660b7998b6341c7eb9c"},
{file = "pytest-asyncio-0.19.0.tar.gz", hash = "sha256:ac4ebf3b6207259750bc32f4c1d8fcd7e79739edbc67ad0c58dd150b1d072fed"},
{file = "pytest_asyncio-0.19.0-py3-none-any.whl", hash = "sha256:7a97e37cfe1ed296e2e84941384bdd37c376453912d397ed39293e0916f521fa"},
]
[package.dependencies]
pytest = ">=7.0.0"
pytest = ">=6.1.0"
[package.extras]
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"]
[[package]]
name = "pytest-httpserver"
version = "1.0.8"
version = "1.0.6"
description = "pytest-httpserver is a httpserver for pytest"
category = "main"
optional = false
python-versions = ">=3.8,<4.0"
python-versions = ">=3.7,<4.0"
files = [
{file = "pytest_httpserver-1.0.8-py3-none-any.whl", hash = "sha256:24cd3d9f6a0b927c7bfc400d0b3fda7442721b8267ce29942bf307b190f0bb09"},
{file = "pytest_httpserver-1.0.8.tar.gz", hash = "sha256:e052f69bc8a9073db02484681e8e47004dd1fb3763b0ae833bd899e5895c559a"},
{file = "pytest_httpserver-1.0.6-py3-none-any.whl", hash = "sha256:ac2379acc91fe8bdbe2911c93af8dd130e33b5899fb9934d15669480739c6d32"},
{file = "pytest_httpserver-1.0.6.tar.gz", hash = "sha256:9040d07bf59ac45d8de3db1d4468fd2d1d607975e4da4c872ecc0402cdbf7b3e"},
]
[package.dependencies]
@@ -1905,14 +1914,14 @@ pytest = ">=3.2.5"
[[package]]
name = "pytest-order"
version = "1.1.0"
version = "1.0.1"
description = "pytest plugin to run your tests in a specific order"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "pytest-order-1.1.0.tar.gz", hash = "sha256:139d25b30826b78eebb42722f747eab14c44b88059d7a71d4f79d14a057269a5"},
{file = "pytest_order-1.1.0-py3-none-any.whl", hash = "sha256:3b3730969c97900fa5cd31ecff80847680ed56b2490954565c14949ba60d9371"},
{file = "pytest-order-1.0.1.tar.gz", hash = "sha256:5dd6b929fbd7eaa6d0ee07586f65c623babb0afe72b4843c5f15055d6b3b1b1f"},
{file = "pytest_order-1.0.1-py3-none-any.whl", hash = "sha256:bbe6e63a8e23741ab3e810d458d1ea7317e797b70f9550512d77d6e9e8fd1bbb"},
]
[package.dependencies]
@@ -1954,14 +1963,14 @@ pytest = ">=5.0.0"
[[package]]
name = "pytest-xdist"
version = "3.3.1"
description = "pytest xdist plugin for distributed testing, most importantly across multiple CPUs"
version = "3.0.2"
description = "pytest xdist plugin for distributed testing and loop-on-failing modes"
category = "main"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.6"
files = [
{file = "pytest-xdist-3.3.1.tar.gz", hash = "sha256:d5ee0520eb1b7bcca50a60a518ab7a7707992812c578198f8b44fdfac78e8c93"},
{file = "pytest_xdist-3.3.1-py3-none-any.whl", hash = "sha256:ff9daa7793569e6a68544850fd3927cd257cc03a7ef76c95e86915355e82b5f2"},
{file = "pytest-xdist-3.0.2.tar.gz", hash = "sha256:688da9b814370e891ba5de650c9327d1a9d861721a524eb917e620eec3e90291"},
{file = "pytest_xdist-3.0.2-py3-none-any.whl", hash = "sha256:9feb9a18e1790696ea23e1434fa73b325ed4998b0e9fcb221f16fd1945e6df1b"},
]
[package.dependencies]
@@ -2139,29 +2148,29 @@ pyasn1 = ">=0.1.3"
[[package]]
name = "ruff"
version = "0.0.269"
version = "0.0.255"
description = "An extremely fast Python linter, written in Rust."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
{file = "ruff-0.0.269-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:3569bcdee679045c09c0161fabc057599759c49219a08d9a4aad2cc3982ccba3"},
{file = "ruff-0.0.269-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:56347da63757a56cbce7d4b3d6044ca4f1941cd1bbff3714f7554360c3361f83"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6da8ee25ef2f0cc6cc8e6e20942c1d44d25a36dce35070d7184655bc14f63f63"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:bd81b8e681b9eaa6cf15484f3985bd8bd97c3d114e95bff3e8ea283bf8865062"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1f19f59ca3c28742955241fb452f3346241ddbd34e72ac5cb3d84fadebcf6bc8"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:f062059b8289a4fab7f6064601b811d447c2f9d3d432a17f689efe4d68988450"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3f5dc7aac52c58e82510217e3c7efd80765c134c097c2815d59e40face0d1fe6"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:e131b4dbe798c391090c6407641d6ab12c0fa1bb952379dde45e5000e208dabb"},
{file = "ruff-0.0.269-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a374434e588e06550df0f8dcb74777290f285678de991fda4e1063c367ab2eb2"},
{file = "ruff-0.0.269-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:cec2f4b84a14b87f1b121488649eb5b4eaa06467a2387373f750da74bdcb5679"},
{file = "ruff-0.0.269-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:374b161753a247904aec7a32d45e165302b76b6e83d22d099bf3ff7c232c888f"},
{file = "ruff-0.0.269-py3-none-musllinux_1_2_i686.whl", hash = "sha256:9ca0a1ddb1d835b5f742db9711c6cf59f213a1ad0088cb1e924a005fd399e7d8"},
{file = "ruff-0.0.269-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:5a20658f0b97d207c7841c13d528f36d666bf445b00b01139f28a8ccb80093bb"},
{file = "ruff-0.0.269-py3-none-win32.whl", hash = "sha256:03ff42bc91ceca58e0f0f072cb3f9286a9208f609812753474e799a997cdad1a"},
{file = "ruff-0.0.269-py3-none-win_amd64.whl", hash = "sha256:f3b59ccff57b21ef0967ea8021fd187ec14c528ec65507d8bcbe035912050776"},
{file = "ruff-0.0.269-py3-none-win_arm64.whl", hash = "sha256:bbeb857b1e508a4487bdb02ca1e6d41dd8d5ac5335a5246e25de8a3dff38c1ff"},
{file = "ruff-0.0.269.tar.gz", hash = "sha256:11ddcfbab32cf5c420ea9dd5531170ace5a3e59c16d9251c7bd2581f7b16f602"},
{file = "ruff-0.0.255-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:b2d71fb6a7e50501a2473864acffc85dee6b750c25db198f7e71fe1dbbff1aad"},
{file = "ruff-0.0.255-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:6c97d746861a6010f941179e84bba9feb8a871815667471d9ed6beb98d45c252"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9a7fa60085079b91a298b963361be9b1b1c724582af6c84be954cbabdbd9309a"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:c089f7141496334ab5a127b54ce55e41f0d6714e68a4453a1e09d2204cdea8c3"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0423908caa7d437a416b853214565b9c33bbd1106c4f88147982216dddcbbd96"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:981493e92547cacbb8e0874904ec049fe744507ee890dc8736caf89a8864f9a7"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:59d5193d2aedb35db180824462b374dbcfc306b2e76076245088afa6e5837df2"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dd5e00733c9d160c8a34a22e62b390da9d1e9f326676402421cb8c1236beefc3"},
{file = "ruff-0.0.255-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:694418cf41838bd19c6229e4e1b2d04505b1e6b86fe3ab81165484fc96d36f01"},
{file = "ruff-0.0.255-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:5d0408985c9777369daebb5d3340a99e9f7294bdd7120642239261508185cf89"},
{file = "ruff-0.0.255-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:abd6376ef9d12f370d95a8c7c98682fbb9bfedfba59f40e84a816fef8ddcb8de"},
{file = "ruff-0.0.255-py3-none-musllinux_1_2_i686.whl", hash = "sha256:f9b1a5df0bc09193cbef58a6f78e4a9a0b058a4f9733c0442866d078006d1bb9"},
{file = "ruff-0.0.255-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6a25c5f4ff087445b2e1bbcb9963f2ae7c868d65e4a8d5f84c36c12f71571179"},
{file = "ruff-0.0.255-py3-none-win32.whl", hash = "sha256:1ff87a8310354f9f1a099625e54a27fdd6756d9cd2a40b45922f2e943daf982d"},
{file = "ruff-0.0.255-py3-none-win_amd64.whl", hash = "sha256:f3d8416be618f023f93ec4fd6ee3048585ef85dba9563b2a7e38fc7e5131d5b1"},
{file = "ruff-0.0.255-py3-none-win_arm64.whl", hash = "sha256:8ba124819624145d7b6b53add40c367c44318893215ffc1bfe3d72e0225a1c9c"},
{file = "ruff-0.0.255.tar.gz", hash = "sha256:f9eb1d3b2eecbeedae419fa494c4e2a5e4484baf93a1ce0f81eddb005e1919c5"},
]
[[package]]
@@ -2262,7 +2271,7 @@ files = [
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
category = "main"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -2272,54 +2281,42 @@ files = [
[[package]]
name = "types-psutil"
version = "5.9.5.12"
version = "5.9.5.4"
description = "Typing stubs for psutil"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-psutil-5.9.5.12.tar.gz", hash = "sha256:61a91679d3fe737250013b624dca09375e7cc3ad77dcc734553746c429c02aca"},
{file = "types_psutil-5.9.5.12-py3-none-any.whl", hash = "sha256:e9a147b8561235c6afcce5aa1adb973fad9ab2c50cf89820697687f53510358f"},
{file = "types-psutil-5.9.5.4.tar.gz", hash = "sha256:aa09102b80c65a3b4573216614372398dab78972d650488eaff1ff05482cc18f"},
{file = "types_psutil-5.9.5.4-py3-none-any.whl", hash = "sha256:28e59764630187e462d43788efa16d59d5e77b510115f9e25901b2d4007fca62"},
]
[[package]]
name = "types-psycopg2"
version = "2.9.21.10"
version = "2.9.18"
description = "Typing stubs for psycopg2"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-psycopg2-2.9.21.10.tar.gz", hash = "sha256:c2600892312ae1c34e12f145749795d93dc4eac3ef7dbf8a9c1bfd45385e80d7"},
{file = "types_psycopg2-2.9.21.10-py3-none-any.whl", hash = "sha256:918224a0731a3650832e46633e720703b5beef7693a064e777d9748654fcf5e5"},
]
[[package]]
name = "types-pytest-lazy-fixture"
version = "0.6.3.3"
description = "Typing stubs for pytest-lazy-fixture"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-pytest-lazy-fixture-0.6.3.3.tar.gz", hash = "sha256:2ef79d66bcde0e50acdac8dc55074b9ae0d4cfaeabdd638f5522f4cac7c8a2c7"},
{file = "types_pytest_lazy_fixture-0.6.3.3-py3-none-any.whl", hash = "sha256:a56a55649147ff960ff79d4b2c781a4f769351abc1876873f3116d0bd0c96353"},
{file = "types-psycopg2-2.9.18.tar.gz", hash = "sha256:9b0e9e1f097b15cd9fa8aad2596a9e3082fd72f8d9cfe52b190cfa709105b6c0"},
{file = "types_psycopg2-2.9.18-py3-none-any.whl", hash = "sha256:14c779dcab18c31453fa1cad3cf4b1601d33540a344adead3c47a6b8091cd2fa"},
]
[[package]]
name = "types-requests"
version = "2.31.0.0"
version = "2.28.5"
description = "Typing stubs for requests"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-requests-2.31.0.0.tar.gz", hash = "sha256:c1c29d20ab8d84dff468d7febfe8e0cb0b4664543221b386605e14672b44ea25"},
{file = "types_requests-2.31.0.0-py3-none-any.whl", hash = "sha256:7c5cea7940f8e92ec560bbc468f65bf684aa3dcf0554a6f8c4710f5f708dc598"},
{file = "types-requests-2.28.5.tar.gz", hash = "sha256:ac618bfefcb3742eaf97c961e13e9e5a226e545eda4a3dbe293b898d40933ad1"},
{file = "types_requests-2.28.5-py3-none-any.whl", hash = "sha256:98ab647ae88b5e2c41d6d20cfcb5117da1bea561110000b6fdeeea07b3e89877"},
]
[package.dependencies]
types-urllib3 = "*"
types-urllib3 = "<1.27"
[[package]]
name = "types-s3transfer"
@@ -2335,14 +2332,14 @@ files = [
[[package]]
name = "types-toml"
version = "0.10.8.6"
version = "0.10.8"
description = "Typing stubs for toml"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "types-toml-0.10.8.6.tar.gz", hash = "sha256:6d3ac79e36c9ee593c5d4fb33a50cca0e3adceb6ef5cff8b8e5aef67b4c4aaf2"},
{file = "types_toml-0.10.8.6-py3-none-any.whl", hash = "sha256:de7b2bb1831d6f7a4b554671ffe5875e729753496961b3e9b202745e4955dafa"},
{file = "types-toml-0.10.8.tar.gz", hash = "sha256:b7e7ea572308b1030dc86c3ba825c5210814c2825612ec679eb7814f8dd9295a"},
{file = "types_toml-0.10.8-py3-none-any.whl", hash = "sha256:8300fd093e5829eb9c1fba69cee38130347d4b74ddf32d0a7df650ae55c2b599"},
]
[[package]]
@@ -2359,14 +2356,14 @@ files = [
[[package]]
name = "typing-extensions"
version = "4.6.1"
version = "4.3.0"
description = "Backported and Experimental Type Hints for Python 3.7+"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
{file = "typing_extensions-4.6.1-py3-none-any.whl", hash = "sha256:6bac751f4789b135c43228e72de18637e9a6c29d12777023a703fd1a6858469f"},
{file = "typing_extensions-4.6.1.tar.gz", hash = "sha256:558bc0c4145f01e6405f4a5fdbd82050bd221b119f4bf72a961a1cfd471349d6"},
{file = "typing_extensions-4.3.0-py3-none-any.whl", hash = "sha256:25642c956049920a5aa49edcdd6ab1e06d7e5d467fc00e0506c44ac86fbfca02"},
{file = "typing_extensions-4.3.0.tar.gz", hash = "sha256:e6d2677a32f47fc7eb2795db1dd15c1f34eff616bcaf2cfb5e997f854fa1c4a6"},
]
[[package]]
@@ -2614,4 +2611,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "c6c217033f50430c31b0979b74db222e6bab2301abd8b9f0cce5a9d5bccc578f"
content-hash = "a0bd73376a3e9479f2379265ccec8dd6ac9df2e525909d12b77d918d590fba55"

View File

@@ -35,7 +35,9 @@ postgres_backend.workspace = true
pq_proto.workspace = true
prometheus.workspace = true
rand.workspace = true
ref-cast.workspace = true
regex.workspace = true
redis.workspace = true
reqwest = { workspace = true, features = ["json"] }
reqwest-middleware.workspace = true
reqwest-tracing.workspace = true
@@ -50,9 +52,9 @@ socket2.workspace = true
sync_wrapper.workspace = true
thiserror.workspace = true
tls-listener.workspace = true
tokio = { workspace = true, features = ["signal"] }
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio = { workspace = true, features = ["signal"] }
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
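
The two dependencies added above back the new features in this PR: `redis` powers the console notifications listener started from the proxy binary (see `console::notifications::task_main` further down), and `ref-cast` powers the key-borrowing trick in the new `TimedLru`. As a rough, hypothetical sketch only (not taken from this PR), a listener built on the `redis` crate's async pub/sub support could look like the following; the channel name, payload handling, and use of `get_async_connection` are assumptions:

use futures::StreamExt;

// Hypothetical sketch: the real listener lives in proxy/src/console/notifications.rs.
// Assumes the `redis` crate with tokio async support enabled.
async fn notifications_task(url: String) -> anyhow::Result<()> {
    let client = redis::Client::open(url.as_str())?;
    let mut pubsub = client.get_async_connection().await?.into_pubsub();
    pubsub.subscribe("console_notifications").await?; // assumed channel name
    let mut messages = pubsub.on_message();
    while let Some(msg) = messages.next().await {
        let payload: String = msg.get_payload()?;
        // The real task would parse this and invalidate the matching
        // AuthInfoCache / NodeInfoCache entries.
        tracing::info!(%payload, "received console notification");
    }
    Ok(())
}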

View File

@@ -6,6 +6,7 @@ pub use link::LinkAuthError;
use crate::{
auth::{self, ClientCredentials},
compute::ComputeNode,
console::{
self,
provider::{CachedNodeInfo, ConsoleReqExtra},
@@ -114,7 +115,7 @@ async fn auth_quirks(
creds: &mut ClientCredentials<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
allow_cleartext: bool,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
) -> auth::Result<AuthSuccess<ComputeNode>> {
// If there's no project so far, that entails that the client doesn't
// support SNI or other means of passing the endpoint (project) name.
// We now expect to see a very specific payload in the place of password.
@@ -156,7 +157,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
extra: &ConsoleReqExtra<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
allow_cleartext: bool,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
) -> auth::Result<AuthSuccess<ComputeNode>> {
use BackendType::*;
let res = match self {
@@ -184,9 +185,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
Link(url) => {
info!("performing link authentication");
link::authenticate(url, client)
.await?
.map(CachedNodeInfo::new_uncached)
link::authenticate(url, client).await?
}
};

View File

@@ -1,8 +1,8 @@
use super::AuthSuccess;
use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute,
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
compute::{self, ComputeNode, Password},
console::{self, AuthInfo, CachedAuthInfo, ConsoleReqExtra},
sasl, scram,
stream::PqStream,
};
@@ -14,25 +14,26 @@ pub(super) async fn authenticate(
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
) -> auth::Result<AuthSuccess<ComputeNode>> {
info!("fetching user's authentication info");
let info = api.get_auth_info(extra, creds).await?.unwrap_or_else(|| {
// If we don't have an authentication secret, we mock one to
// prevent malicious probing (possible due to missing protocol steps).
// This mocked secret will never lead to successful authentication.
info!("authentication info not found, mocking it");
AuthInfo::Scram(scram::ServerSecret::mock(creds.user, rand::random()))
let info = scram::ServerSecret::mock(creds.user, rand::random());
CachedAuthInfo::new_uncached(AuthInfo::Scram(info))
});
let flow = AuthFlow::new(client);
let scram_keys = match info {
let keys = match &*info {
AuthInfo::Md5(_) => {
info!("auth endpoint chooses MD5");
return Err(auth::AuthError::bad_auth_method("MD5"));
}
AuthInfo::Scram(secret) => {
info!("auth endpoint chooses SCRAM");
let scram = auth::Scram(&secret);
let scram = auth::Scram(secret);
let client_key = match flow.begin(scram).await?.authenticate().await? {
sasl::Outcome::Success(key) => key,
sasl::Outcome::Failure(reason) => {
@@ -41,21 +42,20 @@ pub(super) async fn authenticate(
}
};
Some(compute::ScramKeys {
compute::ScramKeys {
client_key: client_key.as_bytes(),
server_key: secret.server_key.as_bytes(),
})
}
}
};
let mut node = api.wake_compute(extra, creds).await?;
if let Some(keys) = scram_keys {
use tokio_postgres::config::AuthKeys;
node.config.auth_keys(AuthKeys::ScramSha256(keys));
}
let info = api.wake_compute(extra, creds).await?;
Ok(AuthSuccess {
reported_auth_ok: false,
value: node,
value: ComputeNode::Static {
password: Password::ScramKeys(keys),
info,
},
})
}

View File

@@ -1,10 +1,8 @@
use super::AuthSuccess;
use crate::{
auth::{self, AuthFlow, ClientCredentials},
console::{
self,
provider::{CachedNodeInfo, ConsoleReqExtra},
},
compute::{ComputeNode, Password},
console::{self, provider::ConsoleReqExtra},
stream,
};
use tokio::io::{AsyncRead, AsyncWrite};
@@ -19,7 +17,7 @@ pub async fn cleartext_hack(
extra: &ConsoleReqExtra<'_>,
creds: &mut ClientCredentials<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
) -> auth::Result<AuthSuccess<ComputeNode>> {
warn!("cleartext auth flow override is enabled, proceeding");
let password = AuthFlow::new(client)
.begin(auth::CleartextPassword)
@@ -27,13 +25,15 @@ pub async fn cleartext_hack(
.authenticate()
.await?;
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(password);
let info = api.wake_compute(extra, creds).await?;
// Report tentative success; compute node will check the password anyway.
Ok(AuthSuccess {
reported_auth_ok: false,
value: node,
value: ComputeNode::Static {
password: Password::ClearText(password),
info,
},
})
}
@@ -44,7 +44,7 @@ pub async fn password_hack(
extra: &ConsoleReqExtra<'_>,
creds: &mut ClientCredentials<'_>,
client: &mut stream::PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<CachedNodeInfo>> {
) -> auth::Result<AuthSuccess<ComputeNode>> {
warn!("project not specified, resorting to the password hack auth flow");
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)
@@ -55,12 +55,14 @@ pub async fn password_hack(
info!(project = &payload.endpoint, "received missing parameter");
creds.project = Some(payload.endpoint);
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(payload.password);
let info = api.wake_compute(extra, creds).await?;
// Report tentative success; compute node will check the password anyway.
Ok(AuthSuccess {
reported_auth_ok: false,
value: node,
value: ComputeNode::Static {
password: Password::ClearText(payload.password),
info,
},
})
}

View File

@@ -1,15 +1,10 @@
use super::AuthSuccess;
use crate::{
auth, compute,
console::{self, provider::NodeInfo},
error::UserFacingError,
stream::PqStream,
waiters,
auth, compute::ComputeNode, console, error::UserFacingError, stream::PqStream, waiters,
};
use pq_proto::BeMessage as Be;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_postgres::config::SslMode;
use tracing::{info, info_span};
#[derive(Debug, Error)]
@@ -57,12 +52,12 @@ pub fn new_psql_session_id() -> String {
pub(super) async fn authenticate(
link_uri: &reqwest::Url,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
) -> auth::Result<AuthSuccess<NodeInfo>> {
) -> auth::Result<AuthSuccess<ComputeNode>> {
let psql_session_id = new_psql_session_id();
let span = info_span!("link", psql_session_id = &psql_session_id);
let span = info_span!("link", psql_session_id);
let greeting = hello_message(link_uri, &psql_session_id);
let db_info = console::mgmt::with_waiter(psql_session_id, |waiter| async {
let info = console::mgmt::with_waiter(psql_session_id, |waiter| async {
// Give user a URL to spawn a new database.
info!(parent: &span, "sending the auth URL to the user");
client
@@ -79,35 +74,8 @@ pub(super) async fn authenticate(
client.write_message_noflush(&Be::NoticeResponse("Connecting to database."))?;
// This config should be self-contained, because we won't
// take username or dbname from client's startup message.
let mut config = compute::ConnCfg::new();
config
.host(&db_info.host)
.port(db_info.port)
.dbname(&db_info.dbname)
.user(&db_info.user);
// Backwards compatibility. pg_sni_proxy uses "--" in domain names
// while direct connections do not. Once we migrate to pg_sni_proxy
// everywhere, we can remove this.
if db_info.host.contains("--") {
// we need TLS connection with SNI info to properly route it
config.ssl_mode(SslMode::Require);
} else {
config.ssl_mode(SslMode::Disable);
}
if let Some(password) = db_info.password {
config.password(password.as_ref());
}
Ok(AuthSuccess {
reported_auth_ok: true,
value: NodeInfo {
config,
aux: db_info.aux.into(),
allow_self_signed_compute: false, // caller may override
},
value: ComputeNode::Link(info),
})
}
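
Across the three auth flows above (classic, hacks, link), the success value changes from `CachedNodeInfo` to a new `ComputeNode` type that carries the credentials to present to the compute node. The shape below is a hypothetical reconstruction inferred purely from the call sites in this diff (`ComputeNode::Static { password, info }`, `ComputeNode::Link(info)`, `Password::ClearText(..)`, `Password::ScramKeys(..)`); the real definitions live in proxy/src/compute.rs and will differ in detail:

// Stand-ins for the proxy's own types, used only to make the sketch self-contained.
struct ScramKeys;    // stand-in for compute::ScramKeys
struct NodeInfo;     // stand-in for the cached wake_compute response
struct DatabaseInfo; // stand-in for the console's link-auth payload

enum Password {
    // Password captured from the client (cleartext / password-hack flows).
    ClearText(Vec<u8>),
    // Keys derived during the SCRAM exchange in the classic flow.
    ScramKeys(ScramKeys),
}

enum ComputeNode {
    // The proxy checked credentials itself; connect using `password`
    // and the wake_compute result.
    Static { password: Password, info: NodeInfo },
    // Link auth: the console hands back a ready-to-use connection description.
    Link(DatabaseInfo),
}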

View File

@@ -1,16 +1,15 @@
use proxy::auth;
use proxy::console;
use proxy::http;
use proxy::metrics;
use anyhow::bail;
use anyhow::{bail, Context};
use clap::{self, Arg};
use proxy::config::{self, ProxyConfig};
use std::{borrow::Cow, net::SocketAddr};
use futures::future::try_join_all;
use proxy::{
auth,
config::{self, MetricCollectionConfig, ProxyConfig, TlsConfig},
console, http, metrics,
};
use std::{borrow::Cow, net::SocketAddr, sync::atomic::Ordering};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use tracing::{info, warn};
use utils::{project_git_version, sentry_init::init_sentry};
project_git_version!(GIT_VERSION);
@@ -25,8 +24,7 @@ async fn main() -> anyhow::Result<()> {
::metrics::set_build_info_metric(GIT_VERSION);
let args = cli().get_matches();
let config = build_config(&args)?;
let config: &ProxyConfig = Box::leak(Box::new(build_config(&args)?));
info!("Authentication backend: {}", config.auth_backend);
// Check that we can bind to address before further initialization
@@ -71,20 +69,29 @@ async fn main() -> anyhow::Result<()> {
tasks.push(tokio::spawn(metrics::task_main(metrics_config)));
}
let tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err));
let client_tasks =
futures::future::try_join_all(client_tasks.into_iter().map(proxy::flatten_err));
if let auth::BackendType::Console(api, _) = &config.auth_backend {
if let Some(url) = args.get_one::<String>("redis-notifications") {
info!("Starting redis notifications listener ({url})");
tasks.push(tokio::spawn(console::notifications::task_main(
url.to_owned(),
api.caches,
)));
}
}
let tasks = try_join_all(tasks.into_iter().map(proxy::flatten_err));
let client_tasks = try_join_all(client_tasks.into_iter().map(proxy::flatten_err));
tokio::select! {
// We are only expecting an error from these forever tasks
res = tasks => { res?; },
res = client_tasks => { res?; },
}
Ok(())
}
/// ProxyConfig is created at proxy startup, and lives forever.
fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> {
let tls_config = match (
fn build_tls_config(args: &clap::ArgMatches) -> anyhow::Result<Option<TlsConfig>> {
let config = match (
args.get_one::<String>("tls-key"),
args.get_one::<String>("tls-cert"),
) {
@@ -101,16 +108,22 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
.get_one::<String>("allow-self-signed-compute")
.unwrap()
.parse()?;
if allow_self_signed_compute {
warn!("allowing self-signed compute certificates");
proxy::compute::ALLOW_SELF_SIGNED_COMPUTE.store(true, Ordering::Relaxed);
}
let metric_collection = match (
args.get_one::<String>("metric-collection-endpoint"),
args.get_one::<String>("metric-collection-interval"),
) {
Ok(config)
}
fn build_metrics_config(args: &clap::ArgMatches) -> anyhow::Result<Option<MetricCollectionConfig>> {
let endpoint = args.get_one::<String>("metric-collection-endpoint");
let interval = args.get_one::<String>("metric-collection-interval");
let config = match (endpoint, interval) {
(Some(endpoint), Some(interval)) => Some(config::MetricCollectionConfig {
endpoint: endpoint.parse()?,
endpoint: endpoint.parse().context("bad metrics endpoint")?,
interval: humantime::parse_duration(interval)?,
}),
(None, None) => None,
@@ -120,21 +133,40 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
),
};
let auth_backend = match args.get_one::<String>("auth-backend").unwrap().as_str() {
Ok(config)
}
fn make_caches(args: &clap::ArgMatches) -> anyhow::Result<console::caches::ApiCaches> {
let config::CacheOptions { size, ttl } = args
.get_one::<String>("get-auth-info-cache")
.unwrap()
.parse()?;
info!("Using AuthInfoCache (get_auth_info) with size={size} ttl={ttl:?}");
let auth_info = console::caches::AuthInfoCache::new("auth_info_cache", size, ttl);
let config::CacheOptions { size, ttl } = args
.get_one::<String>("wake-compute-cache")
.unwrap()
.parse()?;
info!("Using NodeInfoCache (wake_compute) with size={size} ttl={ttl:?}");
let node_info = console::caches::NodeInfoCache::new("node_info_cache", size, ttl);
let caches = console::caches::ApiCaches {
auth_info,
node_info,
};
Ok(caches)
}
fn build_auth_config(args: &clap::ArgMatches) -> anyhow::Result<auth::BackendType<'static, ()>> {
let config = match args.get_one::<String>("auth-backend").unwrap().as_str() {
"console" => {
let config::CacheOptions { size, ttl } = args
.get_one::<String>("wake-compute-cache")
.unwrap()
.parse()?;
info!("Using NodeInfoCache (wake_compute) with size={size} ttl={ttl:?}");
let caches = Box::leak(Box::new(console::caches::ApiCaches {
node_info: console::caches::NodeInfoCache::new("node_info_cache", size, ttl),
}));
let url = args.get_one::<String>("auth-endpoint").unwrap().parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let caches = Box::leak(Box::new(make_caches(args)?));
let api = console::provider::neon::Api::new(endpoint, caches);
auth::BackendType::Console(Cow::Owned(api), ())
}
@@ -150,12 +182,16 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
other => bail!("unsupported auth backend: {other}"),
};
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute,
}));
Ok(config)
}
/// ProxyConfig is created at proxy startup, and lives forever.
fn build_config(args: &clap::ArgMatches) -> anyhow::Result<ProxyConfig> {
let config = ProxyConfig {
tls_config: build_tls_config(args)?,
auth_backend: build_auth_config(args)?,
metric_collection: build_metrics_config(args)?,
};
Ok(config)
}
@@ -239,11 +275,22 @@ fn cli() -> clap::Command {
.long("metric-collection-interval")
.help("how often metrics should be sent to a collection endpoint"),
)
.arg(
Arg::new("redis-notifications")
.long("redis-notifications")
.help("for receiving notifications from console (e.g. redis://127.0.0.1:6379)"),
)
.arg(
Arg::new("get-auth-info-cache")
.long("get-auth-info-cache")
.help("cache for `get_auth_info` api method (use `size=0` to disable)")
.default_value(config::CacheOptions::DEFAULT_AUTH_INFO),
)
.arg(
Arg::new("wake-compute-cache")
.long("wake-compute-cache")
.help("cache for `wake_compute` api method (use `size=0` to disable)")
.default_value(config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO),
.default_value(config::CacheOptions::DEFAULT_NODE_INFO),
)
.arg(
Arg::new("allow-self-signed-compute")

View File

@@ -1,304 +1,117 @@
use std::{
borrow::Borrow,
hash::Hash,
ops::{Deref, DerefMut},
time::{Duration, Instant},
};
use tracing::debug;
use std::{any::Any, sync::Arc, time::Instant};
// This seems to make more sense than `lru` or `cached`:
//
// * `near/nearcore` ditched `cached` in favor of `lru`
// (https://github.com/near/nearcore/issues?q=is%3Aissue+lru+is%3Aclosed).
//
// * `lru` methods use an obscure `KeyRef` type in their constraints (which is deliberately excluded from docs).
// This severely hinders its usage both in terms of creating wrappers and supported key types.
//
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
/// A variant of LRU where every entry has a TTL.
pub mod timed_lru;
pub use timed_lru::TimedLru;
/// A generic trait which exposes types of cache's key and value,
/// as well as the notion of cache entry invalidation.
/// This is useful for [`timed_lru::Cached`].
pub trait Cache {
/// Entry's key.
type Key;
/// Useful type aliases.
pub mod types {
pub type Cached<T> = super::Cached<'static, T>;
}
/// Entry's value.
type Value;
/// Lookup information for cache entry invalidation.
#[derive(Clone)]
pub struct LookupInfo<K> {
/// Cache entry creation time.
/// We use this during invalidation lookups to prevent eviction of a newer
/// entry sharing the same key (it might've been inserted by a different
/// task after we got the entry we're trying to invalidate now).
created_at: Instant,
/// Used for entry invalidation.
type LookupInfo<Key>;
/// Search by this key.
key: K,
}
/// This type encapsulates everything needed for cache entry invalidation.
/// For convenience, we completely erase the types of a cache ref and a key.
/// This lets us store multiple tokens in a homogeneous collection (e.g. Vec).
#[derive(Clone)]
pub struct InvalidationToken<'a> {
// TODO: allow more than one type of reference (e.g. Arc) if it's ever needed.
cache: &'a (dyn Cache + Sync + Send),
info: LookupInfo<Arc<dyn Any + Sync + Send>>,
}
impl InvalidationToken<'_> {
/// Invalidate a corresponding cache entry.
pub fn invalidate(&self) {
let info = LookupInfo {
created_at: self.info.created_at,
key: self.info.key.as_ref(),
};
self.cache.invalidate_entry(info);
}
}
/// A combination of a cache entry and its invalidation token.
/// Makes it easier to see how those two are connected.
#[derive(Clone)]
pub struct Cached<'a, T> {
pub token: Option<InvalidationToken<'a>>,
pub value: Arc<T>,
}
impl<T> Cached<'_, T> {
/// Place any entry into this wrapper; invalidation will be a no-op.
pub fn new_uncached(value: T) -> Self {
Self {
token: None,
value: value.into(),
}
}
/// Invalidate a corresponding cache entry.
pub fn invalidate(&self) {
if let Some(token) = &self.token {
token.invalidate();
}
}
}
impl<T> std::ops::Deref for Cached<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.value
}
}
/// This trait captures the notion of cache entry invalidation.
/// It doesn't have any associated types because we use dyn-based type erasure.
trait Cache {
/// Invalidate an entry using a lookup info.
/// We don't have an empty default impl because it's error-prone.
fn invalidate(&self, _: &Self::LookupInfo<Self::Key>);
fn invalidate_entry(&self, info: LookupInfo<&(dyn Any + Send + Sync)>);
}
impl<C: Cache> Cache for &C {
type Key = C::Key;
type Value = C::Value;
type LookupInfo<Key> = C::LookupInfo<Key>;
fn invalidate(&self, info: &Self::LookupInfo<Self::Key>) {
C::invalidate(self, info)
}
}
pub use timed_lru::TimedLru;
pub mod timed_lru {
#[cfg(test)]
mod tests {
use super::*;
/// An implementation of a timed LRU cache with fixed capacity.
/// Key properties:
///
/// * Whenever a new entry is inserted, the least recently accessed one is evicted.
/// The cache also keeps track of each entry's insertion time (`created_at`) and TTL (`expires_at`).
///
/// * When the entry is about to be retrieved, we check its expiration timestamp.
/// If the entry has expired, we remove it from the cache; Otherwise we bump the
/// expiration timestamp (e.g. +5mins) and change its place in LRU list to prolong
/// its existence.
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information.
///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet).
/// There is no background job to reap the expired records.
///
/// * It's possible for an entry that has not yet expired to be evicted
/// before expired items. That's a bit wasteful, but probably fine in practice.
pub struct TimedLru<K, V> {
/// Cache's name for tracing.
name: &'static str,
/// The underlying cache implementation.
cache: parking_lot::Mutex<LruCache<K, Entry<V>>>,
/// Default time-to-live of a single entry.
ttl: Duration,
#[test]
fn trivial_properties_of_cached() {
let cached = Cached::new_uncached(0);
assert_eq!(*cached, 0);
cached.invalidate();
}
impl<K: Hash + Eq, V> Cache for TimedLru<K, V> {
type Key = K;
type Value = V;
type LookupInfo<Key> = LookupInfo<Key>;
#[test]
fn invalidation_token_type_erasure() {
let lifetime = std::time::Duration::from_secs(10);
let foo = TimedLru::<u32, u32>::new("foo", 128, lifetime);
let bar = TimedLru::<String, usize>::new("bar", 128, lifetime);
fn invalidate(&self, info: &Self::LookupInfo<K>) {
self.invalidate_raw(info)
}
}
let (_, x) = foo.insert(100.into(), 0.into());
let (_, y) = bar.insert(String::new().into(), 404.into());
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
value: T,
}
impl<K: Hash + Eq, V> TimedLru<K, V> {
/// Construct a new LRU cache with timed entries.
pub fn new(name: &'static str, capacity: usize, ttl: Duration) -> Self {
Self {
name,
cache: LruCache::new(capacity).into(),
ttl,
}
// Invalidation tokens should be cloneable and homogeneous (same type).
let tokens = [x.token.clone().unwrap(), y.token.clone().unwrap()];
for token in tokens {
token.invalidate();
}
/// Drop an entry from the cache if it's outdated.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn invalidate_raw(&self, info: &LookupInfo<K>) {
let now = Instant::now();
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let raw_entry = match cache.raw_entry_mut().from_key(&info.key) {
RawEntryMut::Vacant(_) => return,
RawEntryMut::Occupied(x) => x,
};
// Remove the entry if it was created prior to lookup timestamp.
let entry = raw_entry.get();
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
let should_remove = created_at <= info.created_at || expires_at <= now;
if should_remove {
raw_entry.remove();
}
drop(cache); // drop lock before logging
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
entry_removed = should_remove,
"processed a cache entry invalidation event"
);
}
/// Try retrieving an entry by its key, then execute `extract` if it exists.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn get_raw<Q, R>(&self, key: &Q, extract: impl FnOnce(&K, &Entry<V>) -> R) -> Option<R>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let now = Instant::now();
let deadline = now.checked_add(self.ttl).expect("time overflow");
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let mut raw_entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return None,
RawEntryMut::Occupied(x) => x,
};
// Immediately drop the entry if it has expired.
let entry = raw_entry.get();
if entry.expires_at <= now {
raw_entry.remove();
return None;
}
let value = extract(raw_entry.key(), entry);
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
// Update the deadline and the entry's position in the LRU list.
raw_entry.get_mut().expires_at = deadline;
raw_entry.to_back();
drop(cache); // drop lock before logging
debug!(
created_at = format_args!("{created_at:?}"),
old_expires_at = format_args!("{expires_at:?}"),
new_expires_at = format_args!("{deadline:?}"),
"accessed a cache entry"
);
Some(value)
}
/// Insert an entry to the cache. If an entry with the same key already
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw(&self, key: K, value: V) -> (Instant, Option<V>) {
let created_at = Instant::now();
let expires_at = created_at.checked_add(self.ttl).expect("time overflow");
let entry = Entry {
created_at,
expires_at,
value,
};
// Do costly things before taking the lock.
let old = self
.cache
.lock()
.insert(key, entry)
.map(|entry| entry.value);
debug!(
created_at = format_args!("{created_at:?}"),
expires_at = format_args!("{expires_at:?}"),
replaced = old.is_some(),
"created a cache entry"
);
(created_at, old)
}
}
impl<K: Hash + Eq + Clone, V: Clone> TimedLru<K, V> {
pub fn insert(&self, key: K, value: V) -> (Option<V>, Cached<&Self>) {
let (created_at, old) = self.insert_raw(key.clone(), value.clone());
let cached = Cached {
token: Some((self, LookupInfo { created_at, key })),
value,
};
(old, cached)
}
}
impl<K: Hash + Eq, V: Clone> TimedLru<K, V> {
/// Retrieve a cached entry in convenient wrapper.
pub fn get<Q>(&self, key: &Q) -> Option<timed_lru::Cached<&Self>>
where
K: Borrow<Q> + Clone,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| {
let info = LookupInfo {
created_at: entry.created_at,
key: key.clone(),
};
Cached {
token: Some((self, info)),
value: entry.value.clone(),
}
})
}
}
/// Lookup information for key invalidation.
pub struct LookupInfo<K> {
/// Time of creation of a cache [`Entry`].
/// We use this during invalidation lookups to prevent eviction of a newer
/// entry sharing the same key (it might've been inserted by a different
/// task after we got the entry we're trying to invalidate now).
created_at: Instant,
/// Search by this key.
key: K,
}
/// Wrapper for convenient entry invalidation.
pub struct Cached<C: Cache> {
/// Cache + lookup info.
token: Option<(C, C::LookupInfo<C::Key>)>,
/// The value itself.
pub value: C::Value,
}
impl<C: Cache> Cached<C> {
/// Place any entry into this wrapper; invalidation will be a no-op.
/// Unfortunately, rust doesn't let us implement [`From`] or [`Into`].
pub fn new_uncached(value: impl Into<C::Value>) -> Self {
Self {
token: None,
value: value.into(),
}
}
/// Drop this entry from a cache if it's still there.
pub fn invalidate(&self) {
if let Some((cache, info)) = &self.token {
cache.invalidate(info);
}
}
/// Tell if this entry is actually cached.
pub fn cached(&self) -> bool {
self.token.is_some()
}
}
impl<C: Cache> Deref for Cached<C> {
type Target = C::Value;
fn deref(&self) -> &Self::Target {
&self.value
}
}
impl<C: Cache> DerefMut for Cached<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.value
}
// Values are still there.
assert_eq!(*x, 0);
assert_eq!(*y, 404);
}
}
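
The key idea in the rewritten module is double type erasure: `InvalidationToken` stores the cache as `&dyn Cache` and the key as `Arc<dyn Any + Sync + Send>`, so tokens from caches with different key/value types can live in one collection (as the second test shows), and each cache downcasts the key back to its concrete type. A standalone illustration of the downcast half of that trick, independent of the proxy code:

use std::{any::Any, sync::Arc};

// Erase the concrete key type behind Arc<dyn Any + Send + Sync> ...
fn erase<K: Any + Send + Sync>(key: Arc<K>) -> Arc<dyn Any + Send + Sync> {
    key
}

// ... and recover it later, exactly like invalidate_entry does above.
fn recover<K: Any + Send + Sync>(erased: &Arc<dyn Any + Send + Sync>) -> &K {
    erased.downcast_ref::<K>().expect("bad key type")
}

fn main() {
    let erased = erase(Arc::new(String::from("endpoint-42")));
    assert_eq!(recover::<String>(&erased), "endpoint-42");
}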

293
proxy/src/cache/timed_lru.rs vendored Normal file
View File

@@ -0,0 +1,293 @@
#[cfg(test)]
mod tests;
use super::{Cache, Cached, InvalidationToken, LookupInfo};
use ref_cast::RefCast;
use std::{
any::Any,
borrow::Borrow,
hash::Hash,
sync::Arc,
time::{Duration, Instant},
};
use tracing::debug;
// This seems to make more sense than `lru` or `cached`:
//
// * `near/nearcore` ditched `cached` in favor of `lru`
// (https://github.com/near/nearcore/issues?q=is%3Aissue+lru+is%3Aclosed).
//
// * `lru` methods use an obscure `KeyRef` type in their constraints (which is deliberately excluded from docs).
// This severely hinders its usage both in terms of creating wrappers and supported key types.
//
// On the other hand, `hashlink` has good download stats and appears to be maintained.
use hashlink::{linked_hash_map::RawEntryMut, LruCache};
/// An implementation of a timed LRU cache with fixed capacity.
/// Key properties:
///
/// * Whenever a new entry is inserted, the least recently accessed one is evicted.
/// The cache also keeps track of each entry's insertion time (`created_at`) and TTL (`expires_at`).
///
/// * When the entry is about to be retrieved, we check its expiration timestamp.
/// If the entry has expired, we remove it from the cache; Otherwise we bump the
/// expiration timestamp (e.g. +5mins) and change its place in LRU list to prolong
/// its existence.
///
/// * There's an API for immediate invalidation (removal) of a cache entry;
/// It's useful in case we know for sure that the entry is no longer correct.
/// See [`timed_lru::LookupInfo`] & [`timed_lru::Cached`] for more information.
///
/// * Expired entries are kept in the cache, until they are evicted by the LRU policy,
/// or by a successful lookup (i.e. the entry hasn't expired yet).
/// There is no background job to reap the expired records.
///
/// * It's possible for an entry that has not yet expired to be evicted
/// before expired items. That's a bit wasteful, but probably fine in practice.
pub struct TimedLru<K, V> {
/// Cache's name for tracing.
name: &'static str,
/// The underlying cache implementation.
cache: parking_lot::Mutex<LruCache<Key<K>, Entry<V>>>,
/// Default time-to-live of a single entry.
ttl: Duration,
}
#[derive(RefCast, Hash, PartialEq, Eq)]
#[repr(transparent)]
struct Query<Q: ?Sized>(Q);
#[derive(Hash, PartialEq, Eq)]
#[repr(transparent)]
struct Key<T: ?Sized>(Arc<T>);
/// It's impossible to implement this without [`Key`] & [`Query`]:
/// * We can't implement std traits for [`Arc`].
/// * Even if we could, it'd conflict with `impl<T> Borrow<T> for T`.
impl<Q, T> Borrow<Query<Q>> for Key<T>
where
Q: ?Sized,
T: Borrow<Q>,
{
#[inline(always)]
fn borrow(&self) -> &Query<Q> {
RefCast::ref_cast(self.0.as_ref().borrow())
}
}
struct Entry<T> {
created_at: Instant,
expires_at: Instant,
value: Arc<T>,
}
impl<K: Hash + Eq, V> TimedLru<K, V> {
/// Construct a new LRU cache with timed entries.
pub fn new(name: &'static str, capacity: usize, ttl: Duration) -> Self {
Self {
name,
cache: LruCache::new(capacity).into(),
ttl,
}
}
/// Get the number of entries in the cache.
/// Note that this method will not try to evict stale entries.
pub fn size(&self) -> usize {
self.cache.lock().len()
}
/// Try retrieving an entry by its key, then execute `extract` if it exists.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn get_raw<Q, F, R>(&self, key: &Q, extract: F) -> Option<R>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
F: FnOnce(&Arc<K>, &Entry<V>) -> R,
{
let key: &Query<Q> = RefCast::ref_cast(key);
let now = Instant::now();
let deadline = now.checked_add(self.ttl).expect("time overflow");
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let mut raw_entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return None,
RawEntryMut::Occupied(x) => x,
};
// Immediately drop the entry if it has expired.
let entry = raw_entry.get();
if entry.expires_at <= now {
raw_entry.remove();
return None;
}
let value = extract(&raw_entry.key().0, entry);
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
// Update the deadline and the entry's position in the LRU list.
raw_entry.get_mut().expires_at = deadline;
raw_entry.to_back();
drop(cache); // drop lock before logging
debug!(
?created_at,
old_expires_at = ?expires_at,
new_expires_at = ?deadline,
"accessed a cache entry"
);
Some(value)
}
/// Insert an entry to the cache. If an entry with the same key already
/// existed, return the previous value and its creation timestamp.
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn insert_raw(&self, key: Arc<K>, value: Arc<V>) -> (Option<Arc<V>>, Instant) {
let created_at = Instant::now();
let expires_at = created_at.checked_add(self.ttl).expect("time overflow");
let entry = Entry {
created_at,
expires_at,
value,
};
// Do costly things before taking the lock.
let old = self
.cache
.lock()
.insert(Key(key), entry)
.map(|entry| entry.value);
debug!(
?created_at,
?expires_at,
replaced = old.is_some(),
"created a cache entry"
);
(old, created_at)
}
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn remove_raw<Q>(&self, key: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let key: &Query<Q> = RefCast::ref_cast(key);
let mut cache = self.cache.lock();
cache.remove(key);
debug!("removed a cache entry");
}
}
/// Convenient wrappers for raw methods.
impl<K: Hash + Eq + Sync + Send + 'static, V> TimedLru<K, V>
where
Self: Sync + Send,
{
pub fn get<Q>(&self, key: &Q) -> Option<Cached<V>>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.get_raw(key, |key, entry| {
let info = LookupInfo {
created_at: entry.created_at,
key: Arc::clone(key),
};
Cached {
token: Some(Self::invalidation_token(self, info)),
value: entry.value.clone(),
}
})
}
pub fn insert(&self, key: Arc<K>, value: Arc<V>) -> (Option<Arc<V>>, Cached<V>) {
let (old, created_at) = self.insert_raw(key.clone(), value.clone());
let info = LookupInfo { created_at, key };
let cached = Cached {
token: Some(Self::invalidation_token(self, info)),
value,
};
(old, cached)
}
pub fn remove<Q>(&self, key: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.remove_raw(key)
}
}
/// Implementation details of the entry invalidation machinery.
impl<K: Hash + Eq + Sync + Send + 'static, V> TimedLru<K, V>
where
Self: Sync + Send,
{
/// This is a proper (safe) way to create an invalidation token for [`TimedLru`].
fn invalidation_token(&self, info: LookupInfo<Arc<K>>) -> InvalidationToken<'_> {
InvalidationToken {
cache: self,
info: LookupInfo {
created_at: info.created_at,
key: info.key,
},
}
}
}
/// This implementation depends on [`Self::invalidation_token`].
impl<K: Hash + Eq + 'static, V> Cache for TimedLru<K, V> {
fn invalidate_entry(&self, info: LookupInfo<&(dyn Any + Sync + Send)>) {
let info = LookupInfo {
created_at: info.created_at,
// NOTE: it's important to downcast to the correct type!
key: info.key.downcast_ref::<K>().expect("bad key type"),
};
self.invalidate_raw(info)
}
}
impl<K: Hash + Eq, V> TimedLru<K, V> {
#[tracing::instrument(level = "debug", fields(cache = self.name), skip_all)]
fn invalidate_raw(&self, info: LookupInfo<&K>) {
let key: &Query<K> = RefCast::ref_cast(info.key);
let now = Instant::now();
// Do costly things before taking the lock.
let mut cache = self.cache.lock();
let raw_entry = match cache.raw_entry_mut().from_key(key) {
RawEntryMut::Vacant(_) => return,
RawEntryMut::Occupied(x) => x,
};
// Remove the entry if it was created prior to lookup timestamp.
let entry = raw_entry.get();
let (created_at, expires_at) = (entry.created_at, entry.expires_at);
let should_remove = created_at <= info.created_at || expires_at <= now;
if should_remove {
raw_entry.remove();
}
drop(cache); // drop lock before logging
debug!(
?created_at,
?expires_at,
entry_removed = should_remove,
"processed a cache entry invalidation event"
);
}
}
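
The `Query`/`Key` newtypes plus `ref_cast` exist so that an `LruCache` keyed by `Key<K>` (an `Arc<K>` wrapper) can still be looked up by any borrowed form `Q` where `K: Borrow<Q>`, e.g. a `TimedLru<String, _>` probed with a plain `&str`. A standalone sketch of the same pattern over a `HashMap`, independent of the cache above:

use ref_cast::RefCast;
use std::{borrow::Borrow, collections::HashMap, sync::Arc};

#[derive(RefCast, Hash, PartialEq, Eq)]
#[repr(transparent)]
struct Query<Q: ?Sized>(Q);

#[derive(Hash, PartialEq, Eq)]
struct Key<T: ?Sized>(Arc<T>);

// Keys hash/compare through the Arc, queries through the borrowed form,
// so a map keyed by Key<String> can be probed with a bare &str.
impl<Q: ?Sized, T: Borrow<Q>> Borrow<Query<Q>> for Key<T> {
    fn borrow(&self) -> &Query<Q> {
        RefCast::ref_cast(self.0.as_ref().borrow())
    }
}

fn main() {
    let mut map = HashMap::new();
    map.insert(Key(Arc::new(String::from("tenant"))), 1u32);

    // No String allocation needed for the lookup.
    let query: &Query<str> = RefCast::ref_cast("tenant");
    assert_eq!(map.get(query), Some(&1));
}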

104
proxy/src/cache/timed_lru/tests.rs vendored Normal file
View File

@@ -0,0 +1,104 @@
use super::*;
/// Check that we can define the cache for certain types.
#[test]
fn definition() {
// Check for trivial yet possible types.
let cache = TimedLru::<(), ()>::new("test", 128, Duration::from_secs(0));
let _ = cache.insert(Default::default(), Default::default());
let _ = cache.get(&());
// Now for something less trivial.
let cache = TimedLru::<String, String>::new("test", 128, Duration::from_secs(0));
let _ = cache.insert(Default::default(), Default::default());
let _ = cache.get(&String::default());
let _ = cache.get("str should work");
// It should also work for non-cloneable values.
struct NoClone;
let cache = TimedLru::<Box<str>, NoClone>::new("test", 128, Duration::from_secs(0));
let _ = cache.insert(Default::default(), NoClone.into());
let _ = cache.get(&Box::<str>::from("boxed str"));
let _ = cache.get("str should work");
}
#[test]
fn insert() {
const CAPACITY: usize = 2;
let cache = TimedLru::<String, u32>::new("test", CAPACITY, Duration::from_secs(10));
assert_eq!(cache.size(), 0);
let key = Arc::new(String::from("key"));
let (old, cached) = cache.insert(key.clone(), 42.into());
assert_eq!(old, None);
assert_eq!(*cached, 42);
assert_eq!(cache.size(), 1);
let (old, cached) = cache.insert(key, 1.into());
assert_eq!(old.as_deref(), Some(&42));
assert_eq!(*cached, 1);
assert_eq!(cache.size(), 1);
let (old, cached) = cache.insert(Arc::new("N1".to_owned()), 10.into());
assert_eq!(old, None);
assert_eq!(*cached, 10);
assert_eq!(cache.size(), 2);
let (old, cached) = cache.insert(Arc::new("N2".to_owned()), 20.into());
assert_eq!(old, None);
assert_eq!(*cached, 20);
assert_eq!(cache.size(), 2);
}
#[test]
fn get_none() {
let cache = TimedLru::<String, u32>::new("test", 2, Duration::from_secs(10));
let cached = cache.get("missing");
assert!(matches!(cached, None));
}
#[test]
fn invalidation_simple() {
let cache = TimedLru::<String, u32>::new("test", 2, Duration::from_secs(10));
let (_, cached) = cache.insert(String::from("key").into(), 100.into());
assert_eq!(cache.size(), 1);
cached.invalidate();
assert_eq!(cache.size(), 0);
assert!(matches!(cache.get("key"), None));
}
#[test]
fn invalidation_preserve_newer() {
let cache = TimedLru::<String, u32>::new("test", 2, Duration::from_secs(10));
let key = Arc::new(String::from("key"));
let (_, cached) = cache.insert(key.clone(), 100.into());
assert_eq!(cache.size(), 1);
let _ = cache.insert(key.clone(), 200.into());
assert_eq!(cache.size(), 1);
cached.invalidate();
assert_eq!(cache.size(), 1);
let cached = cache.get(key.as_ref());
assert_eq!(cached.as_deref(), Some(&200));
}
#[test]
fn auto_expiry() {
let lifetime = Duration::from_millis(300);
let cache = TimedLru::<String, u32>::new("test", 2, lifetime);
let key = Arc::new(String::from("key"));
let _ = cache.insert(key.clone(), 42.into());
let cached = cache.get(key.as_ref());
assert_eq!(cached.as_deref(), Some(&42));
std::thread::sleep(lifetime);
let cached = cache.get(key.as_ref());
assert_eq!(cached.as_deref(), None);
}

View File

@@ -1,13 +1,28 @@
use crate::{auth::parse_endpoint_param, cancellation::CancelClosure, error::UserFacingError};
use crate::{
auth::parse_endpoint_param,
cancellation::CancelClosure,
console::messages::{DatabaseInfo, MetricsAuxInfo},
console::CachedNodeInfo,
error::UserFacingError,
};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr, time::Duration};
use std::{
io,
net::SocketAddr,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::tls::MakeTlsConnect;
use tracing::{error, info, warn};
/// Should we allow self-signed certificates in TLS connections?
/// Most definitely, this shouldn't be allowed in production.
pub static ALLOW_SELF_SIGNED_COMPUTE: AtomicBool = AtomicBool::new(false);
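A minimal sketch of how a test harness might opt in to self-signed compute certificates, assuming the static above is reachable from the caller; production code should leave it at its default of `false`:

use std::sync::atomic::Ordering;
// Test-only toggle (illustrative): flip before establishing any compute connections.
ALLOW_SELF_SIGNED_COMPUTE.store(true, Ordering::Relaxed);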
const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node";
#[derive(Debug, Error)]
@@ -42,6 +57,88 @@ impl UserFacingError for ConnectionError {
/// A pair of `ClientKey` & `ServerKey` for `SCRAM-SHA-256`.
pub type ScramKeys = tokio_postgres::config::ScramKeys<32>;
#[derive(Clone)]
pub enum Password {
/// A regular cleartext password.
ClearText(Vec<u8>),
/// A pair of `ClientKey` & `ServerKey` for `SCRAM-SHA-256`.
ScramKeys(ScramKeys),
}
pub enum ComputeNode {
/// Route via link auth.
Link(DatabaseInfo),
/// Regular compute node.
Static {
password: Password,
info: CachedNodeInfo,
},
}
impl ComputeNode {
/// Get metrics auxiliary info.
pub fn metrics_aux_info(&self) -> &MetricsAuxInfo {
match self {
Self::Link(info) => &info.aux,
Self::Static { info, .. } => &info.aux,
}
}
/// Invalidate compute node info if it's cached.
pub fn invalidate(&self) -> bool {
if let Self::Static { info, .. } = self {
warn!("invalidating compute node info cache entry");
info.invalidate();
return true;
}
false
}
/// Turn compute node info into a postgres connection config.
pub fn to_conn_config(&self) -> ConnCfg {
let mut config = ConnCfg::new();
let (host, port) = match self {
Self::Link(info) => {
// NB: use pre-supplied dbname, user and password for link auth.
// See `ConnCfg::set_startup_params` below.
config.0.dbname(&info.dbname).user(&info.user);
if let Some(password) = &info.password {
config.0.password(password.as_bytes());
}
(&info.host, info.port)
}
Self::Static { info, password } => {
// NB: setup auth keys (for SCRAM) or plaintext password.
match password {
Password::ClearText(text) => config.0.password(text),
Password::ScramKeys(keys) => {
use tokio_postgres::config::AuthKeys;
config.0.auth_keys(AuthKeys::ScramSha256(keys.to_owned()))
}
};
(&info.address.host, info.address.port)
}
};
// Backwards compatibility. pg_sni_proxy uses "--" in domain names
// while direct connections do not. Once we migrate to pg_sni_proxy
// everywhere, we can remove this.
config.0.ssl_mode(if host.contains("--") {
// We need TLS connection with SNI info to properly route it.
tokio_postgres::config::SslMode::Require
} else {
tokio_postgres::config::SslMode::Disable
});
config.0.host(host).port(port);
config
}
}
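A sketch of how the pieces above fit together for a static compute node; the host and password below are placeholders, and `CachedNodeInfo::new_uncached` is the constructor the mock console uses further down:

let info = NodeInfo {
    // Placeholder endpoint; the "--" makes `to_conn_config` pick SslMode::Require.
    address: PgEndpoint { host: "ep-example--sni.local".into(), port: 5432 },
    aux: MetricsAuxInfo::default(),
};
let node = ComputeNode::Static {
    password: Password::ClearText(b"placeholder".to_vec()),
    info: CachedNodeInfo::new_uncached(info),
};
let config = node.to_conn_config();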
/// A config for establishing a connection to compute node.
/// Eventually, `tokio_postgres` will be replaced with something better.
/// Newtype allows us to implement methods on top of it.
@@ -51,43 +148,32 @@ pub struct ConnCfg(Box<tokio_postgres::Config>);
/// Creation and initialization routines.
impl ConnCfg {
pub fn new() -> Self {
fn new() -> Self {
Self(Default::default())
}
/// Reuse password or auth keys from the other config.
pub fn reuse_password(&mut self, other: &Self) {
if let Some(password) = other.get_password() {
self.password(password);
}
if let Some(keys) = other.get_auth_keys() {
self.auth_keys(keys);
}
}
/// Apply startup message params to the connection config.
pub fn set_startup_params(&mut self, params: &StartupMessageParams) {
// Only set `user` if it's not present in the config.
// Link auth flow takes username from the console's response.
if let (None, Some(user)) = (self.get_user(), params.get("user")) {
self.user(user);
if let (None, Some(user)) = (self.0.get_user(), params.get("user")) {
self.0.user(user);
}
// Only set `dbname` if it's not present in the config.
// Link auth flow takes dbname from the console's response.
if let (None, Some(dbname)) = (self.get_dbname(), params.get("database")) {
self.dbname(dbname);
if let (None, Some(dbname)) = (self.0.get_dbname(), params.get("database")) {
self.0.dbname(dbname);
}
// Don't add `options` if they were only used for specifying a project.
// Connection pools don't support `options`, because they affect backend startup.
if let Some(options) = filtered_options(params) {
self.options(&options);
self.0.options(&options);
}
if let Some(app_name) = params.get("application_name") {
self.application_name(app_name);
self.0.application_name(app_name);
}
// TODO: This is especially ugly...
@@ -95,10 +181,10 @@ impl ConnCfg {
use tokio_postgres::config::ReplicationMode;
match replication {
"true" | "on" | "yes" | "1" => {
self.replication_mode(ReplicationMode::Physical);
self.0.replication_mode(ReplicationMode::Physical);
}
"database" => {
self.replication_mode(ReplicationMode::Logical);
self.0.replication_mode(ReplicationMode::Logical);
}
_other => {}
}
@@ -113,27 +199,6 @@ impl ConnCfg {
}
}
impl std::ops::Deref for ConnCfg {
type Target = tokio_postgres::Config;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// For now, let's make it easier to setup the config.
impl std::ops::DerefMut for ConnCfg {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl Default for ConnCfg {
fn default() -> Self {
Self::new()
}
}
impl ConnCfg {
/// Establish a raw TCP connection to the compute node.
async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream, &str)> {
@@ -220,16 +285,13 @@ pub struct PostgresConnection {
}
impl ConnCfg {
async fn do_connect(
&self,
allow_self_signed_compute: bool,
) -> Result<PostgresConnection, ConnectionError> {
async fn do_connect(&self) -> Result<PostgresConnection, ConnectionError> {
let (socket_addr, stream, host) = self.connect_raw().await?;
let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(allow_self_signed_compute)
.build()
.unwrap();
.danger_accept_invalid_certs(ALLOW_SELF_SIGNED_COMPUTE.load(Ordering::Relaxed))
.build()?;
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
@@ -261,11 +323,8 @@ impl ConnCfg {
}
/// Connect to a corresponding compute node.
pub async fn connect(
&self,
allow_self_signed_compute: bool,
) -> Result<PostgresConnection, ConnectionError> {
self.do_connect(allow_self_signed_compute)
pub async fn connect(&self) -> Result<PostgresConnection, ConnectionError> {
self.do_connect()
.inspect_err(|err| {
// Immediately log the error we have at our disposal.
error!("couldn't connect to compute node: {err}");

View File

@@ -12,7 +12,6 @@ pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::BackendType<'static, ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
}
#[derive(Debug)]
@@ -211,8 +210,11 @@ pub struct CacheOptions {
}
impl CacheOptions {
/// Default options for [`crate::auth::caches::AuthInfoCache`].
pub const DEFAULT_AUTH_INFO: &str = "size=4000,ttl=5s";
/// Default options for [`crate::auth::caches::NodeInfoCache`].
pub const DEFAULT_OPTIONS_NODE_INFO: &str = "size=4000,ttl=5m";
pub const DEFAULT_NODE_INFO: &str = "size=4000,ttl=5m";
/// Parse cache options passed via cmdline.
/// Example: [`Self::DEFAULT_OPTIONS_NODE_INFO`].

View File

@@ -6,12 +6,17 @@ pub mod messages;
/// Wrappers for console APIs and their mocks.
pub mod provider;
pub use provider::{errors, Api, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo};
pub use provider::{errors, Api, ConsoleReqExtra};
pub use provider::{AuthInfo, NodeInfo};
pub use provider::{CachedAuthInfo, CachedNodeInfo};
/// Various cache-related types.
pub mod caches {
pub use super::provider::{ApiCaches, NodeInfoCache};
pub use super::provider::{ApiCaches, AuthInfoCache, AuthInfoCacheKey, NodeInfoCache};
}
/// Console's management API.
pub mod mgmt;
/// Console's notification bus.
pub mod notifications;

View File

@@ -22,11 +22,37 @@ impl fmt::Debug for GetRoleSecret {
}
}
/// Represents compute node's host & port pair.
#[derive(Debug)]
pub struct PgEndpoint {
pub host: Box<str>,
pub port: u16,
}
impl<'de> Deserialize<'de> for PgEndpoint {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let raw = String::deserialize(deserializer)?;
let (host, port) = raw
.rsplit_once(':')
.ok_or_else(|| Error::custom(format!("bad compute address: {raw}")))?;
Ok(PgEndpoint {
host: host.into(),
port: port.parse().map_err(Error::custom)?,
})
}
}
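The `rsplit_once(':')` above splits on the last colon, which keeps bracketed IPv6 hosts intact; a quick illustration of the standard-library behaviour this relies on:

assert_eq!("127.0.0.1:5432".rsplit_once(':'), Some(("127.0.0.1", "5432")));
assert_eq!("[::1]:5432".rsplit_once(':'), Some(("[::1]", "5432")));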
/// Response which holds compute node's `host:port` pair.
/// Returned by the `/proxy_wake_compute` API method.
#[derive(Debug, Deserialize)]
pub struct WakeCompute {
pub address: Box<str>,
pub address: PgEndpoint,
pub aux: MetricsAuxInfo,
}
@@ -187,4 +213,24 @@ mod tests {
Ok(())
}
#[test]
fn parse_wake_compute() -> anyhow::Result<()> {
let _: WakeCompute = serde_json::from_value(json!({
"address": "127.0.0.1:5432",
"aux": dummy_aux(),
}))?;
let _: WakeCompute = serde_json::from_value(json!({
"address": "[::1]:5432",
"aux": dummy_aux(),
}))?;
serde_json::from_value::<WakeCompute>(json!({
"address": "localhost:5432",
"aux": dummy_aux(),
}))?;
Ok(())
}
}

View File

@@ -0,0 +1,70 @@
use crate::console::caches::{ApiCaches, AuthInfoCacheKey};
use futures::StreamExt;
use serde::Deserialize;
const CHANNEL_NAME: &str = "proxy_notifications";
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum Notification<'a> {
#[serde(rename = "password_changed")]
PasswordChanged { project: &'a str, role: &'a str },
}
#[tracing::instrument(skip(caches))]
fn handle_message(msg: redis::Msg, caches: &ApiCaches) -> anyhow::Result<()> {
let payload: String = msg.get_payload()?;
use Notification::*;
match serde_json::from_str(&payload) {
Ok(PasswordChanged { project, role }) => {
let key = AuthInfoCacheKey {
project: project.into(),
role: role.into(),
};
tracing::info!(key = ?key, "invalidating auth info");
caches.auth_info.remove(&key);
}
Err(e) => tracing::error!("broken message: {e}"),
}
Ok(())
}
/// Handle console's invalidation messages.
#[tracing::instrument(name = "console_notifications", skip_all)]
pub async fn task_main(url: String, caches: &ApiCaches) -> anyhow::Result<()> {
let client = redis::Client::open(url.as_ref())?;
let mut conn = client.get_async_connection().await?.into_pubsub();
tracing::info!("subscribing to a channel `{CHANNEL_NAME}`");
conn.subscribe(CHANNEL_NAME).await?;
let mut stream = conn.on_message();
while let Some(msg) = stream.next().await {
handle_message(msg, caches)?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn parse_notification() -> anyhow::Result<()> {
let text = json!({
"type": "password_changed",
"project": "very-nice",
"role": "borat",
})
.to_string();
let _: Notification = serde_json::from_str(&text)?;
Ok(())
}
}
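For reference, a sketch of the publishing side this listener expects; the channel name and payload shape are taken from the code above, while the connection handling and the project/role values are illustrative:

use redis::AsyncCommands;

async fn publish_password_changed(url: &str) -> anyhow::Result<()> {
    let client = redis::Client::open(url)?;
    let mut conn = client.get_async_connection().await?;
    // Same shape as the `Notification::PasswordChanged` variant above.
    let payload = serde_json::json!({
        "type": "password_changed",
        "project": "very-nice",
        "role": "borat",
    })
    .to_string();
    let _: () = conn.publish(CHANNEL_NAME, payload).await?;
    Ok(())
}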

View File

@@ -1,14 +1,13 @@
pub mod mock;
pub mod neon;
use super::messages::MetricsAuxInfo;
use super::messages::{MetricsAuxInfo, PgEndpoint};
use crate::{
auth::ClientCredentials,
cache::{timed_lru, TimedLru},
compute, scram,
cache::{types::Cached, TimedLru},
scram,
};
use async_trait::async_trait;
use std::sync::Arc;
pub mod errors {
use crate::{
@@ -112,11 +111,9 @@ pub mod errors {
}
}
}
#[derive(Debug, Error)]
pub enum WakeComputeError {
#[error("Console responded with a malformed compute address: {0}")]
BadComputeAddress(Box<str>),
#[error(transparent)]
ApiError(ApiError),
}
@@ -132,10 +129,7 @@ pub mod errors {
fn to_string_client(&self) -> String {
use WakeComputeError::*;
match self {
// We shouldn't show user the address even if it's broken.
// Besides, user is unlikely to care about this detail.
BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
// API might return a meaningful error.
ApiError(e) => e.to_string_client(),
}
}
@@ -160,23 +154,26 @@ pub enum AuthInfo {
}
/// Info for establishing a connection to a compute node.
/// This is what we get after auth succeeded, but not before!
#[derive(Clone)]
/// This struct is cached, so we shouldn't store any user creds here.
pub struct NodeInfo {
/// Compute node connection params.
/// It's sad that we have to clone this, but this will improve
/// once we migrate to a bespoke connection logic.
pub config: compute::ConnCfg,
/// Address of the compute node.
pub address: PgEndpoint,
/// Labels for proxy's metrics.
pub aux: Arc<MetricsAuxInfo>,
/// Whether we should accept self-signed certificates (for testing)
pub allow_self_signed_compute: bool,
pub aux: MetricsAuxInfo,
}
pub type NodeInfoCache = TimedLru<Arc<str>, NodeInfo>;
pub type CachedNodeInfo = timed_lru::Cached<&'static NodeInfoCache>;
pub type NodeInfoCache = TimedLru<Box<str>, NodeInfo>;
pub type CachedNodeInfo = Cached<NodeInfo>;
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct AuthInfoCacheKey {
pub project: Box<str>,
pub role: Box<str>,
}
pub type AuthInfoCache = TimedLru<AuthInfoCacheKey, AuthInfo>;
pub type CachedAuthInfo = Cached<AuthInfo>;
/// This will allocate per each call, but the http requests alone
/// already require a few allocations, so it should be fine.
@@ -187,7 +184,7 @@ pub trait Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, errors::GetAuthInfoError>;
) -> Result<Option<CachedAuthInfo>, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
async fn wake_compute(
@@ -199,6 +196,8 @@ pub trait Api {
/// Various caches for [`console`].
pub struct ApiCaches {
/// Cache for the `wake_compute` API method.
/// Cache for the `get_auth_info` method.
pub auth_info: AuthInfoCache,
/// Cache for the `wake_compute` method.
pub node_info: NodeInfoCache,
}
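A sketch of wiring these caches up with the defaults from `CacheOptions` ("size=4000,ttl=5s" for auth info, "size=4000,ttl=5m" for node info); the name strings passed to the constructor are illustrative and only show up in tracing spans:

use std::time::Duration;

let caches = ApiCaches {
    auth_info: AuthInfoCache::new("auth_info", 4000, Duration::from_secs(5)),
    node_info: NodeInfoCache::new("node_info", 4000, Duration::from_secs(5 * 60)),
};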

View File

@@ -2,13 +2,14 @@
use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
ConsoleReqExtra, PgEndpoint,
};
use crate::{auth::ClientCredentials, compute, error::io_error, scram, url::ApiUrl};
use super::{AuthInfo, NodeInfo};
use super::{CachedAuthInfo, CachedNodeInfo};
use crate::{auth::ClientCredentials, error::io_error, scram, url::ApiUrl};
use async_trait::async_trait;
use futures::TryFutureExt;
use thiserror::Error;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
#[derive(Debug, Error)]
@@ -84,19 +85,15 @@ impl Api {
}
async fn do_wake_compute(&self) -> Result<NodeInfo, WakeComputeError> {
let mut config = compute::ConnCfg::new();
config
.host(self.endpoint.host_str().unwrap_or("localhost"))
.port(self.endpoint.port().unwrap_or(5432))
.ssl_mode(SslMode::Disable);
let host = self.endpoint.host_str().unwrap_or("localhost").into();
let port = self.endpoint.port().unwrap_or(5432);
let node = NodeInfo {
config,
let info = NodeInfo {
address: PgEndpoint { host, port },
aux: Default::default(),
allow_self_signed_compute: false,
};
Ok(node)
Ok(info)
}
}
@@ -107,8 +104,9 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, GetAuthInfoError> {
self.do_get_auth_info(creds).await
) -> Result<Option<CachedAuthInfo>, GetAuthInfoError> {
let res = self.do_get_auth_info(creds).await?;
Ok(res.map(CachedAuthInfo::new_uncached))
}
#[tracing::instrument(skip_all)]
@@ -117,9 +115,8 @@ impl super::Api for Api {
_extra: &ConsoleReqExtra<'_>,
_creds: &ClientCredentials<'_>,
) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute()
.map_ok(CachedNodeInfo::new_uncached)
.await
let res = self.do_wake_compute().await?;
Ok(CachedNodeInfo::new_uncached(res))
}
}

View File

@@ -3,18 +3,19 @@
use super::{
super::messages::{ConsoleError, GetRoleSecret, WakeCompute},
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
ApiCaches, ConsoleReqExtra,
};
use crate::{auth::ClientCredentials, compute, http, scram};
use super::{AuthInfo, AuthInfoCacheKey, CachedAuthInfo};
use super::{CachedNodeInfo, NodeInfo};
use crate::{auth::ClientCredentials, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
#[derive(Clone)]
pub struct Api {
endpoint: http::Endpoint,
caches: &'static ApiCaches,
pub endpoint: http::Endpoint,
pub caches: &'static ApiCaches,
}
impl Api {
@@ -91,22 +92,9 @@ impl Api {
let response = self.endpoint.execute(request).await?;
let body = parse_body::<WakeCompute>(response).await?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&body.address) {
None => return Err(WakeComputeError::BadComputeAddress(body.address)),
Some(x) => x,
};
// Don't set anything but host and port! This config will be cached.
// We'll set username and such later using the startup message.
// TODO: add more type safety (in progress).
let mut config = compute::ConnCfg::new();
config.host(host).port(port).ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
let node = NodeInfo {
config,
aux: body.aux.into(),
allow_self_signed_compute: false,
address: body.address,
aux: body.aux,
};
Ok(node)
@@ -124,8 +112,28 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<Option<AuthInfo>, GetAuthInfoError> {
self.do_get_auth_info(extra, creds).await
) -> Result<Option<CachedAuthInfo>, GetAuthInfoError> {
let key = AuthInfoCacheKey {
project: creds.project().expect("impossible").into(),
role: creds.user.into(),
};
// Check if we already have a cached auth info for this project + user combo.
// Beware! We shouldn't flush this for unsuccessful auth attempts, otherwise
// the cache makes no sense whatsoever in the presence of unfaithful clients.
// Instead, we snoop an invalidation queue to keep the cache up-to-date.
if let Some(cached) = self.caches.auth_info.get(&key) {
info!(key = ?key, "found cached auth info");
return Ok(Some(cached));
}
let info = self.do_get_auth_info(extra, creds).await?;
Ok(info.map(|info| {
info!(key = ?key, "creating a cache entry for auth info");
let (_, cached) = self.caches.auth_info.insert(key.into(), info.into());
cached
}))
}
#[tracing::instrument(skip_all)]
@@ -134,20 +142,21 @@ impl super::Api for Api {
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = creds.project().expect("impossible");
let key: Box<str> = creds.project().expect("impossible").into();
// Every time we do a wakeup http request, the compute node will stay up
// for some time (this highly depends on the console's scale-to-zero policy).
// The connection info remains the same during that period of time,
// which means that we might cache it to reduce the load and latency.
if let Some(cached) = self.caches.node_info.get(key) {
info!(key = key, "found cached compute node info");
if let Some(cached) = self.caches.node_info.get(&key) {
info!(key = ?key, "found cached compute node info");
return Ok(cached);
}
let node = self.do_wake_compute(extra, creds).await?;
let (_, cached) = self.caches.node_info.insert(key.into(), node);
info!(key = key, "created a cache entry for compute node info");
let info = self.do_wake_compute(extra, creds).await?;
info!(key = ?key, "creating a cache entry for compute node info");
let (_, cached) = self.caches.node_info.insert(key.into(), info.into());
Ok(cached)
}
@@ -177,20 +186,3 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
error!("console responded with an error ({status}): {text}");
Err(ApiError::Console { status, text })
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.split_once(':')?;
Some((host, port.parse().ok()?))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_host_port() {
let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 5432);
}
}

View File

@@ -175,14 +175,9 @@ pub async fn handle(
application_name: Some(APP_NAME),
};
let node = creds.wake_compute(&extra).await?.expect("msg");
let conf = node.value.config;
let port = *conf.get_ports().first().expect("no port");
let host = match conf.get_hosts().first().expect("no host") {
tokio_postgres::config::Host::Tcp(host) => host,
tokio_postgres::config::Host::Unix(_) => {
return Err(anyhow::anyhow!("unix socket is not supported"));
}
};
let pg_endpoint = &node.value.address;
let port = pg_endpoint.port;
let host = &pg_endpoint.host;
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,

View File

@@ -4,7 +4,7 @@ mod tests;
use crate::{
auth::{self, backend::AuthSuccess},
cancellation::{self, CancelMap},
compute::{self, PostgresConnection},
compute::{self, ComputeNode, PostgresConnection},
config::{ProxyConfig, TlsConfig},
console::{self, messages::MetricsAuxInfo},
error::io_error,
@@ -155,7 +155,7 @@ pub async fn handle_ws_client(
async { result }.or_else(|e| stream.throw_error(e)).await?
};
let client = Client::new(stream, creds, &params, session_id, false);
let client = Client::new(stream, creds, &params, session_id);
cancel_map
.with_session(|session| client.connect_to_db(session, true))
.await
@@ -194,15 +194,7 @@ async fn handle_client(
async { result }.or_else(|e| stream.throw_error(e)).await?
};
let allow_self_signed_compute = config.allow_self_signed_compute;
let client = Client::new(
stream,
creds,
&params,
session_id,
allow_self_signed_compute,
);
let client = Client::new(stream, creds, &params, session_id);
cancel_map
.with_session(|session| client.connect_to_db(session, false))
.await
@@ -283,61 +275,38 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
) -> Result<PostgresConnection, compute::ConnectionError> {
// If we couldn't connect, a cached connection info might be to blame
// (e.g. the compute node's address might've changed at the wrong time).
// Invalidate the cache entry (if any) to prevent subsequent errors.
let invalidate_cache = |_: &compute::ConnectionError| {
let is_cached = node_info.cached();
if is_cached {
warn!("invalidating stalled compute node info cache entry");
node_info.invalidate();
}
let label = match is_cached {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
};
let allow_self_signed_compute = node_info.allow_self_signed_compute;
node_info
.config
.connect(allow_self_signed_compute)
.inspect_err(invalidate_cache)
.await
}
/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
async fn connect_to_compute(
node_info: &mut console::CachedNodeInfo,
node_info: &mut ComputeNode,
params: &StartupMessageParams,
extra: &console::ConsoleReqExtra<'_>,
creds: &auth::BackendType<'_, auth::ClientCredentials<'_>>,
) -> Result<PostgresConnection, compute::ConnectionError> {
let mut num_retries: usize = NUM_RETRIES_WAKE_COMPUTE;
let mut num_retries = NUM_RETRIES_WAKE_COMPUTE;
loop {
// Apply startup params to the (possibly, cached) compute node info.
node_info.config.set_startup_params(params);
match connect_to_compute_once(node_info).await {
let mut config = node_info.to_conn_config();
config.set_startup_params(params);
match config.connect().await {
Err(e) if num_retries > 0 => {
info!("compute node's state has changed; requesting a wake-up");
match creds.wake_compute(extra).map_err(io_error).await? {
let label = match node_info.invalidate() {
true => "compute_cached",
false => "compute_uncached",
};
NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
let res = creds.wake_compute(extra).map_err(io_error).await?;
match (res, &node_info) {
// Update `node_info` and try one more time.
Some(mut new) => {
new.config.reuse_password(&node_info.config);
*node_info = new;
(Some(new), ComputeNode::Static { password, .. }) => {
*node_info = ComputeNode::Static {
password: password.to_owned(),
info: new,
}
}
// Link auth doesn't work that way, so we just exit.
None => return Err(e),
_ => return Err(e),
}
}
other => return other,
@@ -430,8 +399,6 @@ struct Client<'a, S> {
params: &'a StartupMessageParams,
/// Unique connection ID.
session_id: uuid::Uuid,
/// Allow self-signed certificates (for testing).
allow_self_signed_compute: bool,
}
impl<'a, S> Client<'a, S> {
@@ -441,14 +408,12 @@ impl<'a, S> Client<'a, S> {
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
params: &'a StartupMessageParams,
session_id: uuid::Uuid,
allow_self_signed_compute: bool,
) -> Self {
Self {
stream,
creds,
params,
session_id,
allow_self_signed_compute,
}
}
}
@@ -468,7 +433,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
mut creds,
params,
session_id,
allow_self_signed_compute,
} = self;
let extra = console::ConsoleReqExtra {
@@ -491,19 +455,19 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
value: mut node_info,
} = auth_result;
node_info.allow_self_signed_compute = allow_self_signed_compute;
let mut node = connect_to_compute(&mut node_info, params, &extra, &creds)
.or_else(|e| stream.throw_error(e))
.await?;
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
// PqStream input buffer. Normally there is none, but our serverless npm
// driver in pipeline mode sends startup, password and first query
// immediately after opening the connection.
let (stream, read_buf) = stream.into_inner();
node.stream.write_all(&read_buf).await?;
proxy_pass(stream, node.stream, &node_info.aux).await
proxy_pass(stream, node.stream, node_info.metrics_aux_info()).await
}
}

View File

@@ -6,41 +6,40 @@ authors = []
[tool.poetry.dependencies]
python = "^3.9"
pytest = "^7.3.1"
pytest = "^6.2.5"
psycopg2-binary = "^2.9.1"
typing-extensions = "^4.6.1"
typing-extensions = "^4.1.0"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.31.0"
pytest-xdist = "^3.3.1"
pytest-xdist = "^3.0.2"
asyncpg = "^0.27.0"
aiopg = "^1.3.1"
Jinja2 = "^3.0.2"
types-requests = "^2.31.0.0"
types-psycopg2 = "^2.9.21.10"
types-requests = "^2.28.5"
types-psycopg2 = "^2.9.18"
boto3 = "^1.26.16"
boto3-stubs = {extras = ["s3"], version = "^1.26.16"}
moto = {extras = ["server"], version = "^4.1.2"}
backoff = "^2.2.1"
backoff = "^1.11.1"
pytest-lazy-fixture = "^0.6.3"
prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^2.2.3"
pytest-order = "^1.1.0"
allure-pytest = "^2.13.2"
pytest-asyncio = "^0.21.0"
pytest-order = "^1.0.1"
allure-pytest = "^2.13.1"
pytest-asyncio = "^0.19.0"
toml = "^0.10.2"
psutil = "^5.9.4"
types-psutil = "^5.9.5.12"
types-toml = "^0.10.8.6"
pytest-httpserver = "^1.0.8"
types-psutil = "^5.9.5.4"
types-toml = "^0.10.8"
pytest-httpserver = "^1.0.6"
aiohttp = "3.7.4"
pytest-rerunfailures = "^11.1.2"
types-pytest-lazy-fixture = "^0.6.3.3"
[tool.poetry.group.dev.dependencies]
black = "^23.3.0"
mypy = "==1.3.0"
ruff = "^0.0.269"
black = "^23.1.0"
mypy = "==1.1.1"
ruff = "^0.0.255"
[build-system]
requires = ["poetry-core>=1.0.0"]

View File

@@ -156,9 +156,7 @@ class LLVM:
profdata: Path,
objects: List[str],
sources: List[str],
demangler: Optional[Path] = None,
output_file: Optional[Path] = None,
) -> None:
demangler: Optional[Path] = None) -> None:
cwd = self.cargo.cwd
objects = list(intersperse('-object', objects))
@@ -182,18 +180,14 @@ class LLVM:
*objects,
*sources,
]
if output_file is not None:
with output_file.open('w') as outfile:
subprocess.check_call(cmd, cwd=cwd, stdout=outfile)
else:
subprocess.check_call(cmd, cwd=cwd)
subprocess.check_call(cmd, cwd=cwd)
def cov_report(self, **kwargs) -> None:
self._cov(subcommand='report', **kwargs)
def cov_export(self, *, kind: str, output_file: Optional[Path], **kwargs) -> None:
def cov_export(self, *, kind: str, **kwargs) -> None:
extras = (f'-format={kind}', )
self._cov(subcommand='export', *extras, output_file=output_file, **kwargs)
self._cov(subcommand='export', *extras, **kwargs)
def cov_show(self, *, kind: str, output_dir: Optional[Path] = None, **kwargs) -> None:
extras = [f'-format={kind}']
@@ -288,19 +282,10 @@ class TextReport(Report):
def generate(self) -> None:
self.llvm.cov_show(kind='text', **self._common_kwargs())
@dataclass
class JsonReport(Report):
output_file: Path
def generate(self) -> None:
self.llvm.cov_export(kind='text', output_file=self.output_file, **self._common_kwargs())
@dataclass
class LcovReport(Report):
output_file: Path
def generate(self) -> None:
self.llvm.cov_export(kind='lcov', output_file=self.output_file, **self._common_kwargs())
self.llvm.cov_export(kind='lcov', **self._common_kwargs())
@dataclass
@@ -489,10 +474,8 @@ class State:
lambda: HtmlReport(**params, output_dir=self.report_dir),
'text':
lambda: TextReport(**params),
'json':
lambda: JsonReport(**params, output_file=self.report_dir / 'coverage.json'),
'lcov':
lambda: LcovReport(**params, output_file=self.report_dir / 'lcov.info'),
lambda: LcovReport(**params),
'summary':
lambda: SummaryReport(**params),
'github':
@@ -560,7 +543,7 @@ self-contained example:
help='cargo build profile')
p_report.add_argument('--format',
default='html',
choices=('html', 'text', 'summary', 'json', 'lcov', 'github'),
choices=('html', 'text', 'summary', 'lcov', 'github'),
help='report format')
p_report.add_argument('--input-objects',
metavar='FILE',

View File

@@ -162,7 +162,7 @@ class PgProtocol:
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn: PgConnection = psycopg2.connect(**self.conn_options(**kwargs))
conn = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
@@ -535,8 +535,8 @@ def export_timeline(
def main(args: argparse.Namespace):
# any psql version will do here. use current DEFAULT_PG_VERSION = 15
psql_path = str(Path(args.pg_distrib_dir) / "v15" / "bin" / "psql")
# any psql version will do here. use current DEFAULT_PG_VERSION = 14
psql_path = str(Path(args.pg_distrib_dir) / "v14" / "bin" / "psql")
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host

View File

@@ -35,7 +35,7 @@ def get_connection_cursor():
connstr = os.getenv("DATABASE_URL")
if not connstr:
err("DATABASE_URL environment variable is not set")
with psycopg2.connect(connstr, connect_timeout=30) as conn:
with psycopg2.connect(connstr) as conn:
with conn.cursor() as cur:
yield cur

View File

@@ -40,9 +40,6 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
// Create connection object configured to run TLS if schema starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
// validated here.
//
// NB: this function is not async, but still must be run on a tokio runtime thread
// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
where
U: std::convert::TryInto<Uri>,

View File

@@ -312,6 +312,6 @@ def neon_with_baseline(request: FixtureRequest) -> PgCompare:
implementation-specific logic is widely useful across multiple tests, it might
make sense to add methods to the PgCompare class.
"""
fixture = request.getfixturevalue(request.param)
fixture = request.getfixturevalue(request.param) # type: ignore
assert isinstance(fixture, PgCompare), f"test error: fixture {fixture} is not PgCompare"
return fixture

View File

@@ -26,7 +26,7 @@ from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
from urllib.parse import urlparse
import asyncpg
import backoff
import backoff # type: ignore
import boto3
import jwt
import psycopg2
@@ -354,7 +354,7 @@ class PgProtocol:
Returns psycopg2's connection object.
This method passes all extra params to connstr.
"""
conn: PgConnection = psycopg2.connect(**self.conn_options(**kwargs))
conn = psycopg2.connect(**self.conn_options(**kwargs))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
@@ -1603,6 +1603,8 @@ class NeonPageserver(PgProtocol):
# https://github.com/neondatabase/neon/issues/2442
".*could not remove ephemeral file.*No such file or directory.*",
# FIXME: These need investigation
".*gc_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*compaction_loop.*Failed to get a tenant .* Tenant .* not found.*",
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
@@ -1619,8 +1621,6 @@ class NeonPageserver(PgProtocol):
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
]
def start(
@@ -1845,13 +1845,15 @@ class VanillaPostgres(PgProtocol):
]
)
def configure(self, options: List[str]):
def configure(self, options: List[str]) -> "VanillaPostgres":
"""Append lines into postgresql.conf file."""
assert not self.running
with open(os.path.join(self.pgdatadir, "postgresql.conf"), "a") as conf_file:
conf_file.write("\n".join(options))
def start(self, log_path: Optional[str] = None):
return self
def start(self, log_path: Optional[str] = None) -> "VanillaPostgres":
assert not self.running
self.running = True
@@ -1862,11 +1864,15 @@ class VanillaPostgres(PgProtocol):
["pg_ctl", "-w", "-D", str(self.pgdatadir), "-l", log_path, "start"]
)
def stop(self):
return self
def stop(self) -> "VanillaPostgres":
assert self.running
self.running = False
self.pg_bin.run_capture(["pg_ctl", "-w", "-D", str(self.pgdatadir), "stop"])
return self
def get_subdir_size(self, subdir) -> int:
"""Return size of pgdatadir subdirectory in bytes."""
return get_dir_size(os.path.join(self.pgdatadir, subdir))
@@ -2035,6 +2041,17 @@ class NeonProxy(PgProtocol):
*["--auth-endpoint", self.pg_conn_url],
]
@dataclass(frozen=True)
class Console(AuthBackend):
console_url: str
def extra_args(self) -> list[str]:
return [
# Postgres auth backend params
*["--auth-backend", "console"],
*["--auth-endpoint", self.console_url],
]
def __init__(
self,
neon_binpath: Path,
@@ -2240,6 +2257,33 @@ def link_proxy(
yield proxy
@pytest.fixture(scope="function")
def console_proxy(
port_distributor: PortDistributor, neon_binpath: Path, test_output_dir: Path
) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
console_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
console_url = f"http://127.0.0.1:{console_port}"
with NeonProxy(
neon_binpath=neon_binpath,
test_output_dir=test_output_dir,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Console(console_url),
) as proxy:
proxy.start()
yield proxy
@pytest.fixture(scope="function")
def static_proxy(
vanilla_pg: VanillaPostgres,

View File

@@ -1,6 +1,5 @@
from __future__ import annotations
import json
import time
from collections import defaultdict
from dataclasses import dataclass
@@ -110,10 +109,6 @@ class PageserverHttpClient(requests.Session):
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
@property
def base_url(self) -> str:
return f"http://localhost:{self.port}"
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
@@ -173,22 +168,8 @@ class PageserverHttpClient(requests.Session):
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
):
if config_null:
assert config is None
body = "null"
else:
# null-config is prohibited by the API
if config is None:
config = {}
body = json.dumps({"config": config})
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach",
data=body,
headers={"Content-Type": "application/json"},
)
def tenant_attach(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach")
self.verbose_error(res)
def tenant_detach(self, tenant_id: TenantId, detach_ignored=False):

View File

@@ -27,10 +27,6 @@ class PgVersion(str, enum.Enum):
def __repr__(self) -> str:
return f"'{self.value}'"
# Make this explicit for Python 3.11 compatibility, which changes the behavior of enums
def __str__(self) -> str:
return self.value
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
# sometime we need to do so in tests.
@property
@@ -82,11 +78,11 @@ def pytest_addoption(parser: Parser):
@pytest.fixture(scope="session")
def pg_version(request: FixtureRequest) -> Iterator[PgVersion]:
if v := request.config.getoption("--pg-version"):
version, source = v, "from --pg-version command-line argument"
version, source = v, "from --pg-version commad-line argument"
elif v := os.environ.get("DEFAULT_PG_VERSION"):
version, source = PgVersion(v), "from DEFAULT_PG_VERSION environment variable"
else:
version, source = DEFAULT_VERSION, "default version"
version, source = DEFAULT_VERSION, "default verson"
log.info(f"pg_version is {version} ({source})")
yield version

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture
from pytest_lazyfixture import lazy_fixture # type: ignore
@pytest.mark.parametrize(

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture
from pytest_lazyfixture import lazy_fixture # type: ignore
@pytest.mark.parametrize(

View File

@@ -2,7 +2,7 @@ from contextlib import closing
import pytest
from fixtures.compare_fixtures import PgCompare
from pytest_lazyfixture import lazy_fixture
from pytest_lazyfixture import lazy_fixture # type: ignore
@pytest.mark.parametrize(

View File

@@ -6,7 +6,7 @@ import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import PgCompare
from fixtures.log_helper import log
from pytest_lazyfixture import lazy_fixture
from pytest_lazyfixture import lazy_fixture # type: ignore
@pytest.mark.parametrize(

View File

@@ -1,200 +0,0 @@
from dataclasses import dataclass
from typing import Generator, Optional
import pytest
from fixtures.neon_fixtures import (
LocalFsStorage,
NeonEnv,
NeonEnvBuilder,
RemoteStorageKind,
)
from fixtures.pageserver.http import PageserverApiException, TenantConfig
from fixtures.types import TenantId
from fixtures.utils import wait_until
@pytest.fixture
def positive_env(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_attach_tenant_config",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
return env
@dataclass
class NegativeTests:
neon_env: NeonEnv
tenant_id: TenantId
config_pre_detach: TenantConfig
@pytest.fixture
def negative_env(neon_env_builder: NeonEnvBuilder) -> Generator[NegativeTests, None, None]:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.LOCAL_FS,
test_name="test_attach_tenant_config",
)
env = neon_env_builder.init_start()
assert isinstance(env.remote_storage, LocalFsStorage)
ps_http = env.pageserver.http_client()
(tenant_id, _) = env.neon_cli.create_tenant()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
config_pre_detach = ps_http.tenant_config(tenant_id)
assert tenant_id in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
yield NegativeTests(env, tenant_id, config_pre_detach)
assert tenant_id not in [
TenantId(t["id"]) for t in ps_http.tenant_list()
], "tenant should not be attached after negative test"
env.pageserver.allowed_errors.append(".*Error processing HTTP request: Bad request")
def log_contains_bad_request():
env.pageserver.log_contains(".*Error processing HTTP request: Bad request")
wait_until(50, 0.1, log_contains_bad_request)
def test_null_body(negative_env: NegativeTests):
"""
If we send `null` in the body, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
res = ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b"null",
headers={"Content-Type": "application/json"},
)
assert res.status_code == 400
def test_null_config(negative_env: NegativeTests):
"""
If the `config` field is `null`, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
res = ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b'{"config": null}',
headers={"Content-Type": "application/json"},
)
assert res.status_code == 400
def test_config_with_unknown_keys_is_bad_request(negative_env: NegativeTests):
"""
If we send a config with unknown keys, the request should be rejected with status 400.
"""
env = negative_env.neon_env
tenant_id = negative_env.tenant_id
ps_http = env.pageserver.http_client()
config_with_unknown_keys = {
"compaction_period": "1h",
"this_key_does_not_exist": "some value",
}
with pytest.raises(PageserverApiException) as e:
ps_http.tenant_attach(tenant_id, config=config_with_unknown_keys)
assert e.type == PageserverApiException
assert e.value.status_code == 400
@pytest.mark.parametrize("content_type", [None, "application/json"])
def test_empty_body(positive_env: NeonEnv, content_type: Optional[str]):
"""
For backwards-compatibility: if we send an empty body,
the request should be accepted and the config should be the default config.
"""
env = positive_env
ps_http = env.pageserver.http_client()
(tenant_id, _) = env.neon_cli.create_tenant()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
config_pre_detach = ps_http.tenant_config(tenant_id)
assert tenant_id in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.tenant_detach(tenant_id)
assert tenant_id not in [TenantId(t["id"]) for t in ps_http.tenant_list()]
ps_http.post(
f"{ps_http.base_url}/v1/tenant/{tenant_id}/attach",
data=b"",
headers=None if content_type else {"Content-Type": "application/json"},
).raise_for_status()
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == {}
assert ps_http.tenant_config(tenant_id).effective_config == config_pre_detach.effective_config
def test_fully_custom_config(positive_env: NeonEnv):
"""
If we send a valid config in the body, the request should be accepted and the config should be applied.
"""
env = positive_env
fully_custom_config = {
"compaction_period": "1h",
"compaction_threshold": 13,
"compaction_target_size": 1048576,
"checkpoint_distance": 10000,
"checkpoint_timeout": "13m",
"eviction_policy": {
"kind": "LayerAccessThreshold",
"period": "20s",
"threshold": "23h",
},
"evictions_low_residence_duration_metric_threshold": "2days",
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"image_creation_threshold": 7,
"pitr_interval": "1m",
"lagging_wal_timeout": "23m",
"max_lsn_wal_lag": 230000,
"min_resident_size_override": 23,
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
}
ps_http = env.pageserver.http_client()
initial_tenant_config = ps_http.tenant_config(env.initial_tenant)
assert initial_tenant_config.tenant_specific_overrides == {}
assert set(initial_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
(tenant_id, _) = env.neon_cli.create_tenant()
ps_http.set_tenant_config(tenant_id, fully_custom_config)
our_tenant_config = ps_http.tenant_config(tenant_id)
assert our_tenant_config.tenant_specific_overrides == fully_custom_config
assert set(our_tenant_config.effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"
assert {
k: initial_tenant_config.effective_config[k] != our_tenant_config.effective_config[k]
for k in fully_custom_config.keys()
} == {
k: True for k in fully_custom_config.keys()
}, "ensure our custom config has different values than the default config for all config options, so we know we overrode everything"
ps_http.tenant_detach(tenant_id)
ps_http.tenant_attach(tenant_id, config=fully_custom_config)
assert ps_http.tenant_config(tenant_id).tenant_specific_overrides == fully_custom_config
assert set(ps_http.tenant_config(tenant_id).effective_config.keys()) == set(
fully_custom_config.keys()
), "ensure we cover all config options"

View File

@@ -1,210 +0,0 @@
from types import TracebackType
from typing import Any, Dict, List, Optional, Tuple, Type
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import VanillaPostgres
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def handle_db(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in dbs:
dbs[operation["name"]] = dbs[operation["old_name"]]
dbs.pop(operation["old_name"])
if "owner" in operation:
dbs[operation["name"]] = operation["owner"]
elif operation["op"] == "del":
dbs.pop(operation["name"])
else:
raise ValueError("Invalid op")
def handle_role(dbs, roles, operation):
if operation["op"] == "set":
if "old_name" in operation and operation["old_name"] in roles:
roles[operation["name"]] = roles[operation["old_name"]]
roles.pop(operation["old_name"])
for db, owner in dbs.items():
if owner == operation["old_name"]:
dbs[db] = operation["name"]
if "password" in operation:
roles[operation["name"]] = operation["password"]
elif operation["op"] == "del":
if "old_name" in operation:
roles.pop(operation["old_name"])
roles.pop(operation["name"])
else:
raise ValueError("Invalid op")
fail = False
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
log.info(f"Received request with data {request.get_data(as_text=True)}")
if fail:
log.info("FAILING")
return Response(status=500, response="Failed just cuz")
if request.json is None:
log.info("Received invalid JSON")
return Response(status=400)
json = request.json
# Handle roles first
if "roles" in json:
for operation in json["roles"]:
handle_role(dbs, roles, operation)
if "dbs" in json:
for operation in json["dbs"]:
handle_db(dbs, roles, operation)
return Response(status=200)
class DdlForwardingContext:
def __init__(self, httpserver: HTTPServer, vanilla_pg: VanillaPostgres, host: str, port: int):
self.server = httpserver
self.pg = vanilla_pg
self.host = host
self.port = port
self.dbs: Dict[str, str] = {}
self.roles: Dict[str, str] = {}
endpoint = "/management/api/v2/roles_and_databases"
ddl_url = f"http://{host}:{port}{endpoint}"
self.pg.configure(
[
f"neon.console_url={ddl_url}",
"shared_preload_libraries = 'neon'",
]
)
log.info(f"Listening on {ddl_url}")
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
)
def __enter__(self):
self.pg.start()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
):
self.pg.stop()
def send(self, query: str) -> List[Tuple[Any, ...]]:
return self.pg.safe_psql(query)
def wait(self, timeout=3):
self.server.wait(timeout=timeout)
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
res = self.send(query)
self.wait(timeout=timeout)
return res
@pytest.fixture(scope="function")
def ddl(
httpserver: HTTPServer, vanilla_pg: VanillaPostgres, httpserver_listen_address: tuple[str, int]
):
(host, port) = httpserver_listen_address
with DdlForwardingContext(httpserver, vanilla_pg, host, port) as ddl:
yield ddl
def test_ddl_forwarding(ddl: DdlForwardingContext):
curr_user = ddl.send("SELECT current_user")[0][0]
log.info(f"Current user is {curr_user}")
ddl.send_and_wait("CREATE DATABASE bork")
assert ddl.dbs == {"bork": curr_user}
ddl.send_and_wait("CREATE ROLE volk WITH PASSWORD 'nu_zayats'")
ddl.send_and_wait("ALTER DATABASE bork RENAME TO nu_pogodi")
assert ddl.dbs == {"nu_pogodi": curr_user}
ddl.send_and_wait("ALTER DATABASE nu_pogodi OWNER TO volk")
assert ddl.dbs == {"nu_pogodi": "volk"}
ddl.send_and_wait("DROP DATABASE nu_pogodi")
assert ddl.dbs == {}
ddl.send_and_wait("DROP ROLE volk")
assert ddl.roles == {}
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
assert ddl.roles == {"tarzan": "of_the_apes"}
ddl.send_and_wait("DROP ROLE tarzan")
assert ddl.roles == {}
ddl.send_and_wait("CREATE ROLE tarzan WITH PASSWORD 'of_the_apes'")
assert ddl.roles == {"tarzan": "of_the_apes"}
ddl.send_and_wait("ALTER ROLE tarzan WITH PASSWORD 'jungle_man'")
assert ddl.roles == {"tarzan": "jungle_man"}
ddl.send_and_wait("ALTER ROLE tarzan RENAME TO mowgli")
assert ddl.roles == {"mowgli": "jungle_man"}
ddl.send_and_wait("DROP ROLE mowgli")
assert ddl.roles == {}
conn = ddl.pg.connect()
cur = conn.cursor()
cur.execute("BEGIN")
cur.execute("CREATE ROLE bork WITH PASSWORD 'cork'")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"bork": "cork"}
cur.execute("BEGIN")
cur.execute("CREATE ROLE stork WITH PASSWORD 'pork'")
cur.execute("ABORT")
ddl.wait()
assert ("stork", "pork") not in ddl.roles.items()
cur.execute("BEGIN")
cur.execute("ALTER ROLE bork WITH PASSWORD 'pork'")
cur.execute("ALTER ROLE bork RENAME TO stork")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"stork": "pork"}
cur.execute("BEGIN")
cur.execute("CREATE ROLE dork WITH PASSWORD 'york'")
cur.execute("SAVEPOINT point")
cur.execute("ALTER ROLE dork WITH PASSWORD 'zork'")
cur.execute("ALTER ROLE dork RENAME TO fork")
cur.execute("ROLLBACK TO SAVEPOINT point")
cur.execute("ALTER ROLE dork WITH PASSWORD 'fork'")
cur.execute("ALTER ROLE dork RENAME TO zork")
cur.execute("RELEASE SAVEPOINT point")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"stork": "pork", "zork": "fork"}
cur.execute("DROP ROLE stork")
cur.execute("DROP ROLE zork")
ddl.wait()
assert ddl.roles == {}
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
cur.execute("CREATE ROLE stork WITH PASSWORD 'cork'")
cur.execute("BEGIN")
cur.execute("DROP ROLE bork")
cur.execute("ALTER ROLE stork RENAME TO bork")
cur.execute("COMMIT")
ddl.wait()
assert ddl.roles == {"bork": "cork"}
cur.execute("DROP ROLE bork")
ddl.wait()
assert ddl.roles == {}
cur.execute("CREATE ROLE bork WITH PASSWORD 'dork'")
cur.execute("CREATE DATABASE stork WITH OWNER=bork")
cur.execute("ALTER ROLE bork RENAME TO cork")
ddl.wait()
assert ddl.dbs == {"stork": "cork"}
with pytest.raises(psycopg2.InternalError):
global fail
fail = True
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()
conn.close()

View File

@@ -228,6 +228,7 @@ def proxy_with_metric_collector(
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
httpserver_listen_address,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):

View File

@@ -1,11 +1,13 @@
import json
import logging
import subprocess
from typing import Any, List
from typing import Any, List, cast
import psycopg2
import pytest
import requests
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
from aiohttp import web
from fixtures.neon_fixtures import PSQL, NeonProxy, PortDistributor, VanillaPostgres
def test_proxy_select_1(static_proxy: NeonProxy):
@@ -225,3 +227,78 @@ def test_sql_over_http(static_proxy: NeonProxy):
res = q("drop table t")
assert res["command"] == "DROP"
assert res["rowCount"] is None
@pytest.mark.asyncio
async def test_compute_cache_invalidation(
port_distributor: PortDistributor, vanilla_pg: VanillaPostgres, console_proxy: NeonProxy
):
console_url = cast(NeonProxy.Console, console_proxy.auth_backend).console_url
logging.info(f"mocked console's url is {console_url}")
console_port = int(console_url.split(":")[-1])
logging.info(f"mocked console's port is {console_port}")
routes = web.RouteTableDef()
@routes.get("/proxy_get_role_secret")
async def get_role_secret(request):
# corresponds to password "password"
secret = ":".join(
[
"SCRAM-SHA-256$4096",
"t33UQcz/cs1D+n9INqThsw==$1NYlCbuxtK7YF2sgECBDTv1Myf8PpHJCT3RgKSXlZL0=",
"9iLeGY91MqBQ4ez1389Smo7h+STsJJ5jvu7kNofxj08=",
]
)
return web.json_response({"role_secret": secret})
wake_compute_called = 0
postgres_port = vanilla_pg.default_options["port"]
@routes.get("/proxy_wake_compute")
async def wake_compute(request):
nonlocal wake_compute_called
wake_compute_called += 1
nonlocal postgres_port
logging.info(f"compute's port is {postgres_port}")
return web.json_response(
{
"address": f"127.0.0.1:{postgres_port}",
"aux": {
"endpoint_id": "",
"project_id": "",
"branch_id": "",
},
}
)
console = web.Application()
console.add_routes(routes)
runner = web.AppRunner(console)
await runner.setup()
await web.TCPSite(runner, "127.0.0.1", console_port).start()
# Create a user we're going to use in the test sequence
user, password = "borat", "password"
vanilla_pg.start().safe_psql(f"create role {user} with login password '{password}'")
async def try_connect():
await console_proxy.connect_async(user=user, password=password, dbname="postgres")
assert wake_compute_called == 0
# Try connecting to compute
await try_connect()
assert wake_compute_called == 1
# Change compute's port
postgres_port = port_distributor.get_port()
vanilla_pg.stop().configure([f"port = {postgres_port}"]).start()
# Try connecting to compute
await try_connect()
assert wake_compute_called == 2

View File

@@ -4,7 +4,7 @@ from pathlib import Path
from types import TracebackType
from typing import Optional, Type
import backoff
import backoff  # type: ignore
from fixtures.log_helper import log
from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres

View File

@@ -62,7 +62,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
log.info(f"show {env.initial_tenant}")
pscur.execute(f"show {env.initial_tenant}")
res = pscur.fetchone()
assert res is not None
assert all(
    i in res.items()
    for i in {
@@ -102,7 +101,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"res: {res}")
assert res is not None
assert all(
    i in res.items()
    for i in {
@@ -165,7 +163,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after config res: {res}")
assert res is not None
assert all(
    i in res.items()
    for i in {
@@ -221,7 +218,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert res is not None
assert all(
    i in res.items()
    for i in {
@@ -282,7 +278,6 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
pscur.execute(f"show {tenant}")
res = pscur.fetchone()
log.info(f"after restart res: {res}")
assert res is not None
assert all(
    i in res.items()
    for i in {

View File

@@ -27,6 +27,7 @@ futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hashbrown = { version = "0.12", features = ["raw"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -38,10 +39,11 @@ num-traits = { version = "0.2", features = ["i128"] }
prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
regex-syntax = { version = "0.7" }
regex-syntax = { version = "0.6" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "multipart", "rustls-tls"] }
ring = { version = "0.16", features = ["std"] }
rustls = { version = "0.20", features = ["dangerous_configuration"] }
rustls-56bd22fc3884b12 = { package = "rustls", version = "0.20", features = ["dangerous_configuration"] }
rustls-647d43efb71741da = { package = "rustls", version = "0.21" }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["raw_value"] }
@@ -61,6 +63,7 @@ url = { version = "2", features = ["serde"] }
anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
either = { version = "1" }
hashbrown = { version = "0.12", features = ["raw"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -68,7 +71,7 @@ memchr = { version = "2" }
nom = { version = "7" }
prost = { version = "0.11" }
regex = { version = "1" }
regex-syntax = { version = "0.7" }
regex-syntax = { version = "0.6" }
serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit-mut"] }