Compare commits

..

6 Commits

Author SHA1 Message Date
Bojan Serafimov
f6798d503a Skip sync safekeepers if possible 2023-06-27 10:35:25 -04:00
Shany Pozin
d748615c1f RemoteTimelineClient::delete_all() to use s3::delete_objects (#4461)
## Problem
[#4325](https://github.com/neondatabase/neon/issues/4325)
## Summary of changes
Use delete_objects() method
2023-06-27 15:01:32 +03:00
Dmitry Rodionov
681c6910c2 Straighten the spec for timeline delete (#4538)
## Problem

Lets keep 500 for unusual stuff that is not considered normal. Came up
during one of the discussions around console logs now seeing this 500's.

## Summary of changes

- Return 409 Conflict instead of 500
- Remove 200 ok status because it is not used anymore
2023-06-27 13:56:32 +03:00
Vadim Kharitonov
148f0f9b21 Compile pg_roaringbitmap extension (#4564)
## Problem

```
postgres=# create extension roaringbitmap;
CREATE EXTENSION
postgres=# select roaringbitmap('{1,100,10}');
                 roaringbitmap
------------------------------------------------
 \x3a30000001000000000002001000000001000a006400
(1 row)
```
2023-06-27 10:55:03 +01:00
Shany Pozin
a7f3f5f356 Revert "run Layer::get_value_reconstruct_data in spawn_blocking#4498" (#4569)
This reverts commit 1faf69a698.
2023-06-27 10:57:28 +03:00
Felix Prasanna
00d1cfa503 bump VM_BUILDER_VERSION to 0.11.0 (#4566)
Routine bump of autoscaling version `0.8.0` -> `0.11.0`
2023-06-26 14:10:27 -04:00
42 changed files with 198 additions and 1588 deletions

View File

@@ -738,7 +738,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.8.0
VM_BUILDER_VERSION: v0.11.0
steps:
- name: Checkout

3
Cargo.lock generated
View File

@@ -924,14 +924,12 @@ dependencies = [
"opentelemetry",
"postgres",
"regex",
"remote_storage",
"reqwest",
"serde",
"serde_json",
"tar",
"tokio",
"tokio-postgres",
"toml_edit",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -999,7 +997,6 @@ dependencies = [
"tar",
"thiserror",
"toml",
"tracing",
"url",
"utils",
"workspace_hack",

View File

@@ -498,6 +498,23 @@ RUN wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_uuidv7.control
#########################################################################################
#
# Layer "pg-roaringbitmap-pg-build"
# compile pg_roaringbitmap extension
#
#########################################################################################
FROM build-deps AS pg-roaringbitmap-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
echo "b75201efcb1c2d1b014ec4ae6a22769cc7a224e6e406a587f5784a37b6b5a2aa pg_roaringbitmap.tar.gz" | sha256sum --check && \
mkdir pg_roaringbitmap-src && cd pg_roaringbitmap-src && tar xvzf ../pg_roaringbitmap.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/roaringbitmap.control
#########################################################################################
#
# Layer "rust extensions"
@@ -632,6 +649,7 @@ COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -30,5 +30,3 @@ url.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true
toml_edit.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }

View File

@@ -27,8 +27,7 @@
//! compute_ctl -D /var/db/postgres/compute \
//! -C 'postgresql://cloud_admin@localhost/postgres' \
//! -S /var/db/postgres/specs/current.json \
//! -b /usr/local/bin/postgres \
//! -r {"bucket": "my-bucket", "region": "eu-central-1"}
//! -b /usr/local/bin/postgres
//! ```
//!
use std::collections::HashMap;
@@ -36,7 +35,7 @@ use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::exit;
use std::sync::{mpsc, Arc, Condvar, Mutex, OnceLock};
use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
@@ -49,7 +48,6 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -66,20 +64,6 @@ fn main() -> Result<()> {
info!("build_tag: {build_tag}");
let matches = cli().get_matches();
let pgbin_default = String::from("postgres");
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
let ext_remote_storage = match remote_ext_config {
Some(x) => match init_remote_storage(x) {
Ok(y) => Some(y),
Err(e) => {
dbg!("Error {:?}, setting remote storage to None", e);
None
}
},
None => None,
};
let http_port = *matches
.get_one::<u16>("http-port")
@@ -144,6 +128,9 @@ fn main() -> Result<()> {
let compute_id = matches.get_one::<String>("compute-id");
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let spec;
let mut live_config_allowed = false;
match spec_json {
@@ -181,7 +168,6 @@ fn main() -> Result<()> {
let mut new_state = ComputeState::new();
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
@@ -193,13 +179,9 @@ fn main() -> Result<()> {
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
pgversion: get_pg_version(pgbin),
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage,
available_libraries: OnceLock::new(),
available_extensions: OnceLock::new(),
};
let compute = Arc::new(compute_node);
@@ -208,8 +190,6 @@ fn main() -> Result<()> {
let _http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
let extension_server_port: u16 = http_port;
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -250,7 +230,7 @@ fn main() -> Result<()> {
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;
let pg = match compute.start_compute(extension_server_port) {
let pg = match compute.start_compute() {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:?}", err);
@@ -369,12 +349,6 @@ fn cli() -> clap::Command {
.long("control-plane-uri")
.value_name("CONTROL_PLANE_API_BASE_URI"),
)
.arg(
Arg::new("remote-ext-config")
.short('r')
.long("remote-ext-config")
.value_name("REMOTE_EXT_CONFIG"),
)
}
#[test]

View File

@@ -1,15 +1,13 @@
use std::collections::HashMap;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::{Condvar, Mutex, OnceLock};
use std::sync::{Condvar, Mutex};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use tokio;
use tokio_postgres;
use tracing::{info, instrument, warn};
use utils::id::{TenantId, TimelineId};
@@ -18,11 +16,9 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use remote_storage::{GenericRemoteStorage, RemotePath};
use crate::config;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::{config, extension_server};
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
@@ -30,7 +26,6 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub pgversion: String,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
@@ -50,10 +45,6 @@ pub struct ComputeNode {
pub state: Mutex<ComputeState>,
/// `Condvar` to allow notifying waiters about state changes.
pub state_changed: Condvar,
/// S3 extensions configuration variables
pub ext_remote_storage: Option<GenericRemoteStorage>,
pub available_libraries: OnceLock<HashMap<String, RemotePath>>,
pub available_extensions: OnceLock<HashMap<String, Vec<RemotePath>>>,
}
#[derive(Clone, Debug)]
@@ -332,22 +323,14 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(
&self,
compute_state: &ComputeState,
extension_server_port: u16,
) -> Result<()> {
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(
&pgdata_path.join("postgresql.conf"),
&pspec.spec,
Some(extension_server_port),
)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
// is already connected it will be kicked out, so a secondary (standby)
@@ -355,9 +338,13 @@ impl ComputeNode {
let lsn = match spec.mode {
ComputeMode::Primary => {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
let lsn = if let Some(synced_lsn) = spec.skip_sync_safekeepers {
info!("no need to sync");
synced_lsn
} else {
self.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?
};
info!("safekeepers synced at LSN {}", lsn);
lsn
}
@@ -489,7 +476,7 @@ impl ComputeNode {
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
@@ -519,7 +506,7 @@ impl ComputeNode {
}
#[instrument(skip(self))]
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
@@ -530,9 +517,7 @@ impl ComputeNode {
pspec.timeline_id,
);
self.prepare_external_extensions(&compute_state)?;
self.prepare_pgdata(&compute_state, extension_server_port)?;
self.prepare_pgdata(&compute_state)?;
let start_time = Utc::now();
@@ -668,131 +653,4 @@ LIMIT 100",
"{{\"pg_stat_statements\": []}}".to_string()
}
}
// If remote extension storage is configured,
// download extension control files
// and shared preload libraries.
#[tokio::main]
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
// download preload shared libraries before postgres start (if any)
let spec = &pspec.spec;
// 1. parse private extension paths from spec
let private_ext_prefixes = match &spec.private_extensions {
Some(private_extensions) => private_extensions.clone(),
None => Vec::new(),
};
info!("private_ext_prefixes: {:?}", &private_ext_prefixes);
// 2. parse shared_preload_libraries from spec
let mut libs_vec = Vec::new();
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.map(str::to_string)
.collect();
}
info!(
"shared_preload_libraries parsed from spec.cluster.settings: {:?}",
libs_vec
);
// also parse shared_preload_libraries from provided postgresql.conf
// that is used in neon_local and python tests
if let Some(conf) = &spec.cluster.postgresql_conf {
let conf_lines = conf.split('\n').collect::<Vec<&str>>();
let mut shared_preload_libraries_line = "";
for line in conf_lines {
if line.starts_with("shared_preload_libraries") {
shared_preload_libraries_line = line;
}
}
let mut preload_libs_vec = Vec::new();
if let Some(libs) = shared_preload_libraries_line.split("='").nth(1) {
preload_libs_vec = libs
.split(&[',', '\'', ' '])
.filter(|s| *s != "neon" && !s.is_empty())
.map(str::to_string)
.collect();
}
info!(
"shared_preload_libraries parsed from spec.cluster.postgresql_conf: {:?}",
preload_libs_vec
);
libs_vec.extend(preload_libs_vec);
}
// download extension control files & shared_preload_libraries
let available_extensions = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&private_ext_prefixes,
)
.await?;
self.available_extensions
.set(available_extensions)
.expect("available_extensions.set error");
info!("Libraries to download: {:?}", &libs_vec);
let available_libraries = extension_server::get_available_libraries(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&private_ext_prefixes,
&libs_vec,
)
.await?;
self.available_libraries
.set(available_libraries)
.expect("available_libraries.set error");
}
Ok(())
}
pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_extension_sql_files(
&filename,
remote_storage,
&self.pgbin,
self.available_extensions
.get()
.context("available_extensions broke")?,
)
.await
}
}
}
pub async fn download_library_file(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_library_file(
&filename,
remote_storage,
&self.pgbin,
self.available_libraries
.get()
.context("available_libraries broke")?,
)
.await
}
}
}
}

View File

@@ -33,11 +33,7 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
}
/// Create or completely rewrite configuration file specified by `path`
pub fn write_postgres_conf(
path: &Path,
spec: &ComputeSpec,
extension_server_port: Option<u16>,
) -> Result<()> {
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
// File::create() destroys the file content if it exists.
let mut file = File::create(path)?;
@@ -99,9 +95,5 @@ pub fn write_postgres_conf(
writeln!(file, "# Managed by compute_ctl: end")?;
}
if let Some(port) = extension_server_port {
writeln!(file, "neon.extension_server_port={}", port)?;
}
Ok(())
}

View File

@@ -1,379 +0,0 @@
// Download extension files from the extension store
// and put them in the right place in the postgres directory
use anyhow::{self, bail, Context, Result};
use remote_storage::*;
use serde_json::{self, Value};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
const SHARE_EXT_PATH: &str = "share/postgresql/extension";
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
// where argument is a flag like `--version` or `--sharedir`
let pgconfig = pgbin.replace("postgres", "pg_config");
let config_output = std::process::Command::new(pgconfig)
.arg(argument)
.output()
.expect("pg_config error");
std::str::from_utf8(&config_output.stdout)
.expect("pg_config error")
.trim()
.to_string()
}
pub fn get_pg_version(pgbin: &str) -> String {
// pg_config --version returns a (platform specific) human readable string
// such as "PostgreSQL 15.4". We parse this to v14/v15
let human_version = get_pg_config("--version", pgbin);
if human_version.contains("15") {
return "v15".to_string();
} else if human_version.contains("14") {
return "v14".to_string();
}
panic!("Unsuported postgres version {human_version}");
}
async fn download_helper(
remote_storage: &GenericRemoteStorage,
remote_from_path: &RemotePath,
remote_from_prefix: Option<&Path>,
download_location: &Path,
) -> anyhow::Result<()> {
// downloads file at remote_from_path to download_location/[file_name]
// we cannot use remote_from_path.object_name() here
// because extension files can be in subdirectories of the extension store.
//
// To handle this, we use remote_from_prefix to strip the prefix from the path
// this gives us the relative path of the file in the extension store,
// and we use this relative path to construct the local path.
//
let local_path = match remote_from_prefix {
Some(prefix) => {
let p = remote_from_path
.get_path()
.strip_prefix(prefix)
.expect("bad prefix");
download_location.join(p)
}
None => download_location.join(remote_from_path.object_name().expect("bad object")),
};
if local_path.exists() {
info!("File {:?} already exists. Skipping download", &local_path);
return Ok(());
}
info!(
"Downloading {:?} to location {:?}",
&remote_from_path, &local_path
);
let mut download = remote_storage.download(remote_from_path).await?;
let mut write_data_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut write_data_buffer)
.await?;
if remote_from_prefix.is_some() {
if let Some(prefix) = local_path.parent() {
info!(
"Downloading file with prefix. Create directory {:?}",
prefix
);
// if directory already exists, this is a no-op
std::fs::create_dir_all(prefix)?;
}
}
let mut output_file = BufWriter::new(File::create(local_path)?);
output_file.write_all(&write_data_buffer)?;
Ok(())
}
// download extension control files
//
// if private_ext_prefixes is provided - search also in private extension paths
//
pub async fn get_available_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
private_ext_prefixes: &Vec<String>,
) -> anyhow::Result<HashMap<String, Vec<RemotePath>>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let mut paths: Vec<RemotePath> = Vec::new();
// public extensions
paths.push(RemotePath::new(
&Path::new(pg_version).join(SHARE_EXT_PATH),
)?);
// private extensions
for private_prefix in private_ext_prefixes {
paths.push(RemotePath::new(
&Path::new(pg_version)
.join(private_prefix)
.join(SHARE_EXT_PATH),
)?);
}
let all_available_files = list_files_in_prefixes_for_extensions(remote_storage, &paths).await?;
info!(
"list of available_extension files {:?}",
&all_available_files
);
// download all control files
for (obj_name, obj_paths) in &all_available_files {
for obj_path in obj_paths {
if obj_name.ends_with("control") {
download_helper(remote_storage, obj_path, None, &local_sharedir).await?;
}
}
}
Ok(all_available_files)
}
// Download requested shared_preload_libraries
//
// Note that tenant_id is not optional here, because we only download libraries
// after we know the tenant spec and the tenant_id.
//
// return list of all library files to use it in the future searches
pub async fn get_available_libraries(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
private_ext_prefixes: &Vec<String>,
preload_libraries: &Vec<String>,
) -> anyhow::Result<HashMap<String, RemotePath>> {
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
// Construct a hashmap of all available libraries
// example (key, value) pair: test_lib0.so, v14/lib/test_lib0.so
let mut paths: Vec<RemotePath> = Vec::new();
// public libraries
paths.push(
RemotePath::new(&Path::new(&pg_version).join("lib/"))
.expect("The hard coded path here is valid"),
);
// private libraries
for private_prefix in private_ext_prefixes {
paths.push(
RemotePath::new(&Path::new(&pg_version).join(private_prefix).join("lib"))
.expect("The hard coded path here is valid"),
);
}
let all_available_libraries = list_files_in_prefixes(remote_storage, &paths).await?;
info!("list of library files {:?}", &all_available_libraries);
// download all requested libraries
for lib_name in preload_libraries {
// add file extension if it isn't in the filename
let lib_name_with_ext = enforce_so_end(lib_name);
info!("looking for library {:?}", &lib_name_with_ext);
match all_available_libraries.get(&*lib_name_with_ext) {
Some(remote_path) => {
download_helper(remote_storage, remote_path, None, &local_libdir).await?
}
None => {
let file_path = local_libdir.join(&lib_name_with_ext);
if file_path.exists() {
info!("File {:?} already exists. Skipping download", &file_path);
} else {
bail!("Shared library file {lib_name} is not found in the extension store")
}
}
}
}
Ok(all_available_libraries)
}
// download all sqlfiles (and possibly data files) for a given extension name
//
pub async fn download_extension_sql_files(
ext_name: &str,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
all_available_files: &HashMap<String, Vec<RemotePath>>,
) -> Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let mut downloaded_something = false;
if let Some(files) = all_available_files.get(ext_name) {
for file in files {
if file.extension().context("bad file name")? != "control" {
// find files prefix to handle cases when extension files are stored
// in a directory with the same name as the extension
// example:
// share/postgresql/extension/extension_name/extension_name--1.0.sql
let index = file
.get_path()
.to_str()
.context("invalid path")?
.find(ext_name)
.context("invalid path")?;
let prefix_str =
file.get_path().to_str().context("invalid path")?[..index].to_string();
let remote_from_prefix = if prefix_str.is_empty() {
None
} else {
Some(Path::new(&prefix_str))
};
download_helper(remote_storage, file, remote_from_prefix, &local_sharedir).await?;
downloaded_something = true;
}
}
}
if !downloaded_something {
bail!("Files for extension {ext_name} are not found in the extension store");
}
Ok(())
}
// appends an .so suffix to libname if it does not already have one
fn enforce_so_end(libname: &str) -> String {
if !libname.ends_with(".so") {
format!("{}.so", libname)
} else {
libname.to_string()
}
}
// download shared library file
pub async fn download_library_file(
lib_name: &str,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
all_available_libraries: &HashMap<String, RemotePath>,
) -> Result<()> {
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
let lib_name_with_ext = enforce_so_end(lib_name);
info!("looking for library {:?}", &lib_name_with_ext);
match all_available_libraries.get(&*lib_name_with_ext) {
Some(remote_path) => {
download_helper(remote_storage, remote_path, None, &local_libdir).await?
}
None => bail!("Shared library file {lib_name} is not found in the extension store"),
}
Ok(())
}
pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
let remote_ext_bucket = match &remote_ext_config["bucket"] {
Value::String(x) => x,
_ => bail!("remote_ext_config missing bucket"),
};
let remote_ext_region = match &remote_ext_config["region"] {
Value::String(x) => x,
_ => bail!("remote_ext_config missing region"),
};
let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
Value::String(x) => Some(x.clone()),
_ => None,
};
let remote_ext_prefix = match &remote_ext_config["prefix"] {
Value::String(x) => Some(x.clone()),
_ => None,
};
// load will not be large, so default parameters are fine
let config = S3Config {
bucket_name: remote_ext_bucket.to_string(),
bucket_region: remote_ext_region.to_string(),
prefix_in_bucket: remote_ext_prefix,
endpoint: remote_ext_endpoint,
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
max_keys_per_list_response: None,
};
let config = RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
storage: RemoteStorageKind::AwsS3(config),
};
GenericRemoteStorage::from_config(&config)
}
// helper to collect all files in the given prefixes
// returns hashmap of (file_name, file_remote_path)
async fn list_files_in_prefixes(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<HashMap<String, RemotePath>> {
let mut res = HashMap::new();
for path in paths {
for file in remote_storage.list_files(Some(path)).await? {
res.insert(
file.object_name().expect("bad object").to_owned(),
file.to_owned(),
);
}
}
Ok(res)
}
// helper to extract extension name
// extension files can be in subdirectories of the extension store.
// examples of layout:
//
// share/postgresql/extension/extension_name--1.0.sql
//
// or
//
// share/postgresql/extension/extension_name/extension_name--1.0.sql
// share/postgresql/extension/extension_name/extra_data.csv
//
// Note: we **assume** that the extension files is in one of these formats.
// If it is not, this code will not download it.
fn get_ext_name(path: &str) -> Result<&str> {
let path_suffix: Vec<&str> = path.split(&format!("{SHARE_EXT_PATH}/")).collect();
let path_suffix = path_suffix.last().expect("bad ext name");
// the order of these is important
// otherwise we'll return incorrect extension name
// for path like share/postgresql/extension/extension_name/extension_name--1.0.sql
for index in ["/", "--"] {
if let Some(index) = path_suffix.find(index) {
return Ok(&path_suffix[..index]);
}
}
Ok(path_suffix)
}
// helper to collect files of given prefixes for extensions
// and group them by extension
// returns a hashmap of (extension_name, Vector of remote paths for all files needed for this extension)
async fn list_files_in_prefixes_for_extensions(
remote_storage: &GenericRemoteStorage,
paths: &Vec<RemotePath>,
) -> Result<HashMap<String, Vec<RemotePath>>> {
let mut result = HashMap::new();
for path in paths {
for file in remote_storage.list_files(Some(path)).await? {
let file_ext_name = get_ext_name(file.get_path().to_str().context("invalid path")?)?;
let ext_file_list = result
.entry(file_ext_name.to_string())
.or_insert(Vec::new());
ext_file_list.push(file.to_owned());
}
}
Ok(result)
}

View File

@@ -121,62 +121,8 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
// download extension files from S3 on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
info!("req.uri {:?}", req.uri());
let mut is_library = false;
if let Some(params) = req.uri().query() {
info!("serving {:?} POST request with params: {}", route, params);
if params == "is_library=true" {
is_library = true;
} else {
let mut resp = Response::new(Body::from("Wrong request parameters"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
return resp;
}
}
let filename = route.split('/').last().unwrap().to_string();
info!(
"serving /extension_server POST request, filename: {:?} is_library: {}",
filename, is_library
);
if is_library {
match compute.download_library_file(filename.to_string()).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("library download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
} else {
match compute
.download_extension_sql_files(filename.to_string())
.await
{
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("extension download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
}
}
// Return the `404 Not Found` for any other routes.
method => {
info!("404 Not Found for {:?}", method);
_ => {
let mut not_found = Response::new(Body::from("404 Not Found"));
*not_found.status_mut() = StatusCode::NOT_FOUND;
not_found

View File

@@ -139,34 +139,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/extension_server:
post:
tags:
- Extension
summary: Download extension from S3 to local folder.
description: ""
operationId: downloadExtension
responses:
200:
description: Extension downloaded
content:
text/plain:
schema:
type: string
description: Error text or 'OK' if download succeeded.
example: "OK"
400:
description: Request is invalid.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
500:
description: Extension download request failed.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:

View File

@@ -9,7 +9,6 @@ pub mod http;
#[macro_use]
pub mod logger;
pub mod compute;
pub mod extension_server;
pub mod monitor;
pub mod params;
pub mod pg_helpers;

View File

@@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane(
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
// File `postgresql.conf` is no longer included into `basebackup`, so just
// always write all config into it creating new file.
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
update_pg_hba(pgdata_path)?;

View File

@@ -32,4 +32,3 @@ utils.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true
tracing.workspace = true

View File

@@ -657,8 +657,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
@@ -700,7 +698,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
_ => {}
}
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
endpoint.start(&auth_token, safekeepers)?;
} else {
let branch_name = sub_args
.get_one::<String>("branch-name")
@@ -744,7 +742,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
pg_version,
mode,
)?;
ep.start(&auth_token, safekeepers, remote_ext_config)?;
ep.start(&auth_token, safekeepers)?;
}
}
"stop" => {
@@ -1004,12 +1002,6 @@ fn cli() -> Command {
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
.required(false);
let remote_ext_config_args = Arg::new("remote-ext-config")
.long("remote-ext-config")
.num_args(1)
.help("Configure the S3 bucket that we search for extensions in.")
.required(false);
let lsn_arg = Arg::new("lsn")
.long("lsn")
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
@@ -1160,7 +1152,6 @@ fn cli() -> Command {
.arg(pg_version_arg)
.arg(hot_standby_arg)
.arg(safekeepers_arg)
.arg(remote_ext_config_args)
)
.subcommand(
Command::new("stop")

View File

@@ -68,6 +68,7 @@ pub struct EndpointConf {
http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
skip_sync_safekeepers: Option<utils::lsn::Lsn>,
}
//
@@ -137,6 +138,7 @@ impl ComputeControlPlane {
tenant_id,
pg_version,
skip_pg_catalog_updates: false,
skip_sync_safekeepers: None,
});
ep.create_endpoint_dir()?;
@@ -151,6 +153,7 @@ impl ComputeControlPlane {
pg_port,
pg_version,
skip_pg_catalog_updates: false,
skip_sync_safekeepers: None,
})?,
)?;
std::fs::write(
@@ -189,6 +192,7 @@ pub struct Endpoint {
// Optimizations
skip_pg_catalog_updates: bool,
skip_sync_safekeepers: Option<utils::lsn::Lsn>,
}
impl Endpoint {
@@ -223,6 +227,7 @@ impl Endpoint {
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
skip_sync_safekeepers: conf.skip_sync_safekeepers,
})
}
@@ -311,7 +316,7 @@ impl Endpoint {
// TODO: use future host field from safekeeper spec
// Pass the list of safekeepers to the replica so that it can connect to any of them,
// whichever is available.
// whichever is availiable.
let sk_ports = self
.env
.safekeepers
@@ -408,12 +413,7 @@ impl Endpoint {
Ok(())
}
pub fn start(
&self,
auth_token: &Option<String>,
safekeepers: Vec<NodeId>,
remote_ext_config: Option<&String>,
) -> Result<()> {
pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
if self.status() == "running" {
anyhow::bail!("The endpoint is already running");
}
@@ -462,6 +462,7 @@ impl Endpoint {
// Create spec file
let spec = ComputeSpec {
skip_sync_safekeepers: self.skip_sync_safekeepers,
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
@@ -481,7 +482,6 @@ impl Endpoint {
pageserver_connstring: Some(pageserver_connstring),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
private_extensions: Some(vec![self.tenant_id.to_string()]), //DEBUG ONLY
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -513,9 +513,6 @@ impl Endpoint {
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
if let Some(remote_ext_config) = remote_ext_config {
cmd.args(["--remote-ext-config", remote_ext_config]);
}
let _child = cmd.spawn()?;
// Wait for it to start

View File

@@ -1,301 +0,0 @@
# Supporting custom user Extensions
Created 2023-05-03
## Motivation
There are many extensions in the PostgreSQL ecosystem, and not all extensions
are of a quality that we can confidently support them. Additionally, our
current extension inclusion mechanism has several problems because we build all
extensions into the primary Compute image: We build the extensions every time
we build the compute image regardless of whether we actually need to rebuild
the image, and the inclusion of these extensions in the image adds a hard
dependency on all supported extensions - thus increasing the image size, and
with it the time it takes to download that image - increasing first start
latency.
This RFC proposes a dynamic loading mechanism that solves most of these
problems.
## Summary
`compute_ctl` is made responsible for loading extensions on-demand into
the container's file system for dynamically loaded extensions, and will also
make sure that the extensions in `shared_preload_libraries` are downloaded
before the compute node starts.
## Components
compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store
## Requirements
Compute nodes with no extra extensions should not be negatively impacted by
the existence of support for many extensions.
Installing an extension into PostgreSQL should be easy.
Non-preloaded extensions shouldn't impact startup latency.
Uninstalled extensions shouldn't impact query latency.
A small latency penalty for dynamically loaded extensions is acceptable in
the first seconds of compute startup, but not in steady-state operations.
## Proposed implementation
### On-demand, JIT-loading of extensions
TLDR; we download extensions as soon as we need them, or when we have spare
time.
That means, we first download the extensions required to start the PostMaster
(`shared_preload_libraries` and their dependencies), then the libraries required
before a backend can start processing user input (`preload_libraries` and
dependencies), and then (with network limits applied) the remainder of the
configured extensions, with prioritization for installed extensions.
If PostgreSQL tries to load a library that is not yet fully on disk, it will
ask `compute_ctl` first if the extension has been downloaded yet, and will wait
for `compute_ctl` to finish downloading that extension. `compute_ctl` will
prioritize downloading that extension over other extensions that were not yet
requested.
#### Workflow
```mermaid
sequenceDiagram
autonumber
participant EX as External (control plane, ...)
participant CTL as compute_ctl
participant ST as extension store
actor PG as PostgreSQL
EX ->>+ CTL: Start compute with config X
note over CTL: The configuration contains a list of all <br/>extensions available to that compute node, etc.
par Optionally parallel or concurrent
loop Available extensions
CTL ->>+ ST: Download control file of extension
activate CTL
ST ->>- CTL: Finish downloading control file
CTL ->>- CTL: Put control file in extensions directory
end
loop For each extension in shared_preload_libraries
CTL ->>+ ST: Download extension's data
activate CTL
ST ->>- CTL: Finish downloading
CTL ->>- CTL: Put extension's files in the right place
end
end
CTL ->>+ PG: Start PostgreSQL
note over CTL: PostgreSQL can now start accepting <br/>connections. However, users may still need to wait <br/>for preload_libraries extensions to get downloaded.
par Load preload_libraries
loop For each extension in preload_libraries
CTL ->>+ ST: Download extension's data
activate CTL
ST ->>- CTL: Finish downloading
CTL ->>- CTL: Put extension's files in the right place
end
end
note over CTL: After this, connections don't have any hard <br/>waits for extension files left, except for those <br/>connections that override preload_libraries <br/>in their startup packet
par PG's internal_load_library(library)
alt Library is not yet loaded
PG ->>+ CTL: Load library X
CTL ->>+ ST: Download the extension that provides X
ST ->>- CTL: Finish downloading
CTL ->> CTL: Put extension's files in the right place
CTL ->>- PG: Ready
else Library is already loaded
note over PG: No-op
end
and Download all remaining extensions
loop Extension X
CTL ->>+ ST: Download not-yet-downloaded extension X
activate CTL
ST ->>- CTL: Finish downloading
CTL ->>- CTL: Put extension's files in the right place
end
end
deactivate PG
deactivate CTL
```
#### Summary
Pros:
- Startup is only as slow as it takes to load all (shared_)preload_libraries
- Supports BYO Extension
Cons:
- O(sizeof(extensions)) IO requirement for loading all extensions.
### Alternative solutions
1. Allow users to add their extensions to the base image
Pros:
- Easy to deploy
Cons:
- Doesn't scale - first start size is dependent on image size;
- All extensions are shared across all users: It doesn't allow users to
bring their own restrictive-licensed extensions
2. Bring Your Own compute image
Pros:
- Still easy to deploy
- User can bring own patched version of PostgreSQL
Cons:
- First start latency is O(sizeof(extensions image))
- Warm instance pool for skipping pod schedule latency is not feasible with
O(n) custom images
- Support channels are difficult to manage
3. Download all user extensions in bulk on compute start
Pros:
- Easy to deploy
- No startup latency issues for "clean" users.
- Warm instance pool for skipping pod schedule latency is possible
Cons:
- Downloading all extensions in advance takes a lot of time, thus startup
latency issues
4. Store user's extensions in persistent storage
Pros:
- Easy to deploy
- No startup latency issues
- Warm instance pool for skipping pod schedule latency is possible
Cons:
- EC2 instances have only limited number of attachments shared between EBS
volumes, direct-attached NVMe drives, and ENIs.
- Compute instance migration isn't trivially solved for EBS mounts (e.g.
the device is unavailable whilst moving the mount between instances).
- EBS can only mount on one instance at a time (except the expensive IO2
device type).
5. Store user's extensions in network drive
Pros:
- Easy to deploy
- Few startup latency issues
- Warm instance pool for skipping pod schedule latency is possible
Cons:
- We'd need networked drives, and a lot of them, which would store many
duplicate extensions.
- **UNCHECKED:** Compute instance migration may not work nicely with
networked IOs
### Idea extensions
The extension store does not have to be S3 directly, but could be a Node-local
caching service on top of S3. This would reduce the load on the network for
popular extensions.
## Extension Store implementation
Extension Store in our case is a private S3 bucket.
Extensions are stored as tarballs in the bucket. The tarball contains the extension's control file and all the files that the extension needs to run.
We may also store the control file separately from the tarball to speed up the extension loading.
`s3://<the-bucket>/extensions/ext-name/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar`
where `ext-name` is an extension name and `sha-256+1234abcd1234abcd1234abcd1234abcd` is a hash of a specific extension version tarball.
To ensure security, there is no direct access to the S3 bucket from compute node.
Control plane forms a list of extensions available to the compute node
and forms a short-lived [pre-signed URL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html)
for each extension that is available to the compute node.
so, `compute_ctl` receives spec in the following format
```
"extensions": [{
"meta_format": 1,
"extension_name": "postgis",
"link": "https://<the-bucket>/extensions/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar?AWSAccessKeyId=1234abcd1234abcd1234abcd1234abcd&Expires=1234567890&Signature=1234abcd1234abcd1234abcd1234abcd",
...
}]
```
`compute_ctl` then downloads the extension from the link and unpacks it to the right place.
### How do we handle private extensions?
Private and public extensions are treated equally from the Extension Store perspective.
The only difference is that the private extensions are not listed in the user UI (managed by control plane).
### How to add new extension to the Extension Store?
Since we need to verify that the extension is compatible with the compute node and doesn't contain any malicious code,
we need to review the extension before adding it to the Extension Store.
I do not expect that we will have a lot of extensions to review, so we can do it manually for now.
Some admin UI may be added later to automate this process.
The list of extensions available to a compute node is stored in the console database.
### How is the list of available extensions managed?
We need to add new tables to the console database to store the list of available extensions, their versions and access rights.
something like this:
```
CREATE TABLE extensions (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
version VARCHAR(255) NOT NULL,
hash VARCHAR(255) NOT NULL, // this is the path to the extension in the Extension Store
supported_postgres_versions integer[] NOT NULL,
is_public BOOLEAN NOT NULL, // public extensions are available to all users
is_shared_preload BOOLEAN NOT NULL, // these extensions require postgres restart
is_preload BOOLEAN NOT NULL,
license VARCHAR(255) NOT NULL,
);
CREATE TABLE user_extensions (
user_id INTEGER NOT NULL,
extension_id INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (id),
FOREIGN KEY (extension_id) REFERENCES extensions (id)
);
```
When new extension is added to the Extension Store, we add a new record to the table and set permissions.
In UI, user may select the extensions that they want to use with their compute node.
NOTE: Extensions that require postgres restart will not be available until the next compute restart.
Also, currently user cannot force postgres restart. We should add this feature later.
For other extensions, we must communicate updates to `compute_ctl` and they will be downloaded in the background.
### How can user update the extension?
User can update the extension by selecting the new version of the extension in the UI.
### Alternatives
For extensions written on trusted languages we can also adopt
`dbdev` PostgreSQL Package Manager based on `pg_tle` by Supabase.
This will increase the amount supported extensions and decrease the amount of work required to support them.

View File

@@ -33,6 +33,15 @@ pub struct ComputeSpec {
#[serde(default)] // Default false
pub skip_pg_catalog_updates: bool,
/// An optinal hint that can be passed to speed up startup time if we know
/// that safekeepers have already been synced at the given LSN.
///
/// NOTE: If there's any possibility that the safekeepers could have advanced
/// (e.g. if we started compute, and it crashed) we should stay on the
/// safe side and provide None.
#[serde(default)]
pub skip_sync_safekeepers: Option<Lsn>,
// Information needed to connect to the storage layer.
//
// `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
@@ -60,8 +69,6 @@ pub struct ComputeSpec {
/// If set, 'storage_auth_token' is used as the password to authenticate to
/// the pageserver and safekeepers.
pub storage_auth_token: Option<String>,
pub private_extensions: Option<Vec<String>>,
}
#[serde_as]

View File

@@ -184,20 +184,6 @@ pub enum GenericRemoteStorage {
}
impl GenericRemoteStorage {
// A function for listing all the files in a "directory"
// Example:
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
match self {
Self::LocalFs(s) => s.list_files(folder).await,
Self::AwsS3(s) => s.list_files(folder).await,
Self::Unreliable(s) => s.list_files(folder).await,
}
}
// lists common *prefixes*, if any of files
// Example:
// list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
pub async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
@@ -209,6 +195,14 @@ impl GenericRemoteStorage {
}
}
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
match self {
Self::LocalFs(s) => s.list_files(folder).await,
Self::AwsS3(s) => s.list_files(folder).await,
Self::Unreliable(s) => s.list_files(folder).await,
}
}
pub async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -349,17 +349,10 @@ impl RemoteStorage for S3Bucket {
/// See the doc for `RemoteStorage::list_files`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let mut folder_name = folder
let folder_name = folder
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
// remove leading "/" if one exists
if let Some(folder_name_slash) = folder_name.clone() {
if folder_name_slash.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
folder_name = Some(folder_name_slash[1..].to_string());
}
}
// AWS may need to break the response into several parts
let mut continuation_token = None;
let mut all_files = vec![];

View File

@@ -173,10 +173,15 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix,)))
.with_context(|| "RemotePath conversion")?;
let path3 = RemotePath::new(&PathBuf::from(format!("{}/path3", ctx.base_prefix,)))
.with_context(|| "RemotePath conversion")?;
let data1 = "remote blob data1".as_bytes();
let data1_len = data1.len();
let data2 = "remote blob data2".as_bytes();
let data2_len = data2.len();
let data3 = "remote blob data3".as_bytes();
let data3_len = data3.len();
ctx.client
.upload(std::io::Cursor::new(data1), data1_len, &path1, None)
.await?;
@@ -185,8 +190,18 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
.upload(std::io::Cursor::new(data2), data2_len, &path2, None)
.await?;
ctx.client
.upload(std::io::Cursor::new(data3), data3_len, &path3, None)
.await?;
ctx.client.delete_objects(&[path1, path2]).await?;
let prefixes = ctx.client.list_prefixes(None).await?;
assert_eq!(prefixes.len(), 1);
ctx.client.delete_objects(&[path3]).await?;
Ok(())
}

View File

@@ -186,10 +186,8 @@ paths:
schema:
$ref: "#/components/schemas/Error"
delete:
description: "Attempts to delete specified timeline. On 500 errors should be retried"
description: "Attempts to delete specified timeline. 500 and 409 errors should be retried"
responses:
"200":
description: Ok
"400":
description: Error when no tenant id found in path or no timeline id
content:
@@ -214,6 +212,12 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"409":
description: Deletion is already in progress, continue polling
content:
application/json:
schema:
$ref: "#/components/schemas/ConflictError"
"412":
description: Tenant is missing, or timeline has children
content:

View File

@@ -187,6 +187,7 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
format!("Cannot delete timeline which has child timelines: {children:?}")
.into_boxed_str(),
),
a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
Other(e) => ApiError::InternalServerError(e),
}
}

View File

@@ -440,8 +440,13 @@ pub enum GetTimelineError {
pub enum DeleteTimelineError {
#[error("NotFound")]
NotFound,
#[error("HasChildren")]
HasChildren(Vec<TimelineId>),
#[error("Timeline deletion is already in progress")]
AlreadyInProgress,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@@ -1755,14 +1760,11 @@ impl Tenant {
timeline = Arc::clone(timeline_entry.get());
// Prevent two tasks from trying to delete the timeline at the same time.
delete_lock_guard =
DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err(
|_| {
DeleteTimelineError::Other(anyhow::anyhow!(
"timeline deletion is already in progress"
))
},
)?);
delete_lock_guard = DeletionGuard(
Arc::clone(&timeline.delete_lock)
.try_lock_owned()
.map_err(|_| DeleteTimelineError::AlreadyInProgress)?,
);
// If another task finished the deletion just before we acquired the lock,
// return success.

View File

@@ -862,10 +862,8 @@ impl RemoteTimelineClient {
"Found {} files not bound to index_file.json, proceeding with their deletion",
remaining.len()
);
for file in remaining {
warn!("Removing {}", file.object_name().unwrap_or_default());
self.storage_impl.delete(&file).await?;
}
warn!("About to remove {} files", remaining.len());
self.storage_impl.delete_objects(&remaining).await?;
}
let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));

