Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-02 02:00:38 +00:00

Compare commits (8 commits), between `extension_...` and `access_sta...`

| SHA1 |
|---|
| 83294d771b |
| b3ef6c7bf5 |
| f88ff9f3c6 |
| 593c4244fd |
| d7aa36c4c0 |
| df127ef209 |
| 72a73d2c82 |
| 172239c7ee |

.github/workflows/build_and_test.yml (vendored): 2 changes

@@ -659,7 +659,6 @@ jobs:
|
||||
--cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
--dockerfile Dockerfile.compute-tools
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{needs.tag.outputs.build-tag}}
|
||||
@@ -717,7 +716,6 @@ jobs:
|
||||
--context .
|
||||
--build-arg GIT_VERSION=${{ github.sha }}
|
||||
--build-arg PG_VERSION=${{ matrix.version }}
|
||||
--build-arg BUILD_TAG=${{needs.tag.outputs.build-tag}}
|
||||
--build-arg REPOSITORY=369495373322.dkr.ecr.eu-central-1.amazonaws.com
|
||||
--dockerfile Dockerfile.compute-node
|
||||
--destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -924,14 +924,12 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"postgres",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tar",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
@@ -999,7 +997,6 @@ dependencies = [
|
||||
"tar",
|
||||
"thiserror",
|
||||
"toml",
|
||||
"tracing",
|
||||
"url",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
@@ -2352,9 +2349,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.55"
|
||||
version = "0.10.52"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d"
|
||||
checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
@@ -2384,9 +2381,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.90"
|
||||
version = "0.9.87"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6"
|
||||
checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
|
||||
@@ -2,7 +2,6 @@ ARG PG_VERSION
|
||||
ARG REPOSITORY=neondatabase
|
||||
ARG IMAGE=rust
|
||||
ARG TAG=pinned
|
||||
ARG BUILD_TAG
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -635,9 +634,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
#
|
||||
#########################################################################################
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
|
||||
ARG BUILD_TAG
|
||||
ENV BUILD_TAG=$BUILD_TAG
|
||||
|
||||
USER nonroot
|
||||
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
ARG REPOSITORY=neondatabase
|
||||
ARG IMAGE=rust
|
||||
ARG TAG=pinned
|
||||
ARG BUILD_TAG
|
||||
|
||||
FROM $REPOSITORY/$IMAGE:$TAG AS rust-build
|
||||
WORKDIR /home/nonroot
|
||||
@@ -17,8 +16,6 @@ ENV CACHEPOT_S3_KEY_PREFIX=cachepot
|
||||
ARG CACHEPOT_BUCKET=neon-github-dev
|
||||
#ARG AWS_ACCESS_KEY_ID
|
||||
#ARG AWS_SECRET_ACCESS_KEY
|
||||
ARG BUILD_TAG
|
||||
ENV BUILD_TAG=$BUILD_TAG
|
||||
|
||||
COPY . .
|
||||
|
||||
|
||||
@@ -30,5 +30,3 @@ url.workspace = true
|
||||
compute_api.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
toml_edit.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||
|
||||
@@ -27,8 +27,7 @@
|
||||
//! compute_ctl -D /var/db/postgres/compute \
|
||||
//! -C 'postgresql://cloud_admin@localhost/postgres' \
|
||||
//! -S /var/db/postgres/specs/current.json \
|
||||
//! -b /usr/local/bin/postgres \
|
||||
//! -r {"bucket": "my-bucket", "region": "eu-central-1"}
|
||||
//! -b /usr/local/bin/postgres
|
||||
//! ```
|
||||
//!
|
||||
use std::collections::HashMap;
|
||||
@@ -49,43 +48,16 @@ use compute_api::responses::ComputeStatus;
|
||||
|
||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::{
|
||||
download_extension, get_availiable_extensions, init_remote_storage,
|
||||
};
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
use compute_tools::spec::*;
|
||||
|
||||
use tokio::runtime::Runtime;
|
||||
const BUILD_TAG_DEFAULT: &str = "local";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT);
|
||||
|
||||
info!("build_tag: {build_tag}");
|
||||
|
||||
let matches = cli().get_matches();
|
||||
let pgbin_default = String::from("postgres");
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
|
||||
|
||||
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
|
||||
let ext_remote_storage = match remote_ext_config {
|
||||
Some(x) => Some(init_remote_storage(x)?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let rt = Runtime::new().unwrap();
|
||||
let copy_remote_storage = ext_remote_storage.clone();
|
||||
|
||||
// rt.block_on(async move {
|
||||
// download_extension(©_remote_storage, ExtensionType::Shared, pgbin)
|
||||
// .await
|
||||
// .expect("download extension should work");
|
||||
// });
|
||||
|
||||
let http_port = *matches
|
||||
.get_one::<u16>("http-port")
|
||||
@@ -150,6 +122,9 @@ fn main() -> Result<()> {
|
||||
let compute_id = matches.get_one::<String>("compute-id");
|
||||
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
|
||||
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
let spec;
|
||||
let mut live_config_allowed = false;
|
||||
match spec_json {
|
||||
@@ -201,9 +176,6 @@ fn main() -> Result<()> {
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_remote_storage,
|
||||
availiable_extensions: Vec::new(),
|
||||
availiable_libraries: Vec::new(),
|
||||
};
|
||||
let compute = Arc::new(compute_node);
|
||||
|
||||
@@ -212,21 +184,6 @@ fn main() -> Result<()> {
|
||||
let _http_handle =
|
||||
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
|
||||
|
||||
let extension_server_port: u16 = http_port;
|
||||
|
||||
// even before we have a spec, we can get the publicly available extensions
// TODO turn get_availiable_extensions() & other functions into ComputeNode methods;
// we pass too many params to them anyway.
|
||||
|
||||
compute_node.availiable_extensions = get_availiable_extensions(
|
||||
ext_remote_storage,
|
||||
pg_version, //TODO
|
||||
pgbin,
|
||||
None,
|
||||
);
|
||||
|
||||
// TODO same for libraries
|
||||
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
info!("no compute spec provided, waiting");
|
||||
@@ -264,21 +221,10 @@ fn main() -> Result<()> {
|
||||
let _configurator_handle =
|
||||
launch_configurator(&compute).expect("cannot launch configurator thread");
|
||||
|
||||
// download private tenant extensions before postgres start
|
||||
// TODO
|
||||
// compute_node.availiable_extensions = get_availiable_extensions(ext_remote_storage,
|
||||
// pg_version, //TODO
|
||||
// pgbin,
|
||||
// tenant_id); //TODO get tenant_id from spec
|
||||
|
||||
// download preload shared libraries before postgres start (if any)
|
||||
// TODO
|
||||
// download_library_file();
|
||||
|
||||
// Start Postgres
|
||||
let mut delay_exit = false;
|
||||
let mut exit_code = None;
|
||||
let pg = match compute.start_compute(extension_server_port) {
|
||||
let pg = match compute.start_compute() {
|
||||
Ok(pg) => Some(pg),
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:?}", err);
|
||||
@@ -397,12 +343,6 @@ fn cli() -> clap::Command {
|
||||
.long("control-plane-uri")
|
||||
.value_name("CONTROL_PLANE_API_BASE_URI"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("remote-ext-config")
|
||||
.short('r')
|
||||
.long("remote-ext-config")
|
||||
.value_name("REMOTE_EXT_CONFIG"),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -16,8 +16,6 @@ use utils::lsn::Lsn;
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
|
||||
use crate::config;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
@@ -47,10 +45,6 @@ pub struct ComputeNode {
|
||||
pub state: Mutex<ComputeState>,
|
||||
/// `Condvar` to allow notifying waiters about state changes.
|
||||
pub state_changed: Condvar,
|
||||
/// S3 extensions configuration variables
|
||||
pub ext_remote_storage: Option<GenericRemoteStorage>,
|
||||
pub availiable_extensions: Vec<RemotePath>,
|
||||
pub availiable_libraries: Vec<RemotePath>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -251,22 +245,14 @@ impl ComputeNode {
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip(self, compute_state))]
|
||||
pub fn prepare_pgdata(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
extension_server_port: u16,
|
||||
) -> Result<()> {
|
||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
// Remove/create an empty pgdata directory and put configuration there.
|
||||
self.create_pgdata()?;
|
||||
config::write_postgres_conf(
|
||||
&pgdata_path.join("postgresql.conf"),
|
||||
&pspec.spec,
|
||||
Some(extension_server_port),
|
||||
)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
|
||||
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
// is already connected it will be kicked out, so a secondary (standby)
|
||||
@@ -404,7 +390,7 @@ impl ComputeNode {
|
||||
|
||||
// Write new config
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
|
||||
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
self.pg_reload_conf(&mut client)?;
|
||||
@@ -434,7 +420,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
|
||||
pub fn start_compute(&self) -> Result<std::process::Child> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
@@ -445,7 +431,7 @@ impl ComputeNode {
|
||||
pspec.timeline_id,
|
||||
);
|
||||
|
||||
self.prepare_pgdata(&compute_state, extension_server_port)?;
|
||||
self.prepare_pgdata(&compute_state)?;
|
||||
|
||||
let start_time = Utc::now();
|
||||
|
||||
|
||||
@@ -33,11 +33,7 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
|
||||
}
|
||||
|
||||
/// Create or completely rewrite configuration file specified by `path`
|
||||
pub fn write_postgres_conf(
|
||||
path: &Path,
|
||||
spec: &ComputeSpec,
|
||||
extension_server_port: Option<u16>,
|
||||
) -> Result<()> {
|
||||
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
||||
// File::create() destroys the file content if it exists.
|
||||
let mut file = File::create(path)?;
|
||||
|
||||
@@ -99,9 +95,5 @@ pub fn write_postgres_conf(
|
||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||
}
|
||||
|
||||
if let Some(port) = extension_server_port {
|
||||
writeln!(file, "neon.extension_server_port={}", port)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,182 +0,0 @@
|
||||
use anyhow::{self, bail, Result};
|
||||
use remote_storage::*;
|
||||
use serde_json::{self, Value};
|
||||
use std::fs::File;
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::num::{NonZeroU32, NonZeroUsize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::info;
|
||||
use utils::id::TenantId;
|
||||
|
||||
fn get_pg_config(argument: &str, pgbin: &str) -> String {
|
||||
// gives the result of `pg_config [argument]`
|
||||
// where argument is a flag like `--version` or `--sharedir`
|
||||
let pgconfig = pgbin.replace("postgres", "pg_config");
|
||||
let config_output = std::process::Command::new(pgconfig)
|
||||
.arg(argument)
|
||||
.output()
|
||||
.expect("pg_config error");
|
||||
std::str::from_utf8(&config_output.stdout)
|
||||
.expect("pg_config error")
|
||||
.trim()
|
||||
.to_string()
|
||||
}
|
||||
|
||||
fn get_pg_version(pgbin: &str) -> String {
|
||||
// pg_config --version returns a (platform specific) human readable string
|
||||
// such as "PostgreSQL 15.4". We parse this to v14/v15
|
||||
let human_version = get_pg_config("--version", pgbin);
|
||||
if human_version.contains("v15") {
|
||||
return "v15".to_string();
|
||||
}
|
||||
"v14".to_string()
|
||||
}
|
||||
|
||||
async fn download_helper(
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
remote_from_path: &RemotePath,
|
||||
download_location: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
// downloads file at remote_from_path to download_location/[file_name]
|
||||
let local_path = download_location.join(remote_from_path.object_name().expect("bad object"));
|
||||
info!(
|
||||
"Downloading {:?} to location {:?}",
|
||||
&remote_from_path, &local_path
|
||||
);
|
||||
let mut download = remote_storage.download(remote_from_path).await?;
|
||||
let mut write_data_buffer = Vec::new();
|
||||
download
|
||||
.download_stream
|
||||
.read_to_end(&mut write_data_buffer)
|
||||
.await?;
|
||||
let mut output_file = BufWriter::new(File::create(local_path)?);
|
||||
output_file.write_all(&write_data_buffer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// download extension control files
|
||||
//
|
||||
// return a list of all extension files, to be used in future searches
|
||||
//
|
||||
// if tenant_id is provided - search in a private per-tenant extension path,
|
||||
// otherwise - in public extension path
|
||||
//
|
||||
pub async fn get_availiable_extensions(
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pg_version: &str,
|
||||
pgbin: &str,
|
||||
tenant_id: Option<TenantId>,
|
||||
) -> anyhow::Result<Vec<RemotePath>> {
|
||||
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||
|
||||
let remote_sharedir = match tenant_id {
|
||||
None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?,
|
||||
Some(tenant_id) => RemotePath::new(
|
||||
&Path::new(&pg_version)
|
||||
.join(&tenant_id.to_string())
|
||||
.join("share/postgresql/extension"),
|
||||
)?,
|
||||
};
|
||||
|
||||
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
|
||||
|
||||
// download all found control files
|
||||
for remote_from_path in &from_paths {
|
||||
if remote_from_path.extension() == Some("control") {
|
||||
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(from_paths)
|
||||
}
|
||||
|
||||
// download all sql files for a given extension name
|
||||
//
|
||||
pub async fn download_extension_sql_files(
|
||||
ext_name: &str,
|
||||
availiable_extensions: Vec<RemotePath>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
) -> Result<()> {
|
||||
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||
|
||||
// check if extension files exist
|
||||
let files_to_download: Vec<&RemotePath> = availiable_extensions
|
||||
.iter()
|
||||
.filter(|ext| {
|
||||
ext.extension() == Some("sql") && ext.object_name().unwrap().starts_with(ext_name)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if files_to_download.is_empty() {
|
||||
bail!("Files for extension {ext_name} are not found in the extension store");
|
||||
}
|
||||
|
||||
for remote_from_path in files_to_download {
|
||||
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// download shared library file
|
||||
pub async fn download_library_file(
|
||||
lib_name: &str,
|
||||
availiable_libraries: Vec<RemotePath>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
) -> Result<()> {
|
||||
let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into();
|
||||
|
||||
// check if the library file exists
|
||||
let lib = availiable_libraries
|
||||
.iter()
|
||||
.find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name);
|
||||
|
||||
match lib {
|
||||
None => bail!("Shared library file {lib_name} is not found in the extension store"),
|
||||
Some(lib) => {
|
||||
download_helper(remote_storage, &lib, &local_libdir).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
|
||||
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
|
||||
let remote_ext_bucket = match &remote_ext_config["bucket"] {
|
||||
Value::String(x) => x,
|
||||
_ => bail!("remote_ext_config missing bucket"),
|
||||
};
|
||||
let remote_ext_region = match &remote_ext_config["region"] {
|
||||
Value::String(x) => x,
|
||||
_ => bail!("remote_ext_config missing region"),
|
||||
};
|
||||
let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
|
||||
Value::String(x) => Some(x.clone()),
|
||||
_ => None,
|
||||
};
|
||||
let remote_ext_prefix = match &remote_ext_config["prefix"] {
|
||||
Value::String(x) => Some(x.clone()),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// load will not be large, so default parameters are fine
|
||||
let config = S3Config {
|
||||
bucket_name: remote_ext_bucket.to_string(),
|
||||
bucket_region: remote_ext_region.to_string(),
|
||||
prefix_in_bucket: remote_ext_prefix,
|
||||
endpoint: remote_ext_endpoint,
|
||||
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_keys_per_list_response: None,
|
||||
};
|
||||
let config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
|
||||
storage: RemoteStorageKind::AwsS3(config),
|
||||
};
|
||||
GenericRemoteStorage::from_config(&config)
|
||||
}
|
||||
@@ -16,8 +16,6 @@ use tokio::task;
|
||||
use tracing::{error, info};
|
||||
use tracing_utils::http::OtelName;
|
||||
|
||||
use crate::extension_server::{download_extension_sql_files, download_library_file};
|
||||
|
||||
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
|
||||
ComputeStatusResponse {
|
||||
start_time: state.start_time,
|
||||
@@ -123,68 +121,8 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from S3 on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
|
||||
let is_library = false;
|
||||
|
||||
let filename = route.split('/').last().unwrap();
|
||||
|
||||
info!(
|
||||
"serving /extension_server POST request, filename: {:?}",
|
||||
filename
|
||||
);
|
||||
|
||||
if compute.ext_remote_storage.is_none() {
|
||||
error!("Remote extension storage is not set up");
|
||||
let mut resp = Response::new(Body::from("Remote extension storage is not set up"));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return resp;
|
||||
}
|
||||
let ext_storage = &compute.ext_remote_storage.unwrap();
|
||||
|
||||
if !is_library {
|
||||
match download_extension_sql_files(
|
||||
filename,
|
||||
&compute.availiable_extensions,
|
||||
&ext_storage,
|
||||
&compute.pgbin,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Response::new(Body::from("OK")),
|
||||
Err(e) => {
|
||||
error!("extension download failed: {}", e);
|
||||
let mut resp = Response::new(Body::from(e.to_string()));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
resp
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match download_library_file(
|
||||
filename,
|
||||
&compute.availiable_libraries,
|
||||
&ext_storage,
|
||||
&compute.pgbin,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Response::new(Body::from("OK")),
|
||||
Err(e) => {
|
||||
error!("library download failed: {}", e);
|
||||
let mut resp = Response::new(Body::from(e.to_string()));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
resp
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return the `404 Not Found` for any other routes.
|
||||
method => {
|
||||
info!("404 Not Found for {:?}", method);
|
||||
|
||||
_ => {
|
||||
let mut not_found = Response::new(Body::from("404 Not Found"));
|
||||
*not_found.status_mut() = StatusCode::NOT_FOUND;
|
||||
not_found
|
||||
|
||||
@@ -139,34 +139,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
/extension_server:
|
||||
post:
|
||||
tags:
|
||||
- Extension
|
||||
summary: Download extension from S3 to local folder.
|
||||
description: ""
|
||||
operationId: downloadExtension
|
||||
responses:
|
||||
200:
|
||||
description: Extension downloaded
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
description: Error text or 'OK' if download succeeded.
|
||||
example: "OK"
|
||||
400:
|
||||
description: Request is invalid.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
500:
|
||||
description: Extension download request failed.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
|
||||
@@ -9,7 +9,6 @@ pub mod http;
|
||||
#[macro_use]
|
||||
pub mod logger;
|
||||
pub mod compute;
|
||||
pub mod extension_server;
|
||||
pub mod monitor;
|
||||
pub mod params;
|
||||
pub mod pg_helpers;
|
||||
|
||||
@@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane(
|
||||
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
|
||||
// File `postgresql.conf` is no longer included in `basebackup`, so just
// always write the full config into it, creating a new file.
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
|
||||
|
||||
update_pg_hba(pgdata_path)?;
|
||||
|
||||
|
||||
@@ -32,4 +32,3 @@ utils.workspace = true
|
||||
|
||||
compute_api.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -657,8 +657,6 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||
|
||||
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
|
||||
|
||||
// If --safekeepers argument is given, use only the listed safekeeper nodes.
|
||||
let safekeepers =
|
||||
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
||||
@@ -700,7 +698,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
_ => {}
|
||||
}
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
endpoint.start(&auth_token, safekeepers)?;
|
||||
} else {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
@@ -744,7 +742,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
pg_version,
|
||||
mode,
|
||||
)?;
|
||||
ep.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
ep.start(&auth_token, safekeepers)?;
|
||||
}
|
||||
}
|
||||
"stop" => {
|
||||
@@ -1004,12 +1002,6 @@ fn cli() -> Command {
|
||||
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
|
||||
.required(false);
|
||||
|
||||
let remote_ext_config_args = Arg::new("remote-ext-config")
|
||||
.long("remote-ext-config")
|
||||
.num_args(1)
|
||||
.help("Configure the S3 bucket that we search for extensions in.")
|
||||
.required(false);
|
||||
|
||||
let lsn_arg = Arg::new("lsn")
|
||||
.long("lsn")
|
||||
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
||||
@@ -1160,7 +1152,6 @@ fn cli() -> Command {
|
||||
.arg(pg_version_arg)
|
||||
.arg(hot_standby_arg)
|
||||
.arg(safekeepers_arg)
|
||||
.arg(remote_ext_config_args)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
|
||||
@@ -67,7 +67,6 @@ pub struct EndpointConf {
|
||||
pg_port: u16,
|
||||
http_port: u16,
|
||||
pg_version: u32,
|
||||
skip_pg_catalog_updates: bool,
|
||||
}
|
||||
|
||||
//
|
||||
@@ -136,7 +135,6 @@ impl ComputeControlPlane {
|
||||
mode,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
skip_pg_catalog_updates: false,
|
||||
});
|
||||
|
||||
ep.create_endpoint_dir()?;
|
||||
@@ -150,7 +148,6 @@ impl ComputeControlPlane {
|
||||
http_port,
|
||||
pg_port,
|
||||
pg_version,
|
||||
skip_pg_catalog_updates: false,
|
||||
})?,
|
||||
)?;
|
||||
std::fs::write(
|
||||
@@ -186,9 +183,6 @@ pub struct Endpoint {
|
||||
// the endpoint runs in.
|
||||
pub env: LocalEnv,
|
||||
pageserver: Arc<PageServerNode>,
|
||||
|
||||
// Optimizations
|
||||
skip_pg_catalog_updates: bool,
|
||||
}
|
||||
|
||||
impl Endpoint {
|
||||
@@ -222,7 +216,6 @@ impl Endpoint {
|
||||
mode: conf.mode,
|
||||
tenant_id: conf.tenant_id,
|
||||
pg_version: conf.pg_version,
|
||||
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -408,12 +401,7 @@ impl Endpoint {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn start(
|
||||
&self,
|
||||
auth_token: &Option<String>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
remote_ext_config: Option<&String>,
|
||||
) -> Result<()> {
|
||||
pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
|
||||
if self.status() == "running" {
|
||||
anyhow::bail!("The endpoint is already running");
|
||||
}
|
||||
@@ -462,7 +450,7 @@ impl Endpoint {
|
||||
|
||||
// Create spec file
|
||||
let spec = ComputeSpec {
|
||||
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
|
||||
skip_pg_catalog_updates: false,
|
||||
format_version: 1.0,
|
||||
operation_uuid: None,
|
||||
cluster: Cluster {
|
||||
@@ -512,9 +500,6 @@ impl Endpoint {
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
if let Some(remote_ext_config) = remote_ext_config {
|
||||
cmd.args(["--remote-ext-config", remote_ext_config]);
|
||||
}
|
||||
let _child = cmd.spawn()?;
|
||||
|
||||
// Wait for it to start
|
||||
|
||||
@@ -1,301 +0,0 @@
# Supporting custom user Extensions

Created 2023-05-03

## Motivation

There are many extensions in the PostgreSQL ecosystem, and not all extensions
are of a quality that we can confidently support. Additionally, our
current extension inclusion mechanism has several problems because we build all
extensions into the primary Compute image: We build the extensions every time
we build the compute image regardless of whether we actually need to rebuild
the image, and the inclusion of these extensions in the image adds a hard
dependency on all supported extensions - thus increasing the image size, and
with it the time it takes to download that image - increasing first start
latency.

This RFC proposes a dynamic loading mechanism that solves most of these
problems.

## Summary

`compute_ctl` is made responsible for loading extensions on-demand into
the container's file system for dynamically loaded extensions, and will also
make sure that the extensions in `shared_preload_libraries` are downloaded
before the compute node starts.

## Components

compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store

## Requirements

Compute nodes with no extra extensions should not be negatively impacted by
the existence of support for many extensions.

Installing an extension into PostgreSQL should be easy.

Non-preloaded extensions shouldn't impact startup latency.

Uninstalled extensions shouldn't impact query latency.

A small latency penalty for dynamically loaded extensions is acceptable in
the first seconds of compute startup, but not in steady-state operations.

## Proposed implementation

### On-demand, JIT-loading of extensions

TL;DR: we download extensions as soon as we need them, or when we have spare
time.

That means we first download the extensions required to start the PostMaster
(`shared_preload_libraries` and their dependencies), then the libraries required
before a backend can start processing user input (`preload_libraries` and
dependencies), and then (with network limits applied) the remainder of the
configured extensions, with prioritization for installed extensions.

If PostgreSQL tries to load a library that is not yet fully on disk, it will
first ask `compute_ctl` whether the extension has been downloaded, and will wait
for `compute_ctl` to finish downloading that extension. `compute_ctl` will
prioritize downloading that extension over other extensions that were not yet
requested.

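To make that ordering concrete, here is a minimal sketch of the priority scheme; the type and function names are invented for illustration and are not part of `compute_ctl`.

```rust
/// Illustrative only: relative urgency of a pending extension download.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum DownloadPriority {
    SharedPreload, // needed before the postmaster can start
    Preload,       // needed before a backend can serve user queries
    Background,    // everything else, fetched with network limits applied
}

#[derive(Debug)]
struct PlannedDownload {
    name: String,
    priority: DownloadPriority,
}

/// Order downloads so that postmaster-blocking extensions come first.
/// An on-demand request from PostgreSQL would simply promote the matching
/// entry to the front of whatever remains in the queue.
fn plan_downloads(mut planned: Vec<PlannedDownload>) -> Vec<PlannedDownload> {
    planned.sort_by_key(|d| d.priority);
    planned
}

fn main() {
    let queue = plan_downloads(vec![
        PlannedDownload { name: "postgis".into(), priority: DownloadPriority::Background },
        PlannedDownload { name: "pg_stat_statements".into(), priority: DownloadPriority::SharedPreload },
        PlannedDownload { name: "auto_explain".into(), priority: DownloadPriority::Preload },
    ]);
    for d in &queue {
        println!("{:?} -> {}", d.priority, d.name);
    }
}
```
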
#### Workflow

```mermaid
sequenceDiagram
    autonumber
    participant EX as External (control plane, ...)
    participant CTL as compute_ctl
    participant ST as extension store
    actor PG as PostgreSQL

    EX ->>+ CTL: Start compute with config X

    note over CTL: The configuration contains a list of all <br/>extensions available to that compute node, etc.

    par Optionally parallel or concurrent
        loop Available extensions
            CTL ->>+ ST: Download control file of extension
            activate CTL
            ST ->>- CTL: Finish downloading control file
            CTL ->>- CTL: Put control file in extensions directory
        end

        loop For each extension in shared_preload_libraries
            CTL ->>+ ST: Download extension's data
            activate CTL
            ST ->>- CTL: Finish downloading
            CTL ->>- CTL: Put extension's files in the right place
        end
    end

    CTL ->>+ PG: Start PostgreSQL

    note over CTL: PostgreSQL can now start accepting <br/>connections. However, users may still need to wait <br/>for preload_libraries extensions to get downloaded.

    par Load preload_libraries
        loop For each extension in preload_libraries
            CTL ->>+ ST: Download extension's data
            activate CTL
            ST ->>- CTL: Finish downloading
            CTL ->>- CTL: Put extension's files in the right place
        end
    end

    note over CTL: After this, connections don't have any hard <br/>waits for extension files left, except for those <br/>connections that override preload_libraries <br/>in their startup packet

    par PG's internal_load_library(library)
        alt Library is not yet loaded
            PG ->>+ CTL: Load library X
            CTL ->>+ ST: Download the extension that provides X
            ST ->>- CTL: Finish downloading
            CTL ->> CTL: Put extension's files in the right place
            CTL ->>- PG: Ready
        else Library is already loaded
            note over PG: No-op
        end
    and Download all remaining extensions
        loop Extension X
            CTL ->>+ ST: Download not-yet-downloaded extension X
            activate CTL
            ST ->>- CTL: Finish downloading
            CTL ->>- CTL: Put extension's files in the right place
        end
    end

    deactivate PG
    deactivate CTL
```

#### Summary

Pros:
- Startup is only as slow as it takes to load all (shared_)preload_libraries
- Supports BYO Extension

Cons:
- O(sizeof(extensions)) IO requirement for loading all extensions.

### Alternative solutions

1. Allow users to add their extensions to the base image

   Pros:
   - Easy to deploy

   Cons:
   - Doesn't scale: first start latency depends on the image size;
   - All extensions are shared across all users: it doesn't allow users to
     bring their own restrictive-licensed extensions

2. Bring Your Own compute image

   Pros:
   - Still easy to deploy
   - User can bring own patched version of PostgreSQL

   Cons:
   - First start latency is O(sizeof(extensions image))
   - Warm instance pool for skipping pod schedule latency is not feasible with
     O(n) custom images
   - Support channels are difficult to manage

3. Download all user extensions in bulk on compute start

   Pros:
   - Easy to deploy
   - No startup latency issues for "clean" users.
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - Downloading all extensions in advance takes a lot of time, thus startup
     latency issues

4. Store user's extensions in persistent storage

   Pros:
   - Easy to deploy
   - No startup latency issues
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - EC2 instances have only a limited number of attachments shared between EBS
     volumes, direct-attached NVMe drives, and ENIs.
   - Compute instance migration isn't trivially solved for EBS mounts (e.g.
     the device is unavailable whilst moving the mount between instances).
   - EBS can only mount on one instance at a time (except the expensive IO2
     device type).

5. Store user's extensions in a network drive

   Pros:
   - Easy to deploy
   - Few startup latency issues
   - Warm instance pool for skipping pod schedule latency is possible

   Cons:
   - We'd need networked drives, and a lot of them, which would store many
     duplicate extensions.
   - **UNCHECKED:** Compute instance migration may not work nicely with
     networked IOs

### Idea extensions

The extension store does not have to be S3 directly, but could be a Node-local
caching service on top of S3. This would reduce the load on the network for
popular extensions.

## Extension Store implementation

The Extension Store in our case is a private S3 bucket.
Extensions are stored as tarballs in the bucket. The tarball contains the extension's control file and all the files that the extension needs to run.

We may also store the control file separately from the tarball to speed up extension loading.

`s3://<the-bucket>/extensions/ext-name/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar`

where `ext-name` is an extension name and `sha-256+1234abcd1234abcd1234abcd1234abcd` is a hash of a specific extension version tarball.

To ensure security, there is no direct access to the S3 bucket from the compute node.

The control plane forms a list of extensions available to the compute node
and forms a short-lived [pre-signed URL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html)
for each extension that is available to the compute node.

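As a rough sketch of that control-plane step, the object key can be derived from the bundle's content hash and then pre-signed with a short expiry. The crates used here (`sha2`, `aws-sdk-s3`, `anyhow`) and the exact presigning API surface are assumptions for illustration, not something this RFC prescribes.

```rust
use std::time::Duration;

use aws_sdk_s3::presigning::PresigningConfig;

/// Build `extensions/<name>/sha-256+<hex>/bundle.tar` from the tarball bytes.
fn bundle_key(ext_name: &str, bundle: &[u8]) -> String {
    use sha2::{Digest, Sha256};
    let digest = Sha256::digest(bundle);
    let hex: String = digest.iter().map(|b| format!("{b:02x}")).collect();
    format!("extensions/{ext_name}/sha-256+{hex}/bundle.tar")
}

/// Produce a short-lived GET link for one bundle; the compute node itself
/// never receives S3 credentials, only this URL.
async fn presigned_link(
    client: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
) -> anyhow::Result<String> {
    let req = client
        .get_object()
        .bucket(bucket)
        .key(key)
        // Short lifetime: the URL only needs to survive compute startup.
        .presigned(PresigningConfig::expires_in(Duration::from_secs(15 * 60))?)
        .await?;
    Ok(req.uri().to_string())
}
```
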
So `compute_ctl` receives the spec in the following format:

```
"extensions": [{
    "meta_format": 1,
    "extension_name": "postgis",
    "link": "https://<the-bucket>/extensions/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar?AWSAccessKeyId=1234abcd1234abcd1234abcd1234abcd&Expires=1234567890&Signature=1234abcd1234abcd1234abcd1234abcd",
    ...
}]
```

`compute_ctl` then downloads the extension from the link and unpacks it to the right place.

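A minimal sketch of that compute-side step, assuming `reqwest` and `tar` as dependencies; the target directory and error handling are simplified.

```rust
use std::io::Cursor;
use std::path::Path;

/// Fetch a bundle via its pre-signed link and unpack it under `target_dir`
/// (e.g. the Postgres sharedir/libdir resolved via `pg_config`).
async fn fetch_and_unpack(link: &str, target_dir: &Path) -> anyhow::Result<()> {
    // The pre-signed URL embeds its own authorization, so a plain GET suffices.
    let bytes = reqwest::get(link).await?.error_for_status()?.bytes().await?;

    // bundle.tar is a plain tarball; unpack its contents in place.
    let mut archive = tar::Archive::new(Cursor::new(bytes.as_ref()));
    archive.unpack(target_dir)?;
    Ok(())
}
```

Because the link is pre-signed, the compute node needs no S3 credentials of its own, which matches the security constraint above.
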
### How do we handle private extensions?

Private and public extensions are treated equally from the Extension Store perspective.
The only difference is that private extensions are not listed in the user UI (managed by the control plane).

### How to add a new extension to the Extension Store?

Since we need to verify that the extension is compatible with the compute node and doesn't contain any malicious code,
we need to review the extension before adding it to the Extension Store.

I do not expect that we will have a lot of extensions to review, so we can do it manually for now.

Some admin UI may be added later to automate this process.

The list of extensions available to a compute node is stored in the console database.

### How is the list of available extensions managed?

We need to add new tables to the console database to store the list of available extensions, their versions, and access rights.

Something like this:

```
CREATE TABLE extensions (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    version VARCHAR(255) NOT NULL,
    hash VARCHAR(255) NOT NULL, -- this is the path to the extension in the Extension Store
    supported_postgres_versions integer[] NOT NULL,
    is_public BOOLEAN NOT NULL, -- public extensions are available to all users
    is_shared_preload BOOLEAN NOT NULL, -- these extensions require a postgres restart
    is_preload BOOLEAN NOT NULL,
    license VARCHAR(255) NOT NULL
);

CREATE TABLE user_extensions (
    user_id INTEGER NOT NULL,
    extension_id INTEGER NOT NULL,
    FOREIGN KEY (user_id) REFERENCES users (id),
    FOREIGN KEY (extension_id) REFERENCES extensions (id)
);
```

When a new extension is added to the Extension Store, we add a new record to the table and set permissions.

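For illustration, a query the control plane might use against the tables above to resolve the extensions visible to one user on a Postgres 15 compute (the exact query shape is an assumption, not part of this RFC):

```
-- $1 is the user id; public extensions are visible to everyone.
SELECT e.name, e.version, e.hash, e.is_shared_preload, e.is_preload
FROM extensions e
LEFT JOIN user_extensions ue
       ON ue.extension_id = e.id AND ue.user_id = $1
WHERE 15 = ANY (e.supported_postgres_versions)
  AND (e.is_public OR ue.user_id IS NOT NULL);
```
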
In the UI, users may select the extensions that they want to use with their compute node.

NOTE: Extensions that require a postgres restart will not be available until the next compute restart.
Also, currently the user cannot force a postgres restart. We should add this feature later.

For other extensions, we must communicate updates to `compute_ctl`, and they will be downloaded in the background.

### How can a user update an extension?

Users can update an extension by selecting the new version of the extension in the UI.

### Alternatives

For extensions written in trusted languages we can also adopt the
`dbdev` PostgreSQL Package Manager, based on `pg_tle` by Supabase.
This would increase the number of supported extensions and decrease the amount of work required to support them.

@@ -70,14 +70,6 @@ impl RemotePath {
|
||||
pub fn join(&self, segment: &Path) -> Self {
|
||||
Self(self.0.join(segment))
|
||||
}
|
||||
|
||||
pub fn get_path(&self) -> &PathBuf {
|
||||
&self.0
|
||||
}
|
||||
|
||||
pub fn extension(&self) -> Option<&str> {
|
||||
self.0.extension()?.to_str()
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
@@ -94,19 +86,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
prefix: Option<&RemotePath>,
|
||||
) -> Result<Vec<RemotePath>, DownloadError>;
|
||||
|
||||
/// Lists all files in directory "recursively"
|
||||
/// (not really recursively, because AWS has a flat namespace)
|
||||
/// Note: This is subtly different from list_prefixes,
|
||||
/// because it is for listing files instead of listing
|
||||
/// names sharing common prefixes.
|
||||
/// For example,
|
||||
/// list_files("foo/bar") = ["foo/bar/cat123.txt",
|
||||
/// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"]
|
||||
/// whereas,
|
||||
/// list_prefixes("foo/bar/") = ["cat", "dog"]
|
||||
/// See `test_real_s3.rs` for more details.
|
||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
|
||||
|
||||
/// Streams the local file contents into remote into the remote storage entry.
|
||||
async fn upload(
|
||||
&self,
|
||||
@@ -184,20 +163,6 @@ pub enum GenericRemoteStorage {
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
// A function for listing all the files in a "directory"
|
||||
// Example:
|
||||
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
|
||||
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.list_files(folder).await,
|
||||
Self::AwsS3(s) => s.list_files(folder).await,
|
||||
Self::Unreliable(s) => s.list_files(folder).await,
|
||||
}
|
||||
}
|
||||
|
||||
// lists common *prefixes*, if any of files
|
||||
// Example:
|
||||
// list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
|
||||
pub async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
|
||||
@@ -48,14 +48,6 @@ impl LocalFs {
|
||||
Ok(Self { storage_root })
|
||||
}
|
||||
|
||||
// mirrors S3Bucket::s3_object_to_relative_path
|
||||
fn local_file_to_relative_path(&self, key: PathBuf) -> RemotePath {
|
||||
let relative_path = key
|
||||
.strip_prefix(&self.storage_root)
|
||||
.expect("relative path must contain storage_root as prefix");
|
||||
RemotePath(relative_path.into())
|
||||
}
|
||||
|
||||
async fn read_storage_metadata(
|
||||
&self,
|
||||
file_path: &Path,
|
||||
@@ -140,34 +132,6 @@ impl RemoteStorage for LocalFs {
|
||||
Ok(prefixes)
|
||||
}
|
||||
|
||||
// recursively lists all files in a directory,
|
||||
// mirroring the `list_files` for `s3_bucket`
|
||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
let full_path = match folder {
|
||||
Some(folder) => folder.with_base(&self.storage_root),
|
||||
None => self.storage_root.clone(),
|
||||
};
|
||||
let mut files = vec![];
|
||||
let mut directory_queue = vec![full_path.clone()];
|
||||
|
||||
while !directory_queue.is_empty() {
|
||||
let cur_folder = directory_queue
|
||||
.pop()
|
||||
.expect("queue cannot be empty: we just checked");
|
||||
let mut entries = fs::read_dir(cur_folder.clone()).await?;
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
let file_name: PathBuf = entry.file_name().into();
|
||||
let full_file_name = cur_folder.clone().join(&file_name);
|
||||
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
|
||||
files.push(file_remote_path.clone());
|
||||
if full_file_name.is_dir() {
|
||||
directory_queue.push(full_file_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
|
||||
@@ -347,52 +347,6 @@ impl RemoteStorage for S3Bucket {
|
||||
Ok(document_keys)
|
||||
}
|
||||
|
||||
/// See the doc for `RemoteStorage::list_files`
|
||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
// TODO: if bucket prefix is empty, folder is prefixed with a "/" I think. Is this desired?
|
||||
let folder_name = folder
|
||||
.map(|p| self.relative_path_to_s3_object(p))
|
||||
.or_else(|| self.prefix_in_bucket.clone());
|
||||
|
||||
// AWS may need to break the response into several parts
|
||||
let mut continuation_token = None;
|
||||
let mut all_files = vec![];
|
||||
loop {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 list_files")?;
|
||||
metrics::inc_list_objects();
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.list_objects_v2()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(folder_name.clone())
|
||||
.set_continuation_token(continuation_token)
|
||||
.set_max_keys(self.max_keys_per_list_response)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_list_objects_fail();
|
||||
e
|
||||
})
|
||||
.context("Failed to list files in S3 bucket")?;
|
||||
|
||||
for object in response.contents().unwrap_or_default() {
|
||||
let object_path = object.key().expect("response does not contain a key");
|
||||
let remote_path = self.s3_object_to_relative_path(object_path);
|
||||
all_files.push(remote_path);
|
||||
}
|
||||
match response.next_continuation_token {
|
||||
Some(new_token) => continuation_token = Some(new_token),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
Ok(all_files)
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
|
||||
@@ -83,11 +83,6 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.inner.list_prefixes(prefix).await
|
||||
}
|
||||
|
||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?;
|
||||
self.inner.list_files(folder).await
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
|
||||
@@ -88,58 +88,6 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> any
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tests that S3 client can list all files in a folder, even if the response comes paginated and requirees multiple S3 queries.
|
||||
/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set.
|
||||
/// See `s3_pagination_should_work` for more information.
|
||||
///
|
||||
/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_s3_data`]
|
||||
/// Then performs the following queries:
|
||||
/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt`
|
||||
/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt`
|
||||
#[test_context(MaybeEnabledS3WithSimpleTestBlobs)]
|
||||
#[tokio::test]
|
||||
async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> {
|
||||
let ctx = match ctx {
|
||||
MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx,
|
||||
MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()),
|
||||
MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => {
|
||||
anyhow::bail!("S3 init failed: {e:?}")
|
||||
}
|
||||
};
|
||||
let test_client = Arc::clone(&ctx.enabled.client);
|
||||
let base_prefix =
|
||||
RemotePath::new(Path::new("folder1")).context("common_prefix construction")?;
|
||||
let root_files = test_client
|
||||
.list_files(None)
|
||||
.await
|
||||
.context("client list root files failure")?
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>();
|
||||
assert_eq!(
|
||||
root_files,
|
||||
ctx.remote_blobs.clone(),
|
||||
"remote storage list_files on root mismatches with the uploads."
|
||||
);
|
||||
let nested_remote_files = test_client
|
||||
.list_files(Some(&base_prefix))
|
||||
.await
|
||||
.context("client list nested files failure")?
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>();
|
||||
let trim_remote_blobs: HashSet<_> = ctx
|
||||
.remote_blobs
|
||||
.iter()
|
||||
.map(|x| x.get_path().to_str().expect("must be valid name"))
|
||||
.filter(|x| x.starts_with("folder1"))
|
||||
.map(|x| RemotePath::new(Path::new(x)).expect("must be valid name"))
|
||||
.collect();
|
||||
assert_eq!(
|
||||
nested_remote_files, trim_remote_blobs,
|
||||
"remote storage list_files on subdirrectory mismatches with the uploads."
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_context(MaybeEnabledS3)]
|
||||
#[tokio::test]
|
||||
async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> {
|
||||
@@ -300,66 +248,6 @@ impl AsyncTestContext for MaybeEnabledS3WithTestBlobs {
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: the setups for the list_prefixes test and the list_files test are very similar
|
||||
// However, they are not identical. The list_prefixes function is concerned with listing prefixes,
|
||||
// whereas the list_files function is concerned with listing files.
|
||||
// See `RemoteStorage::list_files` documentation for more details
|
||||
enum MaybeEnabledS3WithSimpleTestBlobs {
|
||||
Enabled(S3WithSimpleTestBlobs),
|
||||
Disabled,
|
||||
UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs),
|
||||
}
|
||||
struct S3WithSimpleTestBlobs {
|
||||
enabled: EnabledS3,
|
||||
remote_blobs: HashSet<RemotePath>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs {
|
||||
async fn setup() -> Self {
|
||||
ensure_logging_ready();
|
||||
if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() {
|
||||
info!(
|
||||
"`{}` env variable is not set, skipping the test",
|
||||
ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME
|
||||
);
|
||||
return Self::Disabled;
|
||||
}
|
||||
|
||||
let max_keys_in_list_response = 10;
|
||||
let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap());
|
||||
|
||||
let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await;
|
||||
|
||||
match upload_simple_s3_data(&enabled.client, upload_tasks_count).await {
|
||||
ControlFlow::Continue(uploads) => {
|
||||
info!("Remote objects created successfully");
|
||||
|
||||
Self::Enabled(S3WithSimpleTestBlobs {
|
||||
enabled,
|
||||
remote_blobs: uploads,
|
||||
})
|
||||
}
|
||||
ControlFlow::Break(uploads) => Self::UploadsFailed(
|
||||
anyhow::anyhow!("One or multiple blobs failed to upload to S3"),
|
||||
S3WithSimpleTestBlobs {
|
||||
enabled,
|
||||
remote_blobs: uploads,
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
async fn teardown(self) {
|
||||
match self {
|
||||
Self::Disabled => {}
|
||||
Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => {
|
||||
cleanup(&ctx.enabled.client, ctx.remote_blobs).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn create_s3_client(
|
||||
max_keys_per_list_response: Option<i32>,
|
||||
) -> anyhow::Result<Arc<GenericRemoteStorage>> {
|
||||
@@ -370,7 +258,7 @@ fn create_s3_client(
|
||||
let random_prefix_part = std::time::SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.context("random s3 test prefix part calculation")?
|
||||
.as_nanos();
|
||||
.as_millis();
|
||||
let remote_storage_config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: NonZeroUsize::new(100).unwrap(),
|
||||
max_sync_errors: NonZeroU32::new(5).unwrap(),
|
||||
@@ -476,52 +364,3 @@ async fn cleanup(client: &Arc<GenericRemoteStorage>, objects_to_delete: HashSet<
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Uploads files `folder{j}/blob{i}.txt`. See test description for more details.
|
||||
async fn upload_simple_s3_data(
|
||||
client: &Arc<GenericRemoteStorage>,
|
||||
upload_tasks_count: usize,
|
||||
) -> ControlFlow<HashSet<RemotePath>, HashSet<RemotePath>> {
|
||||
info!("Creating {upload_tasks_count} S3 files");
|
||||
let mut upload_tasks = JoinSet::new();
|
||||
for i in 1..upload_tasks_count + 1 {
|
||||
let task_client = Arc::clone(client);
|
||||
upload_tasks.spawn(async move {
|
||||
let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
|
||||
let blob_path = RemotePath::new(&blob_path)
|
||||
.with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
|
||||
debug!("Creating remote item {i} at path {blob_path:?}");
|
||||
|
||||
let data = format!("remote blob data {i}").into_bytes();
|
||||
let data_len = data.len();
|
||||
task_client
|
||||
.upload(std::io::Cursor::new(data), data_len, &blob_path, None)
|
||||
.await?;
|
||||
|
||||
Ok::<_, anyhow::Error>(blob_path)
|
||||
});
|
||||
}
|
||||
|
||||
let mut upload_tasks_failed = false;
|
||||
let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count);
|
||||
while let Some(task_run_result) = upload_tasks.join_next().await {
|
||||
match task_run_result
|
||||
.context("task join failed")
|
||||
.and_then(|task_result| task_result.context("upload task failed"))
|
||||
{
|
||||
Ok(upload_path) => {
|
||||
uploaded_blobs.insert(upload_path);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Upload task failed: {e:?}");
|
||||
upload_tasks_failed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if upload_tasks_failed {
|
||||
ControlFlow::Break(uploaded_blobs)
|
||||
} else {
|
||||
ControlFlow::Continue(uploaded_blobs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use hyper::{header, Body, Response, StatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::error::Error as StdError;
|
||||
use thiserror::Error;
|
||||
use tracing::error;
|
||||
|
||||
@@ -16,7 +15,7 @@ pub enum ApiError {
|
||||
Unauthorized(String),
|
||||
|
||||
#[error("NotFound: {0}")]
|
||||
NotFound(Box<dyn StdError + Send + Sync + 'static>),
|
||||
NotFound(anyhow::Error),
|
||||
|
||||
#[error("Conflict: {0}")]
|
||||
Conflict(String),
|
||||
|
||||
@@ -142,7 +142,7 @@ impl From<TenantMapInsertError> for ApiError {
|
||||
impl From<TenantStateError> for ApiError {
|
||||
fn from(tse: TenantStateError) -> ApiError {
|
||||
match tse {
|
||||
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
|
||||
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
|
||||
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
|
||||
}
|
||||
}
|
||||
@@ -151,7 +151,7 @@ impl From<TenantStateError> for ApiError {
|
||||
impl From<GetTenantError> for ApiError {
|
||||
fn from(tse: GetTenantError) -> ApiError {
|
||||
match tse {
|
||||
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
|
||||
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid)),
|
||||
e @ GetTenantError::NotActive(_) => {
|
||||
// Why is this not `ApiError::NotFound`?
|
||||
// Because we must be careful to never return 404 for a tenant if it does
|
||||
@@ -169,7 +169,7 @@ impl From<SetNewTenantConfigError> for ApiError {
|
||||
fn from(e: SetNewTenantConfigError) -> ApiError {
|
||||
match e {
|
||||
SetNewTenantConfigError::GetTenant(tid) => {
|
||||
ApiError::NotFound(anyhow!("tenant {}", tid).into())
|
||||
ApiError::NotFound(anyhow!("tenant {}", tid))
|
||||
}
|
||||
e @ SetNewTenantConfigError::Persist(_) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(e))
|
||||
@@ -182,7 +182,7 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
|
||||
fn from(value: crate::tenant::DeleteTimelineError) -> Self {
|
||||
use crate::tenant::DeleteTimelineError::*;
|
||||
match value {
|
||||
NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
|
||||
NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found")),
|
||||
HasChildren(children) => ApiError::PreconditionFailed(
|
||||
format!("Cannot delete timeline which has child timelines: {children:?}")
|
||||
.into_boxed_str(),
|
||||
@@ -397,7 +397,7 @@ async fn timeline_detail_handler(
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let timeline_info = build_timeline_info(
|
||||
&timeline,
|
||||
@@ -1061,7 +1061,7 @@ async fn timeline_download_remote_layers_handler_get(
|
||||
let info = timeline
|
||||
.get_download_all_remote_layers_task_info()
|
||||
.context("task never started since last pageserver process start")
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
.map_err(ApiError::NotFound)?;
|
||||
json_response(StatusCode::OK, info)
|
||||
}
|
||||
|
||||
@@ -1072,7 +1072,7 @@ async fn active_timeline_of_active_tenant(
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| ApiError::NotFound(e.into()))
|
||||
.map_err(ApiError::NotFound)
|
||||
}
|
||||
|
||||
async fn always_panic_handler(
|
||||
|
||||
@@ -390,9 +390,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
|
||||
// switch client to COPYBOTH
|
||||
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
|
||||
@@ -1232,6 +1230,6 @@ async fn get_active_tenant_timeline(
|
||||
.map_err(GetActiveTimelineError::Tenant)?;
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| GetActiveTimelineError::Timeline(anyhow::anyhow!(e)))?;
|
||||
.map_err(GetActiveTimelineError::Timeline)?;
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
@@ -421,21 +421,6 @@ remote:
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
|
||||
pub enum GetTimelineError {
|
||||
#[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
|
||||
NotActive {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
state: TimelineState,
|
||||
},
|
||||
#[error("Timeline {tenant_id}/{timeline_id} was not found")]
|
||||
NotFound {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("NotFound")]
|
||||
@@ -961,117 +946,6 @@ impl Tenant {
|
||||
tenant
|
||||
}
|
||||
|
||||
pub fn scan_and_sort_timelines_dir(
|
||||
self: Arc<Tenant>,
|
||||
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
|
||||
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
|
||||
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
|
||||
|
||||
for entry in
|
||||
std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
|
||||
{
|
||||
let entry = entry.context("read timeline dir entry")?;
|
||||
let timeline_dir = entry.path();
|
||||
|
||||
if crate::is_temporary(&timeline_dir) {
|
||||
info!(
|
||||
"Found temporary timeline directory, removing: {}",
|
||||
timeline_dir.display()
|
||||
);
|
||||
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
|
||||
error!(
|
||||
"Failed to remove temporary directory '{}': {:?}",
|
||||
timeline_dir.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
} else if is_uninit_mark(&timeline_dir) {
|
||||
if !timeline_dir.exists() {
|
||||
warn!(
|
||||
"Timeline dir entry become invalid: {}",
|
||||
timeline_dir.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let timeline_uninit_mark_file = &timeline_dir;
|
||||
info!(
|
||||
"Found an uninit mark file {}, removing the timeline and its uninit mark",
|
||||
timeline_uninit_mark_file.display()
|
||||
);
|
||||
let timeline_id = timeline_uninit_mark_file
|
||||
.file_stem()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TimelineId>()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Could not parse timeline id out of the timeline uninit mark name {}",
|
||||
timeline_uninit_mark_file.display()
|
||||
)
|
||||
})?;
|
||||
let timeline_dir = self.conf.timeline_path(&timeline_id, &self.tenant_id);
|
||||
if let Err(e) =
|
||||
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
|
||||
{
|
||||
error!("Failed to clean up uninit marked timeline: {e:?}");
|
||||
}
|
||||
} else {
|
||||
if !timeline_dir.exists() {
|
||||
warn!(
|
||||
"Timeline dir entry become invalid: {}",
|
||||
timeline_dir.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let timeline_id = timeline_dir
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TimelineId>()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Could not parse timeline id out of the timeline dir name {}",
|
||||
timeline_dir.display()
|
||||
)
|
||||
})?;
|
||||
let timeline_uninit_mark_file = self
|
||||
.conf
|
||||
.timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
|
||||
if timeline_uninit_mark_file.exists() {
|
||||
info!(
|
||||
%timeline_id,
|
||||
"Found an uninit mark file, removing the timeline and its uninit mark",
|
||||
);
|
||||
if let Err(e) =
|
||||
remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
|
||||
{
|
||||
error!("Failed to clean up uninit marked timeline: {e:?}");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = entry.file_name();
|
||||
if let Ok(timeline_id) =
|
||||
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
|
||||
{
|
||||
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id)
|
||||
.context("failed to load metadata")?;
|
||||
timelines_to_load.insert(timeline_id, metadata);
|
||||
} else {
|
||||
// A file or directory that doesn't look like a timeline ID
|
||||
warn!(
|
||||
"unexpected file or directory in timelines directory: {}",
|
||||
file_name.to_string_lossy()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the array of timeline IDs into tree-order, so that parent comes before
|
||||
// all its children.
|
||||
tree_sort_timelines(timelines_to_load)
|
||||
}
|
||||
|
||||
///
|
||||
/// Background task to load in-memory data structures for this tenant, from
|
||||
/// files on disk. Used at pageserver startup.
|
||||
@@ -1088,16 +962,110 @@ impl Tenant {
|
||||
|
||||
utils::failpoint_sleep_millis_async!("before-loading-tenant");
|
||||
|
||||
// TODO split this into two functions, scan and actual load
|
||||
|
||||
// Load in-memory state to reflect the local files on disk
|
||||
//
|
||||
// Scan the directory, peek into the metadata file of each timeline, and
|
||||
// collect a list of timelines and their ancestors.
|
||||
let tenant_id = self.tenant_id;
|
||||
let conf = self.conf;
|
||||
let span = info_span!("blocking");
|
||||
let cloned = Arc::clone(self);
|
||||
|
||||
let sorted_timelines: Vec<(_, _)> = tokio::task::spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
cloned.scan_and_sort_timelines_dir()
|
||||
let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
|
||||
let timelines_dir = conf.timelines_path(&tenant_id);
|
||||
|
||||
for entry in
|
||||
std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
|
||||
{
|
||||
let entry = entry.context("read timeline dir entry")?;
|
||||
let timeline_dir = entry.path();
|
||||
|
||||
if crate::is_temporary(&timeline_dir) {
|
||||
info!(
|
||||
"Found temporary timeline directory, removing: {}",
|
||||
timeline_dir.display()
|
||||
);
|
||||
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
|
||||
error!(
|
||||
"Failed to remove temporary directory '{}': {:?}",
|
||||
timeline_dir.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
} else if is_uninit_mark(&timeline_dir) {
|
||||
let timeline_uninit_mark_file = &timeline_dir;
|
||||
info!(
|
||||
"Found an uninit mark file {}, removing the timeline and its uninit mark",
|
||||
timeline_uninit_mark_file.display()
|
||||
);
|
||||
let timeline_id = timeline_uninit_mark_file
|
||||
.file_stem()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TimelineId>()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Could not parse timeline id out of the timeline uninit mark name {}",
|
||||
timeline_uninit_mark_file.display()
|
||||
)
|
||||
})?;
|
||||
let timeline_dir = conf.timeline_path(&timeline_id, &tenant_id);
|
||||
if let Err(e) =
|
||||
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
|
||||
{
|
||||
error!("Failed to clean up uninit marked timeline: {e:?}");
|
||||
}
|
||||
} else {
|
||||
let timeline_id = timeline_dir
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TimelineId>()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Could not parse timeline id out of the timeline dir name {}",
|
||||
timeline_dir.display()
|
||||
)
|
||||
})?;
|
||||
let timeline_uninit_mark_file =
|
||||
conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
|
||||
if timeline_uninit_mark_file.exists() {
|
||||
info!(
|
||||
%timeline_id,
|
||||
"Found an uninit mark file, removing the timeline and its uninit mark",
|
||||
);
|
||||
if let Err(e) = remove_timeline_and_uninit_mark(
|
||||
&timeline_dir,
|
||||
&timeline_uninit_mark_file,
|
||||
) {
|
||||
error!("Failed to clean up uninit marked timeline: {e:?}");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_name = entry.file_name();
|
||||
if let Ok(timeline_id) =
|
||||
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
|
||||
{
|
||||
let metadata = load_metadata(conf, timeline_id, tenant_id)
|
||||
.context("failed to load metadata")?;
|
||||
timelines_to_load.insert(timeline_id, metadata);
|
||||
} else {
|
||||
// A file or directory that doesn't look like a timeline ID
|
||||
warn!(
|
||||
"unexpected file or directory in timelines directory: {}",
|
||||
file_name.to_string_lossy()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort the array of timeline IDs into tree-order, so that parent comes before
|
||||
// all its children.
|
||||
tree_sort_timelines(timelines_to_load)
|
||||
})
|
||||
.await
|
||||
.context("load spawn_blocking")
|
||||
@@ -1245,21 +1213,19 @@ impl Tenant {
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Timeline>, GetTimelineError> {
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timelines_accessor = self.timelines.lock().unwrap();
|
||||
let timeline = timelines_accessor
|
||||
.get(&timeline_id)
|
||||
.ok_or(GetTimelineError::NotFound {
|
||||
tenant_id: self.tenant_id,
|
||||
timeline_id,
|
||||
})?;
|
||||
let timeline = timelines_accessor.get(&timeline_id).with_context(|| {
|
||||
format!("Timeline {}/{} was not found", self.tenant_id, timeline_id)
|
||||
})?;
|
||||
|
||||
if active_only && !timeline.is_active() {
|
||||
Err(GetTimelineError::NotActive {
|
||||
tenant_id: self.tenant_id,
|
||||
anyhow::bail!(
|
||||
"Timeline {}/{} is not active, state: {:?}",
|
||||
self.tenant_id,
|
||||
timeline_id,
|
||||
state: timeline.current_state(),
|
||||
})
|
||||
timeline.current_state()
|
||||
)
|
||||
} else {
|
||||
Ok(Arc::clone(timeline))
|
||||
}
|
||||
@@ -3409,8 +3375,9 @@ where
|
||||
#[cfg(test)]
|
||||
pub mod harness {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use once_cell::sync::Lazy;
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use std::{fs, path::PathBuf};
|
||||
use utils::logging;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -3443,6 +3410,8 @@ pub mod harness {
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
static LOCK: Lazy<RwLock<()>> = Lazy::new(|| RwLock::new(()));
|
||||
|
||||
impl From<TenantConf> for TenantConfOpt {
|
||||
fn from(tenant_conf: TenantConf) -> Self {
|
||||
Self {
|
||||
@@ -3469,16 +3438,33 @@ pub mod harness {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TenantHarness {
|
||||
pub struct TenantHarness<'a> {
|
||||
pub conf: &'static PageServerConf,
|
||||
pub tenant_conf: TenantConf,
|
||||
pub tenant_id: TenantId,
|
||||
|
||||
pub lock_guard: (
|
||||
Option<RwLockReadGuard<'a, ()>>,
|
||||
Option<RwLockWriteGuard<'a, ()>>,
|
||||
),
|
||||
}
|
||||
|
||||
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
|
||||
|
||||
impl TenantHarness {
|
||||
impl<'a> TenantHarness<'a> {
|
||||
pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
|
||||
Self::create_internal(test_name, false)
|
||||
}
|
||||
pub fn create_exclusive(test_name: &'static str) -> anyhow::Result<Self> {
|
||||
Self::create_internal(test_name, true)
|
||||
}
|
||||
fn create_internal(test_name: &'static str, exclusive: bool) -> anyhow::Result<Self> {
|
||||
let lock_guard = if exclusive {
|
||||
(None, Some(LOCK.write().unwrap()))
|
||||
} else {
|
||||
(Some(LOCK.read().unwrap()), None)
|
||||
};
|
||||
|
||||
LOG_HANDLE.get_or_init(|| {
|
||||
logging::init(
|
||||
logging::LogFormat::Test,
|
||||
@@ -3514,6 +3500,7 @@ pub mod harness {
|
||||
conf,
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
lock_guard,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3538,12 +3525,26 @@ pub mod harness {
|
||||
self.tenant_id,
|
||||
None,
|
||||
));
|
||||
// populate tenant with locally available timelines
|
||||
let mut timelines_to_load = HashMap::new();
|
||||
for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id))
|
||||
.expect("should be able to read timelines dir")
|
||||
{
|
||||
let timeline_dir_entry = timeline_dir_entry?;
|
||||
let timeline_id: TimelineId = timeline_dir_entry
|
||||
.path()
|
||||
.file_name()
|
||||
.unwrap()
|
||||
.to_string_lossy()
|
||||
.parse()?;
|
||||
|
||||
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
|
||||
timelines_to_load.insert(timeline_id, timeline_metadata);
|
||||
}
|
||||
tenant
|
||||
.load(None, ctx)
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
|
||||
// TODO reuse Tenant::activate (needs broker)
|
||||
tenant.state.send_replace(TenantState::Active);
|
||||
for timeline in tenant.timelines.lock().unwrap().values() {
|
||||
timeline.set_state(TimelineState::Active);
|
||||
@@ -4069,13 +4070,9 @@ mod tests {
|
||||
std::fs::write(metadata_path, metadata_bytes)?;
|
||||
|
||||
let err = harness.try_load(&ctx).await.err().expect("should fail");
|
||||
// get the whole stack with all .context, not only the last one
|
||||
let message = format!("{err:#}");
|
||||
let expected = "Failed to parse metadata bytes from path";
|
||||
assert!(
|
||||
message.contains(expected),
|
||||
"message '{message}' expected to contain {expected}"
|
||||
);
|
||||
assert!(err
|
||||
.to_string()
|
||||
.starts_with("Failed to parse metadata bytes from path"));
|
||||
|
||||
let mut found_error_message = false;
|
||||
let mut err_source = err.source();
|
||||
@@ -4509,44 +4506,6 @@ mod tests {
|
||||
assert!(expect_initdb_optimization);
|
||||
assert!(initdb_optimization_count > 0);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_uninit_mark_crash() -> anyhow::Result<()> {
|
||||
let name = "test_uninit_mark_crash";
|
||||
let harness = TenantHarness::create(name)?;
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
// Keeps uninit mark in place
|
||||
std::mem::forget(tline);
|
||||
}
|
||||
|
||||
let (tenant, _) = harness.load().await;
|
||||
match tenant.get_timeline(TIMELINE_ID, false) {
|
||||
Ok(_) => panic!("timeline should've been removed during load"),
|
||||
Err(e) => {
|
||||
assert_eq!(
|
||||
e,
|
||||
GetTimelineError::NotFound {
|
||||
tenant_id: tenant.tenant_id,
|
||||
timeline_id: TIMELINE_ID,
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
assert!(!harness
|
||||
.conf
|
||||
.timeline_path(&TIMELINE_ID, &tenant.tenant_id)
|
||||
.exists());
|
||||
|
||||
assert!(!harness
|
||||
.conf
|
||||
.timeline_uninit_mark_file_path(tenant.tenant_id, TIMELINE_ID)
|
||||
.exists());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -675,7 +675,7 @@ pub async fn immediate_gc(
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
|
||||
// Use tenant's pitr setting
|
||||
@@ -724,11 +724,11 @@ pub async fn immediate_compact(
|
||||
.get(&tenant_id)
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| ApiError::NotFound(e.into()))?;
|
||||
.map_err(ApiError::NotFound)?;
|
||||
|
||||
// Run in task_mgr to avoid race with tenant_detach operation
|
||||
let ctx = ctx.detached_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
|
||||
@@ -1367,7 +1367,7 @@ mod tests {
|
||||
struct TestSetup {
|
||||
runtime: &'static tokio::runtime::Runtime,
|
||||
entered_runtime: EnterGuard<'static>,
|
||||
harness: TenantHarness,
|
||||
harness: TenantHarness<'static>,
|
||||
tenant: Arc<Tenant>,
|
||||
tenant_ctx: RequestContext,
|
||||
remote_fs_dir: PathBuf,
|
||||
|
||||
@@ -3953,7 +3953,7 @@ impl Timeline {
|
||||
/// for example. The caller should hold `Tenant::gc_cs` lock to ensure
|
||||
/// that.
|
||||
///
|
||||
#[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
|
||||
#[instrument(skip_all, fields(timline_id=%self.timeline_id))]
|
||||
pub(super) async fn update_gc_info(
|
||||
&self,
|
||||
retain_lsns: Vec<Lsn>,
|
||||
|
||||
@@ -1321,7 +1321,7 @@ mod tests {
|
||||
|
||||
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
|
||||
|
||||
async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
|
||||
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
|
||||
@@ -4,11 +4,11 @@
|
||||
MODULE_big = neon
|
||||
OBJS = \
|
||||
$(WIN32RES) \
|
||||
extension_server.o \
|
||||
file_cache.o \
|
||||
libpagestore.o \
|
||||
libpqwalproposer.o \
|
||||
neon.o \
|
||||
access_stat.o \
|
||||
pagestore_smgr.o \
|
||||
relsize_cache.o \
|
||||
walproposer.o \
|
||||
|
||||
274
pgxn/neon/access_stat.c
Normal file
@@ -0,0 +1,274 @@
|
||||
|
||||
/*
|
||||
 * We want this statistic to represent the current access pattern. This is why, when
 * (n_seq_accesses + n_rnd_accesses) > MAX_ACCESS_COUNTER, we divide both counters by two,
 * decreasing the weight of historical data.
|
||||
*/
|
||||
#include "postgres.h"
|
||||
#include "funcapi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "storage/relfilenode.h"
|
||||
#include "utils/guc.h"
|
||||
|
||||
/* Structure used to predict sequential access */
|
||||
|
||||
typedef struct AccessStatEntry {
|
||||
RelFileNode relnode;
|
||||
BlockNumber blkno; /* last accessed block number */
|
||||
uint32 n_seq_accesses; /* number of sequential accesses (when block N+1 is accessed after block N) */
|
||||
uint32 n_rnd_accesses; /* number of random accesses */
|
||||
uint32 hash;
|
||||
uint32 status;
|
||||
uint64 access_count; /* total number of relation accesses since backend start */
|
||||
dlist_node lru_node; /* LRU list node */
|
||||
} AccessStatEntry;
|
||||
|
||||
#define SH_PREFIX as
|
||||
#define SH_ELEMENT_TYPE AccessStatEntry
|
||||
#define SH_KEY_TYPE RelFileNode
|
||||
#define SH_KEY relnode
|
||||
#define SH_STORE_HASH
|
||||
#define SH_GET_HASH(tb, a) ((a)->hash)
|
||||
#define SH_HASH_KEY(tb, key) hash_bytes( \
|
||||
((const unsigned char *) &(key)), \
|
||||
sizeof(RelFileNode) \
|
||||
)
|
||||
|
||||
#define SH_EQUAL(tb, a, b) RelFileNodeEquals((a), (b))
|
||||
#define SH_SCOPE static inline
|
||||
#define SH_DEFINE
|
||||
#define SH_DECLARE
|
||||
#include "lib/simplehash.h"
|
||||
|
||||
static as_hash *hash;
|
||||
static dlist_head lru;
|
||||
static int max_access_stat_size;
|
||||
static int max_access_stat_count;
|
||||
static double min_seq_access_ratio;
|
||||
static int min_seq_access_count;
|
||||
|
||||
void access_stat_init(void)
|
||||
{
|
||||
MemoryContext memctx = AllocSetContextCreate(TopMemoryContext,
|
||||
"NeonSMGR/access_stat",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
DefineCustomIntVariable("neon.max_access_stat_size",
|
||||
"Maximal size of Neon relation access statistic hash",
|
||||
NULL,
|
||||
&max_access_stat_size,
|
||||
1024,
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
DefineCustomIntVariable("neon.max_access_stat_count",
|
||||
"Maximal value of relation access counter after which counters are divided by 2",
|
||||
NULL,
|
||||
&max_access_stat_count,
|
||||
1024,
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
DefineCustomRealVariable("neon.min_seq_access_ratio",
|
||||
"Minimal seq/(rnd+seq) ratio to determine sequential access",
|
||||
NULL,
|
||||
&min_seq_access_ratio,
|
||||
0.9,
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
DefineCustomIntVariable("neon.min_seq_access_count",
|
||||
"Minimal access count to determine sequetial access",
|
||||
NULL,
|
||||
&min_seq_access_count,
|
||||
10,
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
hash = as_create(memctx, max_access_stat_size, NULL);
|
||||
dlist_init(&lru);
|
||||
}
|
||||
|
||||
|
||||
bool is_sequential_access(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
|
||||
{
|
||||
bool is_seq_access = false;
|
||||
if (forkNum == MAIN_FORKNUM /* prefetch makes sense only for main fork */
|
||||
&& max_access_stat_size != 0)
|
||||
{
|
||||
AccessStatEntry* entry = as_lookup(hash, rnode);
|
||||
if (entry == NULL)
|
||||
{
|
||||
bool found;
|
||||
/* New item */
|
||||
while (hash->members >= max_access_stat_size)
|
||||
{
|
||||
/* Hash overflow: find candidate for replacement */
|
||||
AccessStatEntry* victim = dlist_container(AccessStatEntry, lru_node, dlist_pop_head_node(&lru));
|
||||
as_delete_item(hash, victim);
|
||||
}
|
||||
entry = as_insert(hash, rnode, &found);
|
||||
Assert(!found);
|
||||
/* Set both counters to zero because we don't know whether the first access is sequential or random */
|
||||
entry->n_seq_accesses = 0;
|
||||
entry->n_rnd_accesses = 0;
|
||||
entry->access_count = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
uint32 access_count = entry->n_seq_accesses + entry->n_rnd_accesses;
|
||||
/*
|
||||
 * We want this function to represent the most recent access pattern,
 * so when the number of accesses exceeds the threshold value `max_access_stat_count`
 * we divide both counters by two, devaluing old data.
|
||||
*/
|
||||
if (access_count >= max_access_stat_count)
|
||||
{
|
||||
entry->n_seq_accesses >>= 1;
|
||||
entry->n_rnd_accesses >>= 1;
|
||||
}
|
||||
if (entry->blkno+1 == blkno)
|
||||
entry->n_seq_accesses += 1;
|
||||
else
|
||||
entry->n_rnd_accesses += 1;
|
||||
entry->access_count += 1;
|
||||
access_count = entry->n_seq_accesses + entry->n_rnd_accesses;
|
||||
|
||||
is_seq_access = access_count >= min_seq_access_count
|
||||
&& (double)entry->n_seq_accesses / access_count >= min_seq_access_ratio;
|
||||
|
||||
|
||||
/* Remove entry from LRU list to be able to insert it at the end of this list */
|
||||
dlist_delete(&entry->lru_node);
|
||||
}
|
||||
/* Place entry to the tail of LRU list */
|
||||
dlist_push_tail(&lru, &entry->lru_node);
|
||||
entry->blkno = blkno;
|
||||
}
|
||||
return is_seq_access;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get relation access pattern
|
||||
*/
|
||||
PG_FUNCTION_INFO_V1(get_relation_access_statistics);
|
||||
|
||||
|
||||
typedef struct
|
||||
{
|
||||
TupleDesc tupdesc;
|
||||
dlist_node* curr;
|
||||
} AccessStatContext;
|
||||
|
||||
#define NUM_ACCESS_STAT_COLUMNS 6
|
||||
|
||||
Datum
|
||||
get_relation_access_statistics(PG_FUNCTION_ARGS)
|
||||
{
|
||||
FuncCallContext *funcctx;
|
||||
Datum result;
|
||||
MemoryContext oldcontext;
|
||||
AccessStatContext *fctx; /* User function context. */
|
||||
TupleDesc tupledesc;
|
||||
TupleDesc expected_tupledesc;
|
||||
HeapTuple tuple;
|
||||
|
||||
if (SRF_IS_FIRSTCALL())
|
||||
{
|
||||
funcctx = SRF_FIRSTCALL_INIT();
|
||||
|
||||
/* Switch context when allocating stuff to be used in later calls */
|
||||
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
|
||||
|
||||
/* Create a user function context for cross-call persistence */
|
||||
fctx = (AccessStatContext *) palloc(sizeof(AccessStatContext));
|
||||
|
||||
/*
|
||||
 * Determine the result tuple descriptor from the calling query rather
 * than relying on the result type hard-coded in the function definition:
 * otherwise we could crash when somebody uses an old (or even wrong)
 * SQL-level definition of this function.
|
||||
*/
|
||||
if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE)
|
||||
elog(ERROR, "return type must be a row type");
|
||||
|
||||
if (expected_tupledesc->natts != NUM_ACCESS_STAT_COLUMNS)
|
||||
elog(ERROR, "incorrect number of output arguments");
|
||||
|
||||
/* Construct a tuple descriptor for the result rows. */
|
||||
tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts);
|
||||
TupleDescInitEntry(tupledesc, (AttrNumber) 1, "relfilenode",
|
||||
OIDOID, -1, 0);
|
||||
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "reltablespace",
|
||||
OIDOID, -1, 0);
|
||||
TupleDescInitEntry(tupledesc, (AttrNumber) 3, "reldatabase",
|
||||
OIDOID, -1, 0);
|
||||
TupleDescInitEntry(tupledesc, (AttrNumber) 4, "seqaccess",
|
||||
INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupledesc, (AttrNumber) 5, "rndaccess",
|
||||
INT4OID, -1, 0);
|
||||
TupleDescInitEntry(tupledesc, (AttrNumber) 6, "accesscnt",
|
||||
INT8OID, -1, 0);
|
||||
|
||||
fctx->tupdesc = BlessTupleDesc(tupledesc);
|
||||
fctx->curr = dlist_is_empty(&lru) ? NULL : dlist_tail_node(&lru);
|
||||
|
||||
|
||||
/* Set max calls and remember the user function context. */
|
||||
funcctx->max_calls = hash->members;
|
||||
funcctx->user_fctx = fctx;
|
||||
|
||||
/* Return to original context when allocating transient memory */
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
||||
funcctx = SRF_PERCALL_SETUP();
|
||||
|
||||
/* Get the saved state */
|
||||
fctx = funcctx->user_fctx;
|
||||
if (fctx->curr)
|
||||
{
|
||||
AccessStatEntry* entry = dlist_container(AccessStatEntry, lru_node, fctx->curr);
|
||||
Datum values[NUM_ACCESS_STAT_COLUMNS];
|
||||
bool nulls[NUM_ACCESS_STAT_COLUMNS] = {
|
||||
false, false, false, false, false, false
|
||||
};
|
||||
|
||||
values[0] = ObjectIdGetDatum(entry->relnode.relNode);
|
||||
values[1] = ObjectIdGetDatum(entry->relnode.spcNode);
|
||||
values[2] = ObjectIdGetDatum(entry->relnode.dbNode);
|
||||
values[3] = Int32GetDatum(entry->n_seq_accesses);
|
||||
values[4] = Int32GetDatum(entry->n_rnd_accesses);
|
||||
values[5] = Int64GetDatum(entry->access_count);
|
||||
|
||||
/* Build and return the tuple. */
|
||||
tuple = heap_form_tuple(fctx->tupdesc, values, nulls);
|
||||
result = HeapTupleGetDatum(tuple);
|
||||
|
||||
fctx->curr = dlist_has_prev(&lru, fctx->curr) ? dlist_prev_node(&lru, fctx->curr) : NULL;
|
||||
|
||||
SRF_RETURN_NEXT(funcctx, result);
|
||||
}
|
||||
else
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
}
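The header comment and GUC definitions above describe the heuristic: per relation, a sequential and a random access counter are kept, both are halved once their sum reaches neon.max_access_stat_count, and an access stream counts as sequential once at least neon.min_seq_access_count accesses have been seen and the sequential fraction reaches neon.min_seq_access_ratio. The following standalone C sketch is not part of the extension; the RelStat struct and observe_access() are invented for illustration, and the constants are the GUC defaults (1024, 10, 0.9) shown above.

#include <stdbool.h>
#include <stdio.h>

/* Defaults of the GUCs defined in access_stat_init() above. */
#define MAX_ACCESS_STAT_COUNT 1024
#define MIN_SEQ_ACCESS_COUNT  10
#define MIN_SEQ_ACCESS_RATIO  0.9

typedef struct
{
	unsigned last_blkno;     /* last accessed block number */
	unsigned n_seq_accesses; /* accesses where block N+1 followed block N */
	unsigned n_rnd_accesses; /* all other accesses */
	bool     initialized;
} RelStat;

/* Same arithmetic as is_sequential_access(), minus the hash table and LRU list. */
static bool
observe_access(RelStat *st, unsigned blkno)
{
	bool     is_seq = false;
	unsigned total;

	if (!st->initialized)
	{
		/* First access: we cannot tell yet whether it is sequential. */
		st->n_seq_accesses = 0;
		st->n_rnd_accesses = 0;
		st->initialized = true;
	}
	else
	{
		total = st->n_seq_accesses + st->n_rnd_accesses;
		/* Halve both counters so that old history gradually loses weight. */
		if (total >= MAX_ACCESS_STAT_COUNT)
		{
			st->n_seq_accesses >>= 1;
			st->n_rnd_accesses >>= 1;
		}
		if (st->last_blkno + 1 == blkno)
			st->n_seq_accesses += 1;
		else
			st->n_rnd_accesses += 1;

		total = st->n_seq_accesses + st->n_rnd_accesses;
		is_seq = total >= MIN_SEQ_ACCESS_COUNT
			&& (double) st->n_seq_accesses / total >= MIN_SEQ_ACCESS_RATIO;
	}
	st->last_blkno = blkno;
	return is_seq;
}

int
main(void)
{
	RelStat  st = {0};
	unsigned blkno;

	/* A purely sequential scan of blocks 0..19. */
	for (blkno = 0; blkno < 20; blkno++)
		printf("block %2u -> %s\n", blkno,
			   observe_access(&st, blkno) ? "sequential" : "undecided/random");
	return 0;
}

Compiled and run, the scan is reported as sequential from roughly the tenth block onward, which matches the neon.min_seq_access_count default.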
|
||||
|
||||
@@ -1,91 +0,0 @@
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* extension_server.c
|
||||
* Request compute_ctl to download extension files.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/extension_server.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
#include "tcop/pquery.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "access/xact.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/acl.h"
|
||||
#include "fmgr.h"
|
||||
#include "utils/guc.h"
|
||||
#include "port.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include <curl/curl.h>
|
||||
|
||||
static int extension_server_port = 0;
|
||||
|
||||
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
|
||||
|
||||
// curl -X POST http://localhost:8080/extension_server/postgis-3.so
|
||||
static bool
|
||||
neon_download_extension_file_http(const char *filename)
|
||||
{
|
||||
CURL *curl;
|
||||
CURLcode res;
|
||||
char *compute_ctl_url;
|
||||
char *postdata;
|
||||
bool ret = false;
|
||||
|
||||
if ((curl = curl_easy_init()) == NULL)
|
||||
{
|
||||
elog(ERROR, "Failed to initialize curl handle");
|
||||
}
|
||||
|
||||
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s", extension_server_port, filename);
|
||||
|
||||
elog(LOG, "curl_easy_perform() url: %s", compute_ctl_url);
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
|
||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);
|
||||
|
||||
if (curl)
|
||||
{
|
||||
/* Perform the request, res will get the return code */
|
||||
res = curl_easy_perform(curl);
|
||||
/* Check for errors */
|
||||
if (res == CURLE_OK)
|
||||
{
|
||||
elog(LOG, "curl_easy_perform() succeeded");
|
||||
ret = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(WARNING, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
|
||||
}
|
||||
|
||||
/* always cleanup */
|
||||
curl_easy_cleanup(curl);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void pg_init_extension_server()
|
||||
{
|
||||
DefineCustomIntVariable("neon.extension_server_port",
|
||||
"connection string to the compute_ctl",
|
||||
NULL,
|
||||
&extension_server_port,
|
||||
0, 0, INT_MAX,
|
||||
PGC_POSTMASTER,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
// set download_extension_file_hook
|
||||
prev_download_extension_file_hook = download_extension_file_hook;
|
||||
download_extension_file_hook = neon_download_extension_file_http;
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
|
||||
@@ -58,6 +58,7 @@ char *neon_auth_token;
|
||||
int n_unflushed_requests = 0;
|
||||
int flush_every_n_requests = 8;
|
||||
int readahead_buffer_size = 128;
|
||||
int readahead_distance = 10;
|
||||
|
||||
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
||||
|
||||
@@ -452,6 +453,18 @@ pg_init_libpagestore(void)
|
||||
PGC_USERSET,
|
||||
0, /* no flags required */
|
||||
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
|
||||
DefineCustomIntVariable("neon.readahead_distance",
|
||||
"Number of read-ahead blocks",
|
||||
NULL,
|
||||
&readahead_distance,
|
||||
10,
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
relsize_hash_init();
|
||||
|
||||
@@ -476,4 +489,5 @@ pg_init_libpagestore(void)
|
||||
redo_read_buffer_filter = neon_redo_read_buffer_filter;
|
||||
}
|
||||
lfc_init();
|
||||
access_stat_init();
|
||||
}
|
||||
|
||||
@@ -27,8 +27,18 @@ RETURNS SETOF RECORD
|
||||
AS 'MODULE_PATHNAME', 'local_cache_pages'
|
||||
LANGUAGE C PARALLEL SAFE;
|
||||
|
||||
CREATE FUNCTION get_relation_access_statistics()
|
||||
RETURNS SETOF RECORD
|
||||
AS 'MODULE_PATHNAME', 'get_relation_access_statistics'
|
||||
LANGUAGE C PARALLEL SAFE;
|
||||
|
||||
-- Create a view for convenient access.
|
||||
CREATE VIEW local_cache AS
|
||||
SELECT P.* FROM local_cache_pages() AS P
|
||||
SELECT relname,P.* FROM local_cache_pages() AS P
|
||||
(pageoffs int8, relfilenode oid, reltablespace oid, reldatabase oid,
|
||||
relforknumber int2, relblocknumber int8, accesscount int4);
|
||||
relforknumber int2, relblocknumber int8, accesscount int4) JOIN pg_class pc ON (P.relfilenode = pc.relfilenode);
|
||||
|
||||
CREATE VIEW relation_access_statistics AS
|
||||
SELECT relname,P.* FROM get_relation_access_statistics() AS P
|
||||
(relfilenode oid, reltablespace oid, reldatabase oid,
|
||||
seqaccess int4, rndaccess int4, access_count int8) JOIN pg_class pc ON (P.relfilenode = pc.relfilenode);
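One way to read the new relation_access_statistics view from a client is a plain libpq program. This is only a hedged sketch: the connection string and build flags are placeholders, while the column names come from the view definition above.

/* Illustrative only; build e.g. with: cc read_access_stat.c -lpq -I$(pg_config --includedir) */
#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	/* Placeholder connection string; point it at your compute endpoint. */
	PGconn	   *conn = PQconnectdb("host=localhost port=5432 dbname=postgres");
	PGresult   *res;
	int			i;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	res = PQexec(conn,
				 "SELECT relname, seqaccess, rndaccess, access_count "
				 "FROM relation_access_statistics ORDER BY access_count DESC");
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
	{
		for (i = 0; i < PQntuples(res); i++)
			printf("%-32s seq=%s rnd=%s total=%s\n",
				   PQgetvalue(res, i, 0), PQgetvalue(res, i, 1),
				   PQgetvalue(res, i, 2), PQgetvalue(res, i, 3));
	}
	else
		fprintf(stderr, "query failed: %s", PQerrorMessage(conn));

	PQclear(res);
	PQfinish(conn);
	return 0;
}

Note that the statistics hash in access_stat.c is backend-local, so the view reflects only accesses made by the backend you query it from.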
|
||||
|
||||
@@ -35,11 +35,8 @@ _PG_init(void)
|
||||
{
|
||||
pg_init_libpagestore();
|
||||
pg_init_walproposer();
|
||||
|
||||
InitControlPlaneConnector();
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
// Important: This must happen after other parts of the extension
|
||||
// are loaded, otherwise any settings to GUCs that were set before
|
||||
// the extension was loaded will be removed.
|
||||
|
||||
@@ -21,8 +21,6 @@ extern char *neon_tenant;
|
||||
extern void pg_init_libpagestore(void);
|
||||
extern void pg_init_walproposer(void);
|
||||
|
||||
extern void pg_init_extension_server(void);
|
||||
|
||||
/*
|
||||
* Returns true if we shouldn't do REDO on that block in record indicated by
|
||||
* block_id; false otherwise.
|
||||
|
||||
@@ -157,6 +157,7 @@ extern page_server_api * page_server;
|
||||
extern char *page_server_connstring;
|
||||
extern int flush_every_n_requests;
|
||||
extern int readahead_buffer_size;
|
||||
extern int readahead_distance;
|
||||
extern bool seqscan_prefetch_enabled;
|
||||
extern int seqscan_prefetch_distance;
|
||||
extern char *neon_timeline;
|
||||
@@ -210,5 +211,8 @@ extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumbe
|
||||
extern void lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
|
||||
extern void lfc_init(void);
|
||||
|
||||
/* Access statistic */
|
||||
extern void access_stat_init(void);
|
||||
extern bool is_sequential_access(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
|
||||
|
||||
#endif
|
||||
|
||||
@@ -1881,6 +1881,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
|
||||
XLogWaitForReplayOf(request_lsn);
|
||||
|
||||
|
||||
/*
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
*/
|
||||
@@ -2003,6 +2004,10 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/* If sequential access is expected, initiate prefetch of the block readahead_distance ahead */
|
||||
if (is_sequential_access(reln->smgr_rnode.node, forkNum, blkno))
|
||||
neon_prefetch(reln, forkNum, blkno + readahead_distance);
|
||||
|
||||
/* Try to read from local file cache */
|
||||
if (lfc_read(reln->smgr_rnode.node, forkNum, blkno, buffer))
|
||||
{
|
||||
|
||||
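The neon_read() hunk above is where the statistic pays off: once is_sequential_access() reports a sequential pattern, the smgr issues a prefetch neon.readahead_distance blocks ahead (default 10) before consulting the local file cache. A rough standalone simulation of the effect, illustrative only, with the classification point and distance taken from the GUC defaults above:

#include <stdbool.h>
#include <stdio.h>

#define READAHEAD_DISTANCE 10	/* default of neon.readahead_distance */
#define NBLOCKS 32

int
main(void)
{
	bool prefetched[NBLOCKS + READAHEAD_DISTANCE] = {false};
	int  hits = 0;

	for (int blkno = 0; blkno < NBLOCKS; blkno++)
	{
		/* Was this block already requested by an earlier read-ahead? */
		if (prefetched[blkno])
			hits++;
		/* Pretend the stream is classified as sequential from block 10 on,
		 * mirroring the neon.min_seq_access_count default. */
		if (blkno >= 10)
			prefetched[blkno + READAHEAD_DISTANCE] = true;
	}
	printf("%d of %d reads were preceded by a prefetch request\n", hits, NBLOCKS);
	return 0;
}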
@@ -266,7 +266,7 @@ impl From<TimelineError> for ApiError {
|
||||
fn from(te: TimelineError) -> ApiError {
|
||||
match te {
|
||||
TimelineError::NotFound(ttid) => {
|
||||
ApiError::NotFound(anyhow!("timeline {} not found", ttid).into())
|
||||
ApiError::NotFound(anyhow!("timeline {} not found", ttid))
|
||||
}
|
||||
_ => ApiError::InternalServerError(anyhow!("{}", te)),
|
||||
}
|
||||
|
||||
@@ -600,8 +600,6 @@ class NeonEnvBuilder:
|
||||
self.rust_log_override = rust_log_override
|
||||
self.port_distributor = port_distributor
|
||||
self.remote_storage = remote_storage
|
||||
self.ext_remote_storage: Optional[Any] = None
|
||||
self.remote_storage_client: Optional[Any] = None
|
||||
self.remote_storage_users = remote_storage_users
|
||||
self.broker = broker
|
||||
self.run_id = run_id
|
||||
@@ -653,18 +651,13 @@ class NeonEnvBuilder:
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
test_name: str,
|
||||
force_enable: bool = True,
|
||||
enable_remote_extensions: bool = False,
|
||||
):
|
||||
if remote_storage_kind == RemoteStorageKind.NOOP:
|
||||
return
|
||||
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
|
||||
self.enable_local_fs_remote_storage(force_enable=force_enable)
|
||||
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
self.enable_mock_s3_remote_storage(
|
||||
bucket_name=test_name,
|
||||
force_enable=force_enable,
|
||||
enable_remote_extensions=enable_remote_extensions,
|
||||
)
|
||||
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
|
||||
elif remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
|
||||
else:
|
||||
@@ -680,15 +673,11 @@ class NeonEnvBuilder:
|
||||
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
|
||||
self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage"))
|
||||
|
||||
def enable_mock_s3_remote_storage(
|
||||
self, bucket_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
|
||||
):
|
||||
def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True):
|
||||
"""
|
||||
Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
|
||||
Starts up the mock server, if that does not run yet.
|
||||
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
|
||||
|
||||
Also creates the bucket for extensions, self.ext_remote_storage bucket
|
||||
"""
|
||||
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
|
||||
mock_endpoint = self.mock_s3_server.endpoint()
|
||||
@@ -711,17 +700,6 @@ class NeonEnvBuilder:
|
||||
secret_key=self.mock_s3_server.secret_key(),
|
||||
)
|
||||
|
||||
if enable_remote_extensions:
|
||||
ext_bucket_name = f"ext_{bucket_name}"
|
||||
self.remote_storage_client.create_bucket(Bucket=ext_bucket_name)
|
||||
self.ext_remote_storage = S3Storage(
|
||||
bucket_name=ext_bucket_name,
|
||||
endpoint=mock_endpoint,
|
||||
bucket_region=mock_region,
|
||||
access_key=self.mock_s3_server.access_key(),
|
||||
secret_key=self.mock_s3_server.secret_key(),
|
||||
)
|
||||
|
||||
def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True):
|
||||
"""
|
||||
Sets up configuration to use real s3 endpoint without mock server
|
||||
@@ -762,17 +740,6 @@ class NeonEnvBuilder:
|
||||
prefix_in_bucket=self.remote_storage_prefix,
|
||||
)
|
||||
|
||||
ext_bucket_name = os.getenv("EXT_REMOTE_STORAGE_S3_BUCKET")
|
||||
if ext_bucket_name is not None:
|
||||
ext_bucket_name = f"ext_{ext_bucket_name}"
|
||||
self.ext_remote_storage = S3Storage(
|
||||
bucket_name=ext_bucket_name,
|
||||
bucket_region=region,
|
||||
access_key=access_key,
|
||||
secret_key=secret_key,
|
||||
prefix_in_bucket=self.remote_storage_prefix,
|
||||
)
|
||||
|
||||
def cleanup_local_storage(self):
|
||||
if self.preserve_database_files:
|
||||
return
|
||||
@@ -806,7 +773,6 @@ class NeonEnvBuilder:
|
||||
# `self.remote_storage_prefix` is coupled with `S3Storage` storage type,
|
||||
# so this line effectively a no-op
|
||||
assert isinstance(self.remote_storage, S3Storage)
|
||||
assert self.remote_storage_client is not None
|
||||
|
||||
if self.keep_remote_storage_contents:
|
||||
log.info("keep_remote_storage_contents skipping remote storage cleanup")
|
||||
@@ -936,8 +902,6 @@ class NeonEnv:
|
||||
self.neon_binpath = config.neon_binpath
|
||||
self.pg_distrib_dir = config.pg_distrib_dir
|
||||
self.endpoint_counter = 0
|
||||
self.remote_storage_client = config.remote_storage_client
|
||||
self.ext_remote_storage = config.ext_remote_storage
|
||||
|
||||
# generate initial tenant ID here instead of letting 'neon init' generate it,
|
||||
# so that we don't need to dig it out of the config file afterwards.
|
||||
@@ -1524,7 +1488,6 @@ class NeonCli(AbstractNeonCli):
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1534,8 +1497,6 @@ class NeonCli(AbstractNeonCli):
|
||||
"--pg-version",
|
||||
self.env.pg_version,
|
||||
]
|
||||
if remote_ext_config is not None:
|
||||
args.extend(["--remote-ext-config", remote_ext_config])
|
||||
if lsn is not None:
|
||||
args.append(f"--lsn={lsn}")
|
||||
args.extend(["--pg-port", str(pg_port)])
|
||||
@@ -2397,7 +2358,7 @@ class Endpoint(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint":
|
||||
def start(self) -> "Endpoint":
|
||||
"""
|
||||
Start the Postgres instance.
|
||||
Returns self.
|
||||
@@ -2413,7 +2374,6 @@ class Endpoint(PgProtocol):
|
||||
http_port=self.http_port,
|
||||
tenant_id=self.tenant_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
)
|
||||
self.running = True
|
||||
|
||||
@@ -2455,17 +2415,6 @@ class Endpoint(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def respec(self, **kwargs):
|
||||
"""Update the endpoint.json file used by control_plane."""
|
||||
# Read config
|
||||
config_path = os.path.join(self.endpoint_path(), "endpoint.json")
|
||||
with open(config_path, "r") as f:
|
||||
data_dict = json.load(f)
|
||||
|
||||
# Write it back updated
|
||||
with open(config_path, "w") as file:
|
||||
json.dump(dict(data_dict, **kwargs), file, indent=4)
|
||||
|
||||
def stop(self) -> "Endpoint":
|
||||
"""
|
||||
Stop the Postgres instance if it's running.
|
||||
@@ -2503,7 +2452,6 @@ class Endpoint(PgProtocol):
|
||||
hot_standby: bool = False,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create an endpoint, apply config, and start Postgres.
|
||||
@@ -2518,7 +2466,7 @@ class Endpoint(PgProtocol):
|
||||
config_lines=config_lines,
|
||||
hot_standby=hot_standby,
|
||||
lsn=lsn,
|
||||
).start(remote_ext_config=remote_ext_config)
|
||||
).start()
|
||||
|
||||
log.info(f"Postgres startup took {time.time() - started_at} seconds")
|
||||
|
||||
@@ -2552,7 +2500,6 @@ class EndpointFactory:
|
||||
lsn: Optional[Lsn] = None,
|
||||
hot_standby: bool = False,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
) -> Endpoint:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
@@ -2569,7 +2516,6 @@ class EndpointFactory:
|
||||
hot_standby=hot_standby,
|
||||
config_lines=config_lines,
|
||||
lsn=lsn,
|
||||
remote_ext_config=remote_ext_config,
|
||||
)
|
||||
|
||||
def create(
|
||||
|
||||
@@ -32,18 +32,13 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
|
||||
|
||||
env.neon_cli.create_branch("test_startup")
|
||||
|
||||
endpoint = None
|
||||
|
||||
# We do two iterations so we can see if the second startup is faster. It should
|
||||
# be because the compute node should already be configured with roles, databases,
|
||||
# extensions, etc from the first run.
|
||||
for i in range(2):
|
||||
# Start
|
||||
with zenbenchmark.record_duration(f"{i}_start_and_select"):
|
||||
if endpoint:
|
||||
endpoint.start()
|
||||
else:
|
||||
endpoint = env.endpoints.create_start("test_startup")
|
||||
endpoint = env.endpoints.create_start("test_startup")
|
||||
endpoint.safe_psql("select 1;")
|
||||
|
||||
# Get metrics
|
||||
@@ -62,9 +57,6 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
|
||||
# Stop so we can restart
|
||||
endpoint.stop()
|
||||
|
||||
# Imitate optimizations that console would do for the second start
|
||||
endpoint.respec(skip_pg_catalog_updates=True)
|
||||
|
||||
|
||||
# This test sometimes runs for longer than the global 5 minute timeout.
|
||||
@pytest.mark.timeout(600)
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
pg8000==1.29.8
|
||||
pg8000==1.29.4
|
||||
scramp>=1.4.3
|
||||
|
||||
@@ -396,9 +396,9 @@ checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.55"
|
||||
version = "0.10.52"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d"
|
||||
checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
@@ -428,9 +428,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.90"
|
||||
version = "0.9.87"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6"
|
||||
checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM rust:1.70
|
||||
FROM rust:1.69
|
||||
WORKDIR /source
|
||||
|
||||
COPY . .
|
||||
|
||||
@@ -5,8 +5,8 @@
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/vapor/postgres-nio.git",
|
||||
"state" : {
|
||||
"revision" : "061a0836d7c1887e04a975d1d2eaa2ef5fd7dfab",
|
||||
"version" : "1.16.0"
|
||||
"revision" : "dbf9c2eb596df39cba8ff3f74d74b2e6a31bd937",
|
||||
"version" : "1.14.1"
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -59,8 +59,8 @@
|
||||
"kind" : "remoteSourceControl",
|
||||
"location" : "https://github.com/apple/swift-nio.git",
|
||||
"state" : {
|
||||
"revision" : "6213ba7a06febe8fef60563a4a7d26a4085783cf",
|
||||
"version" : "2.54.0"
|
||||
"revision" : "d1690f85419fdac8d54e350fb6d2ab9fd95afd75",
|
||||
"version" : "2.51.1"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
||||
@@ -4,7 +4,7 @@ import PackageDescription
|
||||
let package = Package(
|
||||
name: "PostgresNIOExample",
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.16.0")
|
||||
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.14.1")
|
||||
],
|
||||
targets: [
|
||||
.executableTarget(
|
||||
|
||||
@@ -5,7 +5,23 @@
|
||||
"packages": {
|
||||
"": {
|
||||
"dependencies": {
|
||||
"postgresql-client": "2.5.9"
|
||||
"postgresql-client": "2.5.5"
|
||||
}
|
||||
},
|
||||
"node_modules/debug": {
|
||||
"version": "4.3.4",
|
||||
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.4.tgz",
|
||||
"integrity": "sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==",
|
||||
"dependencies": {
|
||||
"ms": "2.1.2"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=6.0"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"supports-color": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"node_modules/doublylinked": {
|
||||
@@ -25,6 +41,11 @@
|
||||
"putil-promisify": "^1.8.6"
|
||||
}
|
||||
},
|
||||
"node_modules/ms": {
|
||||
"version": "2.1.2",
|
||||
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
|
||||
"integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
|
||||
},
|
||||
"node_modules/obuf": {
|
||||
"version": "1.1.2",
|
||||
"resolved": "https://registry.npmjs.org/obuf/-/obuf-1.1.2.tgz",
|
||||
@@ -42,28 +63,30 @@
|
||||
}
|
||||
},
|
||||
"node_modules/postgresql-client": {
|
||||
"version": "2.5.9",
|
||||
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.5.9.tgz",
|
||||
"integrity": "sha512-s+kgTN6TfWLzehEyxw4Im4odnxVRCbZ0DEJzWS6SLowPAmB2m1/DOiOvZC0+ZVoi5AfbGE6SBqFxKguSyVAXZg==",
|
||||
"version": "2.5.5",
|
||||
"resolved": "https://registry.npmjs.org/postgresql-client/-/postgresql-client-2.5.5.tgz",
|
||||
"integrity": "sha512-2Mu3i+6NQ9cnkoZNd0XeSZo9WoUpuWf4ZSiCCoDWSj82T93py2/SKXZ1aUaP8mVaU0oKpyyGe0IwLYZ1VHShnA==",
|
||||
"dependencies": {
|
||||
"debug": "^4.3.4",
|
||||
"doublylinked": "^2.5.2",
|
||||
"lightning-pool": "^4.2.1",
|
||||
"postgres-bytea": "^3.0.0",
|
||||
"power-tasks": "^1.7.0",
|
||||
"power-tasks": "^1.6.4",
|
||||
"putil-merge": "^3.10.3",
|
||||
"putil-promisify": "^1.10.0",
|
||||
"putil-varhelpers": "^1.6.5"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=16.0",
|
||||
"node": ">=14.0",
|
||||
"npm": ">=7.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/power-tasks": {
|
||||
"version": "1.7.0",
|
||||
"resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-1.7.0.tgz",
|
||||
"integrity": "sha512-rndZXCDxhuIDjPUJJvQwBDHaYagCkjvbPF/NA+omh/Ef4rAI9KtnvdA0k98dyiGpn1zXOpc6c2c0JWzg/xAhJg==",
|
||||
"version": "1.6.4",
|
||||
"resolved": "https://registry.npmjs.org/power-tasks/-/power-tasks-1.6.4.tgz",
|
||||
"integrity": "sha512-LX8GGgEIP1N7jsZqlqZ275e6f1Ehq97APCEGj8uVO0NoEoB+77QUX12BFv3LmlNKfq4fIuNSPiHhyHFjqn2gfA==",
|
||||
"dependencies": {
|
||||
"debug": "^4.3.4",
|
||||
"doublylinked": "^2.5.2",
|
||||
"strict-typed-events": "^2.3.1"
|
||||
},
|
||||
@@ -109,9 +132,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/ts-gems": {
|
||||
"version": "2.4.0",
|
||||
"resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.4.0.tgz",
|
||||
"integrity": "sha512-SdugYAXoWvbqrxLodIObzxhEKacDxh5LfAJIiIkiH7q5thvuuCzdmkdTVQYf7uEDrEpPhfx4tokDMamdO3be9A=="
|
||||
"version": "2.3.0",
|
||||
"resolved": "https://registry.npmjs.org/ts-gems/-/ts-gems-2.3.0.tgz",
|
||||
"integrity": "sha512-bUvrwrzlct7vfaNvtgMhynDf6lAki/kTtrNsIGhX6l7GJGK3s6b8Ro7dazOLXabV0m2jyShBzDQ8X1+h/C2Cug=="
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"postgresql-client": "2.5.9"
|
||||
"postgresql-client": "2.5.5"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM node:20
|
||||
FROM node:18
|
||||
WORKDIR /source
|
||||
|
||||
COPY . .
|
||||
|
||||
@@ -5,16 +5,16 @@
|
||||
"packages": {
|
||||
"": {
|
||||
"dependencies": {
|
||||
"@neondatabase/serverless": "0.4.18",
|
||||
"@neondatabase/serverless": "0.4.3",
|
||||
"ws": "8.13.0"
|
||||
}
|
||||
},
|
||||
"node_modules/@neondatabase/serverless": {
|
||||
"version": "0.4.18",
|
||||
"resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.4.18.tgz",
|
||||
"integrity": "sha512-2TZnIyRGC/+0fjZ8TKCzaSTPUD94PM7NBGuantGZbUrbWyqBwGnUoRtdZAQ95qBKVHqORLVfymlv2NE+HQMFeA==",
|
||||
"version": "0.4.3",
|
||||
"resolved": "https://registry.npmjs.org/@neondatabase/serverless/-/serverless-0.4.3.tgz",
|
||||
"integrity": "sha512-U8tpuF5f0R5WRsciR7iaJ5S2h54DWa6Z6CEW+J4KgwyvRN3q3qDz0MibdfFXU0WqnRoi/9RSf/2XN4TfeaOCbQ==",
|
||||
"dependencies": {
|
||||
"@types/pg": "8.6.6"
|
||||
"@types/pg": "^8.6.6"
|
||||
}
|
||||
},
|
||||
"node_modules/@types/node": {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"@neondatabase/serverless": "0.4.18",
|
||||
"@neondatabase/serverless": "0.4.3",
|
||||
"ws": "8.13.0"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
import json
|
||||
import os
|
||||
from contextlib import closing
|
||||
from io import BytesIO
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
)
|
||||
|
||||
|
||||
def test_file_download(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Tests we can download a file
|
||||
First we set up the mock s3 bucket by uploading test_ext.control to the bucket
|
||||
Then, we download test_ext.control from the bucket to pg_install/v15/share/postgresql/extension/
|
||||
Finally, we list available extensions and assert that test_ext is present
|
||||
"""
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_file_download",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
assert env.ext_remote_storage is not None
|
||||
assert env.remote_storage_client is not None
|
||||
|
||||
TEST_EXT_PATH = "v14/share/postgresql/extension/test_ext.control"
|
||||
BUCKET_PREFIX = "5314225671" # this is the build number
|
||||
|
||||
# 4. Upload test_ext.control file to the bucket
|
||||
# In the non-mock version this is done by CI/CD
|
||||
|
||||
test_ext_file = BytesIO(
|
||||
b"""# mock extension
|
||||
comment = 'This is a mock extension'
|
||||
default_version = '1.0'
|
||||
module_pathname = '$libdir/test_ext'
|
||||
relocatable = true
|
||||
"""
|
||||
)
|
||||
env.remote_storage_client.upload_fileobj(
|
||||
test_ext_file,
|
||||
env.ext_remote_storage.bucket_name,
|
||||
os.path.join(BUCKET_PREFIX, TEST_EXT_PATH),
|
||||
)
|
||||
|
||||
# 5. Download file from the bucket to correct local location
|
||||
# Later this will be replaced by our rust code
|
||||
# resp = env.remote_storage_client.get_object(
|
||||
# Bucket=env.ext_remote_storage.bucket_name, Key=os.path.join(BUCKET_PREFIX, TEST_EXT_PATH)
|
||||
# )
|
||||
# response = resp["Body"]
|
||||
# fname = f"pg_install/{TEST_EXT_PATH}"
|
||||
# with open(fname, "wb") as f:
|
||||
# f.write(response.read())
|
||||
|
||||
tenant, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_file_download", tenant_id=tenant)
|
||||
|
||||
remote_ext_config = json.dumps(
|
||||
{
|
||||
"bucket": env.ext_remote_storage.bucket_name,
|
||||
"region": "us-east-1",
|
||||
"endpoint": env.ext_remote_storage.endpoint,
|
||||
"prefix": BUCKET_PREFIX,
|
||||
}
|
||||
)
|
||||
|
||||
# 6. Start endpoint and ensure that test_ext is present in select * from pg_available_extensions
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_file_download", tenant_id=tenant, remote_ext_config=remote_ext_config
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# test query: insert some values and select them
|
||||
cur.execute("CREATE TABLE t(key int primary key, value text)")
|
||||
for i in range(100):
|
||||
cur.execute(f"insert into t values({i}, {2*i})")
|
||||
cur.execute("select * from t")
|
||||
log.info(cur.fetchall())
|
||||
|
||||
# the real test query: check that test_ext is present
|
||||
cur.execute("SELECT * FROM pg_available_extensions")
|
||||
all_extensions = [x[0] for x in cur.fetchall()]
|
||||
log.info(all_extensions)
|
||||
assert "test_ext" in all_extensions
|
||||
@@ -275,7 +275,6 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str]
|
||||
assert isinstance(neon_env_builder.remote_storage, S3Storage)
|
||||
|
||||
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
|
||||
assert neon_env_builder.remote_storage_client is not None
|
||||
response = neon_env_builder.remote_storage_client.list_objects_v2(
|
||||
Bucket=neon_env_builder.remote_storage.bucket_name,
|
||||
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
|
||||
@@ -629,7 +628,7 @@ def test_timeline_delete_works_for_remote_smoke(
|
||||
)
|
||||
|
||||
# for some reason the check above doesnt immediately take effect for the below.
|
||||
# Assume it is mock server inconsistency and check twice.
|
||||
# Assume it is mock server incosistency and check twice.
|
||||
wait_until(
|
||||
2,
|
||||
0.5,
|
||||
|
||||
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 5adfb36043...a2daebc6b4
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: ff7b85cd8a...2df2ce3744