View File

@@ -12,7 +12,7 @@ use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::{Context, Result};
use anyhow::Result;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
@@ -343,8 +343,7 @@ impl LayerAccessStats {
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers
/// are used in (timeline).
#[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
pub trait Layer: std::fmt::Debug + Send + Sync {
/// Range of keys that this layer covers
fn get_key_range(&self) -> Range<Key>;
@@ -374,42 +373,13 @@ pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)>;
/// CANCEL SAFETY: if the returned future is dropped,
/// the wrapped closure still run to completion and the return value discarded.
/// For the case of get_value_reconstruct_data, we expect the closure to not
/// have any side effects, as it only attempts to read a layer (and stuff like
/// page cache isn't considered a real side effect).
/// But, ...
/// TRACING:
/// If the returned future is cancelled, the spawn_blocking span can outlive
/// the caller's span.
/// So, technically, we should be using `parent: None` and `follows_from: current`
/// instead. However, in practice, the advantage of maintaining the span stack
/// in logs outweighs the disadvantage of having a dangling span in a case that
/// is not expected to happen because in pageserver we generally don't drop pending futures.
async fn get_value_reconstruct_data(
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
tokio::task::spawn_blocking(move || {
let _enter = span.enter();
self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
})
.await
.context("spawn_blocking")?
}
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
@@ -529,7 +499,6 @@ impl LayerDescriptor {
}
}
#[async_trait::async_trait]
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
@@ -543,13 +512,13 @@ impl Layer for LayerDescriptor {
self.is_incremental
}
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}

View File

@@ -218,7 +218,6 @@ impl std::fmt::Debug for DeltaLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -295,13 +294,13 @@ impl Layer for DeltaLayer {
Ok(())
}
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.desc.lsn_range.start);
let mut need_image = true;
@@ -309,7 +308,7 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = &inner.file;
@@ -375,9 +374,9 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
Ok(ValueReconstructResult::Continue)
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
}
}

View File

@@ -149,7 +149,6 @@ impl std::fmt::Debug for ImageLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -182,18 +181,18 @@ impl Layer for ImageLayer {
}
/// Look up given page in the file
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
assert!(self.desc.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -211,9 +210,9 @@ impl Layer for ImageLayer {
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
} else {
Ok((reconstruct_state, ValueReconstructResult::Missing))
Ok(ValueReconstructResult::Missing)
}
}

View File

@@ -110,7 +110,6 @@ impl InMemoryLayer {
}
}
#[async_trait::async_trait]
impl Layer for InMemoryLayer {
fn get_key_range(&self) -> Range<Key> {
Key::MIN..Key::MAX
@@ -191,13 +190,13 @@ impl Layer for InMemoryLayer {
}
/// Look up given value in the layer.
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
mut reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
@@ -214,7 +213,7 @@ impl Layer for InMemoryLayer {
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok((reconstruct_state, ValueReconstructResult::Complete));
return Ok(ValueReconstructResult::Complete);
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
@@ -234,9 +233,9 @@ impl Layer for InMemoryLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok((reconstruct_state, ValueReconstructResult::Continue))
Ok(ValueReconstructResult::Continue)
} else {
Ok((reconstruct_state, ValueReconstructResult::Complete))
Ok(ValueReconstructResult::Complete)
}
}
}

View File

@@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
@@ -21,7 +21,7 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName};
use super::{
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -63,15 +63,14 @@ impl std::fmt::Debug for RemoteLayer {
}
}
#[async_trait::async_trait]
impl Layer for RemoteLayer {
fn get_value_reconstruct_data_blocking(
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()

View File

@@ -555,14 +555,13 @@ impl Timeline {
None => None,
};
let reconstruct_state = ValueReconstructState {
let mut reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
let reconstruct_state = self
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
.await?;
timer.stop_and_record();
@@ -2353,9 +2352,9 @@ impl Timeline {
&self,
key: Key,
request_lsn: Lsn,
mut reconstruct_state: ValueReconstructState,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructState, PageReconstructError> {
) -> Result<(), PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -2385,12 +2384,12 @@ impl Timeline {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(reconstruct_state),
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(reconstruct_state);
return Ok(());
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
@@ -2494,19 +2493,13 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match Arc::clone(open_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2527,19 +2520,13 @@ impl Timeline {
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match Arc::clone(frozen_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2568,19 +2555,13 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match Arc::clone(&layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;

View File

@@ -4,7 +4,6 @@
MODULE_big = neon
OBJS = \
$(WIN32RES) \
extension_server.o \
file_cache.o \
libpagestore.o \
libpqwalproposer.o \

View File

@@ -1,103 +0,0 @@
/*-------------------------------------------------------------------------
*
* extension_server.c
* Request compute_ctl to download extension files.
*
* IDENTIFICATION
* contrib/neon/extension_server.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "access/xact.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "commands/defrem.h"
#include "miscadmin.h"
#include "utils/acl.h"
#include "fmgr.h"
#include "utils/guc.h"
#include "port.h"
#include "fmgr.h"
#include <curl/curl.h>
static int extension_server_port = 0;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
// to download all SQL files for an extension:
// curl -X POST http://localhost:8080/extension_server/postgis
//
// to download specific library file:
// curl -X POST http://localhost:8080/extension_server/postgis-3.so?=true
static bool
neon_download_extension_file_http(const char *filename, bool is_library)
{
CURL *curl;
CURLcode res;
char *compute_ctl_url;
char *postdata;
bool ret = false;
if ((curl = curl_easy_init()) == NULL)
{
elog(ERROR, "Failed to initialize curl handle");
}
if (is_library)
{
elog(LOG, "request library");
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
extension_server_port, filename, is_library?"?is_library=true":"");
elog(LOG, "curl_easy_perform() url: %s", compute_ctl_url);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);
if (curl)
{
/* Perform the request, res will get the return code */
res = curl_easy_perform(curl);
/* Check for errors */
if (res == CURLE_OK)
{
elog(LOG, "curl_easy_perform() succeeded");
ret = true;
}
else
{
elog(WARNING, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
}
/* always cleanup */
curl_easy_cleanup(curl);
}
return ret;
}
void pg_init_extension_server()
{
DefineCustomIntVariable("neon.extension_server_port",
"connection string to the compute_ctl",
NULL,
&extension_server_port,
0, 0, INT_MAX,
PGC_POSTMASTER,
0, /* no flags required */
NULL, NULL, NULL);
// set download_extension_file_hook
prev_download_extension_file_hook = download_extension_file_hook;
download_extension_file_hook = neon_download_extension_file_http;
}

View File

@@ -1 +0,0 @@

View File

@@ -35,11 +35,8 @@ _PG_init(void)
{
pg_init_libpagestore();
pg_init_walproposer();
InitControlPlaneConnector();
pg_init_extension_server();
// Important: This must happen after other parts of the extension
// are loaded, otherwise any settings to GUCs that were set before
// the extension was loaded will be removed.

View File

@@ -21,8 +21,6 @@ extern char *neon_tenant;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
extern void pg_init_extension_server(void);
/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.

View File

@@ -534,16 +534,6 @@ class S3Storage:
"AWS_SECRET_ACCESS_KEY": self.secret_key,
}
def to_string(self) -> str:
return json.dumps(
{
"bucket": self.bucket_name,
"region": self.bucket_region,
"endpoint": self.endpoint,
"prefix": self.prefix_in_bucket,
}
)
RemoteStorage = Union[LocalFsStorage, S3Storage]
@@ -610,12 +600,10 @@ class NeonEnvBuilder:
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.ext_remote_storage: Optional[S3Storage] = None
self.remote_storage_client: Optional[Any] = None
self.remote_storage_users = remote_storage_users
self.broker = broker
self.run_id = run_id
self.mock_s3_server: MockS3Server = mock_s3_server
self.mock_s3_server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.safekeepers_id_start = safekeepers_id_start
@@ -663,24 +651,15 @@ class NeonEnvBuilder:
remote_storage_kind: RemoteStorageKind,
test_name: str,
force_enable: bool = True,
enable_remote_extensions: bool = False,
):
if remote_storage_kind == RemoteStorageKind.NOOP:
return
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(
bucket_name=test_name,
force_enable=force_enable,
enable_remote_extensions=enable_remote_extensions,
)
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.REAL_S3:
self.enable_real_s3_remote_storage(
test_name=test_name,
force_enable=force_enable,
enable_remote_extensions=enable_remote_extensions,
)
self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
else:
raise RuntimeError(f"Unknown storage type: {remote_storage_kind}")
@@ -694,15 +673,11 @@ class NeonEnvBuilder:
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage"))
def enable_mock_s3_remote_storage(
self, bucket_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
):
def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True):
"""
Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
Starts up the mock server, if that does not run yet.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
Also creates the bucket for extensions, self.ext_remote_storage bucket
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
mock_endpoint = self.mock_s3_server.endpoint()
@@ -723,22 +698,9 @@ class NeonEnvBuilder:
bucket_region=mock_region,
access_key=self.mock_s3_server.access_key(),
secret_key=self.mock_s3_server.secret_key(),
prefix_in_bucket="pageserver",
)
if enable_remote_extensions:
self.ext_remote_storage = S3Storage(
bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region,
access_key=self.mock_s3_server.access_key(),
secret_key=self.mock_s3_server.secret_key(),
prefix_in_bucket="ext",
)
def enable_real_s3_remote_storage(
self, test_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
):
def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True):
"""
Sets up configuration to use real s3 endpoint without mock server
"""
@@ -775,18 +737,9 @@ class NeonEnvBuilder:
bucket_region=region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket=f"{self.remote_storage_prefix}/pageserver",
prefix_in_bucket=self.remote_storage_prefix,
)
if enable_remote_extensions:
self.ext_remote_storage = S3Storage(
bucket_name=bucket_name,
bucket_region=region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket=f"{self.remote_storage_prefix}/ext",
)
def cleanup_local_storage(self):
if self.preserve_database_files:
return
@@ -820,7 +773,6 @@ class NeonEnvBuilder:
# `self.remote_storage_prefix` is coupled with `S3Storage` storage type,
# so this line effectively a no-op
assert isinstance(self.remote_storage, S3Storage)
assert self.remote_storage_client is not None
if self.keep_remote_storage_contents:
log.info("keep_remote_storage_contents skipping remote storage cleanup")
@@ -950,8 +902,6 @@ class NeonEnv:
self.neon_binpath = config.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
self.remote_storage_client = config.remote_storage_client
self.ext_remote_storage = config.ext_remote_storage
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -1538,7 +1488,6 @@ class NeonCli(AbstractNeonCli):
safekeepers: Optional[List[int]] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
remote_ext_config: Optional[str] = None,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1548,8 +1497,6 @@ class NeonCli(AbstractNeonCli):
"--pg-version",
self.env.pg_version,
]
if remote_ext_config is not None:
args.extend(["--remote-ext-config", remote_ext_config])
if lsn is not None:
args.append(f"--lsn={lsn}")
args.extend(["--pg-port", str(pg_port)])
@@ -2411,7 +2358,7 @@ class Endpoint(PgProtocol):
return self
def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint":
def start(self) -> "Endpoint":
"""
Start the Postgres instance.
Returns self.
@@ -2427,7 +2374,6 @@ class Endpoint(PgProtocol):
http_port=self.http_port,
tenant_id=self.tenant_id,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
)
self.running = True
@@ -2517,7 +2463,6 @@ class Endpoint(PgProtocol):
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
) -> "Endpoint":
"""
Create an endpoint, apply config, and start Postgres.
@@ -2532,7 +2477,7 @@ class Endpoint(PgProtocol):
config_lines=config_lines,
hot_standby=hot_standby,
lsn=lsn,
).start(remote_ext_config=remote_ext_config)
).start()
log.info(f"Postgres startup took {time.time() - started_at} seconds")
@@ -2566,7 +2511,6 @@ class EndpointFactory:
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -2583,7 +2527,6 @@ class EndpointFactory:
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
remote_ext_config=remote_ext_config,
)
def create(

View File

@@ -89,9 +89,6 @@ class TenantId(Id):
def __repr__(self) -> str:
return f'`TenantId("{self.id.hex()}")'
def __str__(self) -> str:
return self.id.hex()
class TimelineId(Id):
def __repr__(self) -> str:

View File

@@ -30,7 +30,18 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_startup")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_startup")
def get_synced_lsn():
"""Assert safekeepers are synced and get the LSN."""
commit_lsns = [
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn
for sk in env.safekeepers
]
assert len(commit_lsns) == 3
assert len(set(commit_lsns)) == 1
return commit_lsns[0]
endpoint = None
@@ -63,7 +74,8 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
endpoint.stop()
# Imitate optimizations that console would do for the second start
endpoint.respec(skip_pg_catalog_updates=True)
lsn = get_synced_lsn()
endpoint.respec(skip_pg_catalog_updates=True, skip_sync_safekeepers=lsn.lsn_int)
# This test sometimes runs for longer than the global 5 minute timeout.

View File

@@ -1,252 +0,0 @@
import os
from contextlib import closing
from io import BytesIO
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, RemoteStorageKind
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId
NUM_EXT = 3
def control_file_content(owner, i):
output = f"""# mock {owner} extension{i}
comment = 'This is a mock extension'
default_version = '1.0'
module_pathname = '$libdir/test_ext{i}'
relocatable = true"""
return output
def sql_file_content():
output = """
CREATE FUNCTION test_ext_add(integer, integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
IMMUTABLE
RETURNS NULL ON NULL INPUT;
"""
return output
# Prepare some mock extension files and upload them to the bucket
# returns a list of files that should be cleaned up after the test
def prepare_mock_ext_storage(
pg_version: PgVersion,
tenant_id: TenantId,
pg_bin: PgBin,
ext_remote_storage,
remote_storage_client,
):
bucket_prefix = ext_remote_storage.prefix_in_bucket
private_prefix = str(tenant_id)
PUB_EXT_ROOT = f"v{pg_version}/share/postgresql/extension"
PRIVATE_EXT_ROOT = f"v{pg_version}/{private_prefix}/share/postgresql/extension"
LOCAL_EXT_ROOT = f"pg_install/{PUB_EXT_ROOT}"
PUB_LIB_ROOT = f"v{pg_version}/lib"
PRIVATE_LIB_ROOT = f"v{pg_version}/{private_prefix}/lib"
LOCAL_LIB_ROOT = f"{pg_bin.pg_lib_dir}/postgresql"
log.info(
f"""
PUB_EXT_ROOT: {PUB_EXT_ROOT}
PRIVATE_EXT_ROOT: {PRIVATE_EXT_ROOT}
LOCAL_EXT_ROOT: {LOCAL_EXT_ROOT}
PUB_LIB_ROOT: {PUB_LIB_ROOT}
PRIVATE_LIB_ROOT: {PRIVATE_LIB_ROOT}
LOCAL_LIB_ROOT: {LOCAL_LIB_ROOT}
"""
)
cleanup_files = []
# Upload several test_ext{i}.control files to the bucket
for i in range(NUM_EXT):
public_ext = BytesIO(bytes(control_file_content("public", i), "utf-8"))
public_remote_name = f"{bucket_prefix}/{PUB_EXT_ROOT}/test_ext{i}.control"
public_local_name = f"{LOCAL_EXT_ROOT}/test_ext{i}.control"
private_ext = BytesIO(bytes(control_file_content(str(tenant_id), i), "utf-8"))
private_remote_name = f"{bucket_prefix}/{PRIVATE_EXT_ROOT}/private_ext{i}.control"
private_local_name = f"{LOCAL_EXT_ROOT}/private_ext{i}.control"
cleanup_files += [public_local_name, private_local_name]
remote_storage_client.upload_fileobj(
public_ext, ext_remote_storage.bucket_name, public_remote_name
)
remote_storage_client.upload_fileobj(
private_ext, ext_remote_storage.bucket_name, private_remote_name
)
# Upload SQL file for the extension we're going to create
sql_filename = "test_ext0--1.0.sql"
test_sql_public_remote_path = f"{bucket_prefix}/{PUB_EXT_ROOT}/{sql_filename}"
test_sql_local_path = f"{LOCAL_EXT_ROOT}/{sql_filename}"
test_ext_sql_file = BytesIO(bytes(sql_file_content(), "utf-8"))
remote_storage_client.upload_fileobj(
test_ext_sql_file,
ext_remote_storage.bucket_name,
test_sql_public_remote_path,
)
cleanup_files += [test_sql_local_path]
# upload some fake library files
for i in range(2):
public_library = BytesIO(bytes("\n111\n", "utf-8"))
public_remote_name = f"{bucket_prefix}/{PUB_LIB_ROOT}/test_lib{i}.so"
public_local_name = f"{LOCAL_LIB_ROOT}/test_lib{i}.so"
private_library = BytesIO(bytes("\n111\n", "utf-8"))
private_remote_name = f"{bucket_prefix}/{PRIVATE_LIB_ROOT}/private_lib{i}.so"
private_local_name = f"{LOCAL_LIB_ROOT}/private_lib{i}.so"
log.info(f"uploading library to {public_remote_name}")
log.info(f"uploading library to {private_remote_name}")
remote_storage_client.upload_fileobj(
public_library,
ext_remote_storage.bucket_name,
public_remote_name,
)
remote_storage_client.upload_fileobj(
private_library,
ext_remote_storage.bucket_name,
private_remote_name,
)
cleanup_files += [public_local_name, private_local_name]
return cleanup_files
# Generate mock extension files and upload them to the bucket.
#
# Then check that compute nodes can download them and use them
# to CREATE EXTENSION and LOAD 'library.so'
#
# NOTE: You must have appropriate AWS credentials to run REAL_S3 test.
# It may also be necessary to set the following environment variables:
# export AWS_ACCESS_KEY_ID='test'
# export AWS_SECRET_ACCESS_KEY='test'
# export AWS_SECURITY_TOKEN='test'
# export AWS_SESSION_TOKEN='test'
# export AWS_DEFAULT_REGION='us-east-1'
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3]
)
def test_remote_extensions(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_version: PgVersion,
pg_bin: PgBin,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_remote_extensions",
enable_remote_extensions=True,
)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
assert env.ext_remote_storage is not None
assert env.remote_storage_client is not None
# Prepare some mock extension files and upload them to the bucket
cleanup_files = prepare_mock_ext_storage(
pg_version,
tenant_id,
pg_bin,
env.ext_remote_storage,
env.remote_storage_client,
)
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD 'library.so'
#
# This block is wrapped in a try/finally so that the downloaded files
# are cleaned up even if the test fails
try:
endpoint = env.endpoints.create_start(
"test_remote_extensions",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Test query: check that test_ext0 was successfully downloaded
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
for i in range(NUM_EXT):
assert f"test_ext{i}" in all_extensions
assert f"private_ext{i}" in all_extensions
cur.execute("CREATE EXTENSION test_ext0")
cur.execute("SELECT extname FROM pg_extension")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "test_ext0" in all_extensions
# Try to load existing library file
try:
cur.execute("LOAD 'test_lib0.so'")
except Exception as e:
# expected to fail with
# could not load library ... test_ext.so: file too short
# because test_lib0.so is not real library file
log.info("LOAD test_lib0.so failed (expectedly): %s", e)
assert "file too short" in str(e)
# Try to load private library file
try:
cur.execute("LOAD 'private_lib0.so'")
except Exception as e:
# expected to fail with
# could not load library ... test_ext.so: file too short
# because test_lib0.so is not real library file
log.info("LOAD private_lib0.so failed (expectedly): %s", e)
assert "file too short" in str(e)
# Try to load existing library file without .so extension
try:
cur.execute("LOAD 'test_lib1'")
except Exception as e:
# expected to fail with
# could not load library ... test_lib1.so: file too short
# because test_lib1.so is not real library file
log.info("LOAD test_lib1 failed (expectedly): %s", e)
assert "file too short" in str(e)
# Try to load non-existent library file
try:
cur.execute("LOAD 'test_lib_fail.so'")
except Exception as e:
# expected to fail because test_lib_fail.so is not found
log.info("LOAD test_lib_fail.so failed (expectedly): %s", e)
assert (
"""could not access file "test_lib_fail.so": No such file or directory"""
in str(e)
)
finally:
# this is important because if the files aren't cleaned up then the test can
# pass even without successfully downloading the files if a previous run (or
# run with different type of remote storage) of the test did download the
# files
for file in cleanup_files:
try:
os.remove(file)
log.info(f"Deleted {file}")
except FileNotFoundError:
log.info(f"{file} does not exist, so cannot be deleted")
# TODO
# @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
# def test_remote_extensions_shared_preload_libraries(
# neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, pg_version: PgVersion
# ):

View File

@@ -275,7 +275,6 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str]
assert isinstance(neon_env_builder.remote_storage, S3Storage)
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
assert neon_env_builder.remote_storage_client is not None
response = neon_env_builder.remote_storage_client.list_objects_v2(
Bucket=neon_env_builder.remote_storage.bucket_name,
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
@@ -464,10 +463,10 @@ def test_concurrent_timeline_delete_stuck_on(
# make the second call and assert behavior
log.info("second call start")
error_msg_re = "timeline deletion is already in progress"
error_msg_re = "Timeline deletion is already in progress"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 500
assert second_call_err.value.status_code == 409
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
# the second call will try to transition the timeline into Stopping state as well
env.pageserver.allowed_errors.append(
@@ -519,9 +518,9 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*timeline deletion is already in progress.*"
f".*{child_timeline_id}.*Timeline deletion is already in progress.*"
)
with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"):
with pytest.raises(PageserverApiException, match="Timeline deletion is already in progress"):
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# make sure the timeout was due to the failpoint
@@ -629,7 +628,7 @@ def test_timeline_delete_works_for_remote_smoke(
)
# for some reason the check above doesnt immediately take effect for the below.
# Assume it is mock server inconsistency and check twice.
# Assume it is mock server incosistency and check twice.
wait_until(
2,
0.5,