mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-05 09:20:38 +00:00
Compare commits
63 Commits
skip-sync
...
extension_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a4ddad92c1 | ||
|
|
fd3dfe9d52 | ||
|
|
384e3ab1a8 | ||
|
|
4259464f72 | ||
|
|
152206211b | ||
|
|
9c35c06c58 | ||
|
|
245b4c9d72 | ||
|
|
3d0f74fc0c | ||
|
|
ce55f70cac | ||
|
|
053d592ddb | ||
|
|
b85416b58d | ||
|
|
195838436c | ||
|
|
9313045de6 | ||
|
|
44ac7a45be | ||
|
|
e35e8a7dcb | ||
|
|
a79b0d69c4 | ||
|
|
d475e901e5 | ||
|
|
bf3b83b504 | ||
|
|
94781e8710 | ||
|
|
4b83a206bf | ||
|
|
f984f9e7d3 | ||
|
|
6b42464c23 | ||
|
|
605c30e5c5 | ||
|
|
0b11d8e836 | ||
|
|
7602483af9 | ||
|
|
5e1e859ab8 | ||
|
|
85a7511700 | ||
|
|
89b8ea132e | ||
|
|
bfbae98f24 | ||
|
|
02a1d4d8c1 | ||
|
|
4a35f29301 | ||
|
|
559e318328 | ||
|
|
a4d236b02f | ||
|
|
8b9f72e117 | ||
|
|
bb414e5a0a | ||
|
|
32c03bc784 | ||
|
|
c99e203094 | ||
|
|
e7b9259675 | ||
|
|
356f7d3a7e | ||
|
|
0f6b05337e | ||
|
|
2e81d280c8 | ||
|
|
f9700c8bb9 | ||
|
|
e6137d45d2 | ||
|
|
ab1d903600 | ||
|
|
bfd670b9a7 | ||
|
|
5e96ab43ea | ||
|
|
890061d371 | ||
|
|
6b74d1a76a | ||
|
|
a936b8a92b | ||
|
|
c7bea52849 | ||
|
|
1b7ab6d468 | ||
|
|
e07d5d00e9 | ||
|
|
15d3d007eb | ||
|
|
77157c7741 | ||
|
|
b9b1b3596c | ||
|
|
91a809332f | ||
|
|
214ecacfc4 | ||
|
|
7465c644b9 | ||
|
|
bb931f2ce0 | ||
|
|
8013f9630d | ||
|
|
34f22e9b12 | ||
|
|
4f3f817384 | ||
|
|
b50475b567 |
9
.github/workflows/benchmarking.yml
vendored
9
.github/workflows/benchmarking.yml
vendored
@@ -180,8 +180,7 @@ jobs:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
|
||||
# Increase timeout to 8h, default timeout is 6h
|
||||
timeout-minutes: 480
|
||||
timeout-minutes: 360 # 6h
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
@@ -322,6 +321,8 @@ jobs:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
|
||||
timeout-minutes: 360 # 6h
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
@@ -413,6 +414,8 @@ jobs:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
|
||||
timeout-minutes: 360 # 6h
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
@@ -498,6 +501,8 @@ jobs:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
|
||||
timeout-minutes: 360 # 6h
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
|
||||
6
.github/workflows/build_and_test.yml
vendored
6
.github/workflows/build_and_test.yml
vendored
@@ -738,7 +738,7 @@ jobs:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.11.0
|
||||
VM_BUILDER_VERSION: v0.8.0
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -916,7 +916,7 @@ jobs:
|
||||
exit 1
|
||||
fi
|
||||
|
||||
- name: Create git tag
|
||||
- name: Create tag "release-${{ needs.tag.outputs.build-tag }}"
|
||||
if: github.ref_name == 'release'
|
||||
uses: actions/github-script@v6
|
||||
with:
|
||||
@@ -926,7 +926,7 @@ jobs:
|
||||
github.rest.git.createRef({
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
ref: "refs/tags/${{ needs.tag.outputs.build-tag }}",
|
||||
ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}",
|
||||
sha: context.sha,
|
||||
})
|
||||
|
||||
|
||||
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -924,12 +924,14 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"postgres",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tar",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
@@ -997,6 +999,7 @@ dependencies = [
|
||||
"tar",
|
||||
"thiserror",
|
||||
"toml",
|
||||
"tracing",
|
||||
"url",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
|
||||
@@ -481,40 +481,6 @@ RUN wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_1.tar.
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/rdkit.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-uuidv7-pg-build"
|
||||
# compile pg_uuidv7 extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-uuidv7-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz -O pg_uuidv7.tar.gz && \
|
||||
echo "0d0759ab01b7fb23851ecffb0bce27822e1868a4a5819bfd276101c716637a7a pg_uuidv7.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_uuidv7-src && cd pg_uuidv7-src && tar xvzf ../pg_uuidv7.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_uuidv7.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-roaringbitmap-pg-build"
|
||||
# compile pg_roaringbitmap extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-roaringbitmap-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ENV PATH "/usr/local/pgsql/bin/:$PATH"
|
||||
RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
|
||||
echo "b75201efcb1c2d1b014ec4ae6a22769cc7a224e6e406a587f5784a37b6b5a2aa pg_roaringbitmap.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_roaringbitmap-src && cd pg_roaringbitmap-src && tar xvzf ../pg_roaringbitmap.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/roaringbitmap.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions"
|
||||
@@ -648,8 +614,6 @@ COPY --from=kq-imcx-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
@@ -30,3 +30,5 @@ url.workspace = true
|
||||
compute_api.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
toml_edit.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||
|
||||
@@ -27,7 +27,8 @@
|
||||
//! compute_ctl -D /var/db/postgres/compute \
|
||||
//! -C 'postgresql://cloud_admin@localhost/postgres' \
|
||||
//! -S /var/db/postgres/specs/current.json \
|
||||
//! -b /usr/local/bin/postgres
|
||||
//! -b /usr/local/bin/postgres \
|
||||
//! -r {"bucket": "my-bucket", "region": "eu-central-1"}
|
||||
//! ```
|
||||
//!
|
||||
use std::collections::HashMap;
|
||||
@@ -48,12 +49,16 @@ use compute_api::responses::ComputeStatus;
|
||||
|
||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::{
|
||||
download_extension, get_availiable_extensions, init_remote_storage,
|
||||
};
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
use compute_tools::spec::*;
|
||||
|
||||
use tokio::runtime::Runtime;
|
||||
const BUILD_TAG_DEFAULT: &str = "local";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
@@ -64,6 +69,23 @@ fn main() -> Result<()> {
|
||||
info!("build_tag: {build_tag}");
|
||||
|
||||
let matches = cli().get_matches();
|
||||
let pgbin_default = String::from("postgres");
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
|
||||
|
||||
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
|
||||
let ext_remote_storage = match remote_ext_config {
|
||||
Some(x) => Some(init_remote_storage(x)?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let rt = Runtime::new().unwrap();
|
||||
let copy_remote_storage = ext_remote_storage.clone();
|
||||
|
||||
// rt.block_on(async move {
|
||||
// download_extension(©_remote_storage, ExtensionType::Shared, pgbin)
|
||||
// .await
|
||||
// .expect("download extension should work");
|
||||
// });
|
||||
|
||||
let http_port = *matches
|
||||
.get_one::<u16>("http-port")
|
||||
@@ -128,9 +150,6 @@ fn main() -> Result<()> {
|
||||
let compute_id = matches.get_one::<String>("compute-id");
|
||||
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
|
||||
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
let spec;
|
||||
let mut live_config_allowed = false;
|
||||
match spec_json {
|
||||
@@ -182,6 +201,9 @@ fn main() -> Result<()> {
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_remote_storage,
|
||||
availiable_extensions: Vec::new(),
|
||||
availiable_libraries: Vec::new(),
|
||||
};
|
||||
let compute = Arc::new(compute_node);
|
||||
|
||||
@@ -190,6 +212,21 @@ fn main() -> Result<()> {
|
||||
let _http_handle =
|
||||
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
|
||||
|
||||
let extension_server_port: u16 = http_port;
|
||||
|
||||
// exen before we have spec, we can get public availiable extensions
|
||||
// TODO turn get_availiable_extensions() & other functions into ComputeNode method,
|
||||
// we pass to many params from it anyways..
|
||||
|
||||
compute_node.availiable_extensions = get_availiable_extensions(
|
||||
ext_remote_storage,
|
||||
pg_version, //TODO
|
||||
pgbin,
|
||||
None,
|
||||
);
|
||||
|
||||
// TODO same for libraries
|
||||
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
info!("no compute spec provided, waiting");
|
||||
@@ -227,10 +264,21 @@ fn main() -> Result<()> {
|
||||
let _configurator_handle =
|
||||
launch_configurator(&compute).expect("cannot launch configurator thread");
|
||||
|
||||
// download private tenant extensions before postgres start
|
||||
// TODO
|
||||
// compute_node.availiable_extensions = get_availiable_extensions(ext_remote_storage,
|
||||
// pg_version, //TODO
|
||||
// pgbin,
|
||||
// tenant_id); //TODO get tenant_id from spec
|
||||
|
||||
// download preload shared libraries before postgres start (if any)
|
||||
// TODO
|
||||
// download_library_file();
|
||||
|
||||
// Start Postgres
|
||||
let mut delay_exit = false;
|
||||
let mut exit_code = None;
|
||||
let pg = match compute.start_compute() {
|
||||
let pg = match compute.start_compute(extension_server_port) {
|
||||
Ok(pg) => Some(pg),
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:?}", err);
|
||||
@@ -349,6 +397,12 @@ fn cli() -> clap::Command {
|
||||
.long("control-plane-uri")
|
||||
.value_name("CONTROL_PLANE_API_BASE_URI"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("remote-ext-config")
|
||||
.short('r')
|
||||
.long("remote-ext-config")
|
||||
.value_name("REMOTE_EXT_CONFIG"),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -16,6 +16,8 @@ use utils::lsn::Lsn;
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
|
||||
use crate::config;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::spec::*;
|
||||
@@ -45,6 +47,10 @@ pub struct ComputeNode {
|
||||
pub state: Mutex<ComputeState>,
|
||||
/// `Condvar` to allow notifying waiters about state changes.
|
||||
pub state_changed: Condvar,
|
||||
/// S3 extensions configuration variables
|
||||
pub ext_remote_storage: Option<GenericRemoteStorage>,
|
||||
pub availiable_extensions: Vec<RemotePath>,
|
||||
pub availiable_libraries: Vec<RemotePath>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -133,84 +139,6 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
|
||||
/// that we give to customers
|
||||
fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
let roles = spec
|
||||
.cluster
|
||||
.roles
|
||||
.iter()
|
||||
.map(|r| format!("'{}'", escape_literal(&r.name)))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let dbs = spec
|
||||
.cluster
|
||||
.databases
|
||||
.iter()
|
||||
.map(|db| format!("'{}'", escape_literal(&db.name)))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let roles_decl = if roles.is_empty() {
|
||||
String::from("roles text[] := NULL;")
|
||||
} else {
|
||||
format!(
|
||||
r#"
|
||||
roles text[] := ARRAY(SELECT rolname
|
||||
FROM pg_catalog.pg_roles
|
||||
WHERE rolname IN ({}));"#,
|
||||
roles.join(", ")
|
||||
)
|
||||
};
|
||||
|
||||
let database_decl = if dbs.is_empty() {
|
||||
String::from("dbs text[] := NULL;")
|
||||
} else {
|
||||
format!(
|
||||
r#"
|
||||
dbs text[] := ARRAY(SELECT datname
|
||||
FROM pg_catalog.pg_database
|
||||
WHERE datname IN ({}));"#,
|
||||
dbs.join(", ")
|
||||
)
|
||||
};
|
||||
|
||||
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
|
||||
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
|
||||
let query = format!(
|
||||
r#"
|
||||
DO $$
|
||||
DECLARE
|
||||
r text;
|
||||
{}
|
||||
{}
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
|
||||
THEN
|
||||
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data;
|
||||
IF array_length(roles, 1) IS NOT NULL THEN
|
||||
EXECUTE format('GRANT neon_superuser TO %s',
|
||||
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
|
||||
FOREACH r IN ARRAY roles LOOP
|
||||
EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
|
||||
END LOOP;
|
||||
END IF;
|
||||
IF array_length(dbs, 1) IS NOT NULL THEN
|
||||
EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
|
||||
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
|
||||
END IF;
|
||||
END IF;
|
||||
END
|
||||
$$;"#,
|
||||
roles_decl, database_decl,
|
||||
);
|
||||
info!("Neon superuser created:\n{}", &query);
|
||||
client
|
||||
.simple_query(&query)
|
||||
.map_err(|e| anyhow::anyhow!(e).context(query))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl ComputeNode {
|
||||
pub fn set_status(&self, status: ComputeStatus) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
@@ -323,14 +251,22 @@ impl ComputeNode {
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip(self, compute_state))]
|
||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
pub fn prepare_pgdata(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
extension_server_port: u16,
|
||||
) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
// Remove/create an empty pgdata directory and put configuration there.
|
||||
self.create_pgdata()?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
|
||||
config::write_postgres_conf(
|
||||
&pgdata_path.join("postgresql.conf"),
|
||||
&pspec.spec,
|
||||
Some(extension_server_port),
|
||||
)?;
|
||||
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
// is already connected it will be kicked out, so a secondary (standby)
|
||||
@@ -338,13 +274,9 @@ impl ComputeNode {
|
||||
let lsn = match spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
info!("starting safekeepers syncing");
|
||||
let lsn = if let Some(synced_lsn) = spec.skip_sync_safekeepers {
|
||||
info!("no need to sync");
|
||||
synced_lsn
|
||||
} else {
|
||||
self.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
.with_context(|| "failed to sync safekeepers")?
|
||||
};
|
||||
let lsn = self
|
||||
.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
.with_context(|| "failed to sync safekeepers")?;
|
||||
info!("safekeepers synced at LSN {}", lsn);
|
||||
lsn
|
||||
}
|
||||
@@ -429,8 +361,6 @@ impl ComputeNode {
|
||||
.map_err(|_| anyhow::anyhow!("invalid connstr"))?;
|
||||
|
||||
let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
|
||||
// Disable forwarding so that users don't get a cloud_admin role
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
|
||||
client.simple_query("GRANT zenith_admin TO cloud_admin")?;
|
||||
drop(client);
|
||||
@@ -441,16 +371,14 @@ impl ComputeNode {
|
||||
Ok(client) => client,
|
||||
};
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
// Disable DDL forwarding because control plane already knows about these roles/databases.
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
|
||||
create_neon_superuser(spec, &mut client)?;
|
||||
handle_roles(spec, &mut client)?;
|
||||
handle_databases(spec, &mut client)?;
|
||||
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(spec, self.connstr.as_str())?;
|
||||
handle_grants(spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_extensions(spec, &mut client)?;
|
||||
|
||||
// 'Close' connection
|
||||
@@ -476,7 +404,7 @@ impl ComputeNode {
|
||||
|
||||
// Write new config
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec, None)?;
|
||||
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
self.pg_reload_conf(&mut client)?;
|
||||
@@ -488,7 +416,7 @@ impl ComputeNode {
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(&spec, self.connstr.as_str())?;
|
||||
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
}
|
||||
|
||||
@@ -506,7 +434,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn start_compute(&self) -> Result<std::process::Child> {
|
||||
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
info!(
|
||||
@@ -517,7 +445,7 @@ impl ComputeNode {
|
||||
pspec.timeline_id,
|
||||
);
|
||||
|
||||
self.prepare_pgdata(&compute_state)?;
|
||||
self.prepare_pgdata(&compute_state, extension_server_port)?;
|
||||
|
||||
let start_time = Utc::now();
|
||||
|
||||
|
||||
@@ -33,7 +33,11 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
|
||||
}
|
||||
|
||||
/// Create or completely rewrite configuration file specified by `path`
|
||||
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
||||
pub fn write_postgres_conf(
|
||||
path: &Path,
|
||||
spec: &ComputeSpec,
|
||||
extension_server_port: Option<u16>,
|
||||
) -> Result<()> {
|
||||
// File::create() destroys the file content if it exists.
|
||||
let mut file = File::create(path)?;
|
||||
|
||||
@@ -95,5 +99,9 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||
}
|
||||
|
||||
if let Some(port) = extension_server_port {
|
||||
writeln!(file, "neon.extension_server_port={}", port)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
182
compute_tools/src/extension_server.rs
Normal file
182
compute_tools/src/extension_server.rs
Normal file
@@ -0,0 +1,182 @@
|
||||
use anyhow::{self, bail, Result};
|
||||
use remote_storage::*;
|
||||
use serde_json::{self, Value};
|
||||
use std::fs::File;
|
||||
use std::io::{BufWriter, Write};
|
||||
use std::num::{NonZeroU32, NonZeroUsize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::info;
|
||||
use utils::id::TenantId;
|
||||
|
||||
fn get_pg_config(argument: &str, pgbin: &str) -> String {
|
||||
// gives the result of `pg_config [argument]`
|
||||
// where argument is a flag like `--version` or `--sharedir`
|
||||
let pgconfig = pgbin.replace("postgres", "pg_config");
|
||||
let config_output = std::process::Command::new(pgconfig)
|
||||
.arg(argument)
|
||||
.output()
|
||||
.expect("pg_config error");
|
||||
std::str::from_utf8(&config_output.stdout)
|
||||
.expect("pg_config error")
|
||||
.trim()
|
||||
.to_string()
|
||||
}
|
||||
|
||||
fn get_pg_version(pgbin: &str) -> String {
|
||||
// pg_config --version returns a (platform specific) human readable string
|
||||
// such as "PostgreSQL 15.4". We parse this to v14/v15
|
||||
let human_version = get_pg_config("--version", pgbin);
|
||||
if human_version.contains("v15") {
|
||||
return "v15".to_string();
|
||||
}
|
||||
"v14".to_string()
|
||||
}
|
||||
|
||||
async fn download_helper(
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
remote_from_path: &RemotePath,
|
||||
download_location: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
// downloads file at remote_from_path to download_location/[file_name]
|
||||
let local_path = download_location.join(remote_from_path.object_name().expect("bad object"));
|
||||
info!(
|
||||
"Downloading {:?} to location {:?}",
|
||||
&remote_from_path, &local_path
|
||||
);
|
||||
let mut download = remote_storage.download(remote_from_path).await?;
|
||||
let mut write_data_buffer = Vec::new();
|
||||
download
|
||||
.download_stream
|
||||
.read_to_end(&mut write_data_buffer)
|
||||
.await?;
|
||||
let mut output_file = BufWriter::new(File::create(local_path)?);
|
||||
output_file.write_all(&write_data_buffer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// download extension control files
|
||||
//
|
||||
// return list of all extension files to use it in the future searches
|
||||
//
|
||||
// if tenant_id is provided - search in a private per-tenant extension path,
|
||||
// otherwise - in public extension path
|
||||
//
|
||||
pub async fn get_availiable_extensions(
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pg_version: &str,
|
||||
pgbin: &str,
|
||||
tenant_id: Option<TenantId>,
|
||||
) -> anyhow::Result<Vec<RemotePath>> {
|
||||
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||
|
||||
let remote_sharedir = match tenant_id {
|
||||
None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?,
|
||||
Some(tenant_id) => RemotePath::new(
|
||||
&Path::new(&pg_version)
|
||||
.join(&tenant_id.to_string())
|
||||
.join("share/postgresql/extension"),
|
||||
)?,
|
||||
};
|
||||
|
||||
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
|
||||
|
||||
// download all found control files
|
||||
for remote_from_path in &from_paths {
|
||||
if remote_from_path.extension() == Some("control") {
|
||||
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(from_paths)
|
||||
}
|
||||
|
||||
// download all sql files for a given extension name
|
||||
//
|
||||
pub async fn download_extension_sql_files(
|
||||
ext_name: &str,
|
||||
availiable_extensions: Vec<RemotePath>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
) -> Result<()> {
|
||||
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
|
||||
|
||||
// check if extension files exist
|
||||
let files_to_download: Vec<&RemotePath> = availiable_extensions
|
||||
.iter()
|
||||
.filter(|ext| {
|
||||
ext.extension() == Some("sql") && ext.object_name().unwrap().starts_with(ext_name)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if files_to_download.is_empty() {
|
||||
bail!("Files for extension {ext_name} are not found in the extension store");
|
||||
}
|
||||
|
||||
for remote_from_path in files_to_download {
|
||||
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// download shared library file
|
||||
pub async fn download_library_file(
|
||||
lib_name: &str,
|
||||
availiable_libraries: Vec<RemotePath>,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
) -> Result<()> {
|
||||
let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into();
|
||||
|
||||
// check if the library file exists
|
||||
let lib = availiable_libraries
|
||||
.iter()
|
||||
.find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name);
|
||||
|
||||
match lib {
|
||||
None => bail!("Shared library file {lib_name} is not found in the extension store"),
|
||||
Some(lib) => {
|
||||
download_helper(remote_storage, &lib, &local_libdir).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
|
||||
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
|
||||
let remote_ext_bucket = match &remote_ext_config["bucket"] {
|
||||
Value::String(x) => x,
|
||||
_ => bail!("remote_ext_config missing bucket"),
|
||||
};
|
||||
let remote_ext_region = match &remote_ext_config["region"] {
|
||||
Value::String(x) => x,
|
||||
_ => bail!("remote_ext_config missing region"),
|
||||
};
|
||||
let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
|
||||
Value::String(x) => Some(x.clone()),
|
||||
_ => None,
|
||||
};
|
||||
let remote_ext_prefix = match &remote_ext_config["prefix"] {
|
||||
Value::String(x) => Some(x.clone()),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// load will not be large, so default parameters are fine
|
||||
let config = S3Config {
|
||||
bucket_name: remote_ext_bucket.to_string(),
|
||||
bucket_region: remote_ext_region.to_string(),
|
||||
prefix_in_bucket: remote_ext_prefix,
|
||||
endpoint: remote_ext_endpoint,
|
||||
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_keys_per_list_response: None,
|
||||
};
|
||||
let config = RemoteStorageConfig {
|
||||
max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
|
||||
storage: RemoteStorageKind::AwsS3(config),
|
||||
};
|
||||
GenericRemoteStorage::from_config(&config)
|
||||
}
|
||||
@@ -16,6 +16,8 @@ use tokio::task;
|
||||
use tracing::{error, info};
|
||||
use tracing_utils::http::OtelName;
|
||||
|
||||
use crate::extension_server::{download_extension_sql_files, download_library_file};
|
||||
|
||||
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
|
||||
ComputeStatusResponse {
|
||||
start_time: state.start_time,
|
||||
@@ -121,8 +123,68 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from S3 on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
|
||||
let is_library = false;
|
||||
|
||||
let filename = route.split('/').last().unwrap();
|
||||
|
||||
info!(
|
||||
"serving /extension_server POST request, filename: {:?}",
|
||||
filename
|
||||
);
|
||||
|
||||
if compute.ext_remote_storage.is_none() {
|
||||
error!("Remote extension storage is not set up");
|
||||
let mut resp = Response::new(Body::from("Remote extension storage is not set up"));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return resp;
|
||||
}
|
||||
let ext_storage = &compute.ext_remote_storage.unwrap();
|
||||
|
||||
if !is_library {
|
||||
match download_extension_sql_files(
|
||||
filename,
|
||||
&compute.availiable_extensions,
|
||||
&ext_storage,
|
||||
&compute.pgbin,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Response::new(Body::from("OK")),
|
||||
Err(e) => {
|
||||
error!("extension download failed: {}", e);
|
||||
let mut resp = Response::new(Body::from(e.to_string()));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
resp
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match download_library_file(
|
||||
filename,
|
||||
&compute.availiable_libraries,
|
||||
&ext_storage,
|
||||
&compute.pgbin,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Response::new(Body::from("OK")),
|
||||
Err(e) => {
|
||||
error!("library download failed: {}", e);
|
||||
let mut resp = Response::new(Body::from(e.to_string()));
|
||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
resp
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return the `404 Not Found` for any other routes.
|
||||
_ => {
|
||||
method => {
|
||||
info!("404 Not Found for {:?}", method);
|
||||
|
||||
let mut not_found = Response::new(Body::from("404 Not Found"));
|
||||
*not_found.status_mut() = StatusCode::NOT_FOUND;
|
||||
not_found
|
||||
|
||||
@@ -139,6 +139,34 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
/extension_server:
|
||||
post:
|
||||
tags:
|
||||
- Extension
|
||||
summary: Download extension from S3 to local folder.
|
||||
description: ""
|
||||
operationId: downloadExtension
|
||||
responses:
|
||||
200:
|
||||
description: Extension downloaded
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
description: Error text or 'OK' if download succeeded.
|
||||
example: "OK"
|
||||
400:
|
||||
description: Request is invalid.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
500:
|
||||
description: Extension download request failed.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
|
||||
@@ -9,6 +9,7 @@ pub mod http;
|
||||
#[macro_use]
|
||||
pub mod logger;
|
||||
pub mod compute;
|
||||
pub mod extension_server;
|
||||
pub mod monitor;
|
||||
pub mod params;
|
||||
pub mod pg_helpers;
|
||||
|
||||
@@ -17,7 +17,7 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
|
||||
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
|
||||
|
||||
/// Escape a string for including it in a SQL literal
|
||||
pub fn escape_literal(s: &str) -> String {
|
||||
fn escape_literal(s: &str) -> String {
|
||||
s.replace('\'', "''").replace('\\', "\\\\")
|
||||
}
|
||||
|
||||
|
||||
@@ -124,7 +124,7 @@ pub fn get_spec_from_control_plane(
|
||||
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
|
||||
// File `postgresql.conf` is no longer included into `basebackup`, so just
|
||||
// always write all config into it creating new file.
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec, None)?;
|
||||
|
||||
update_pg_hba(pgdata_path)?;
|
||||
|
||||
@@ -269,13 +269,17 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
}
|
||||
RoleAction::Create => {
|
||||
let mut query: String = format!(
|
||||
"CREATE ROLE {} CREATEROLE CREATEDB IN ROLE neon_superuser",
|
||||
name.pg_quote()
|
||||
);
|
||||
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
|
||||
info!("role create query: '{}'", &query);
|
||||
query.push_str(&role.to_pg_options());
|
||||
xact.execute(query.as_str(), &[])?;
|
||||
|
||||
let grant_query = format!(
|
||||
"GRANT pg_read_all_data, pg_write_all_data TO {}",
|
||||
name.pg_quote()
|
||||
);
|
||||
xact.execute(grant_query.as_str(), &[])?;
|
||||
info!("role grant query: '{}'", &grant_query);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -472,11 +476,6 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
query.push_str(&db.to_pg_options());
|
||||
let _guard = info_span!("executing", query).entered();
|
||||
client.execute(query.as_str(), &[])?;
|
||||
let grant_query: String = format!(
|
||||
"GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
|
||||
name.pg_quote()
|
||||
);
|
||||
client.execute(grant_query.as_str(), &[])?;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -496,9 +495,35 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
|
||||
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
|
||||
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> {
|
||||
pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
|
||||
info!("cluster spec grants:");
|
||||
|
||||
// We now have a separate `web_access` role to connect to the database
|
||||
// via the web interface and proxy link auth. And also we grant a
|
||||
// read / write all data privilege to every role. So also grant
|
||||
// create to everyone.
|
||||
// XXX: later we should stop messing with Postgres ACL in such horrible
|
||||
// ways.
|
||||
let roles = spec
|
||||
.cluster
|
||||
.roles
|
||||
.iter()
|
||||
.map(|r| r.name.pg_quote())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for db in &spec.cluster.databases {
|
||||
let dbname = &db.name;
|
||||
|
||||
let query: String = format!(
|
||||
"GRANT CREATE ON DATABASE {} TO {}",
|
||||
dbname.pg_quote(),
|
||||
roles.join(", ")
|
||||
);
|
||||
info!("grant query {}", &query);
|
||||
|
||||
client.execute(query.as_str(), &[])?;
|
||||
}
|
||||
|
||||
// Do some per-database access adjustments. We'd better do this at db creation time,
|
||||
// but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
|
||||
// atomically.
|
||||
|
||||
@@ -32,3 +32,4 @@ utils.workspace = true
|
||||
|
||||
compute_api.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -657,6 +657,8 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
.get_one::<String>("endpoint_id")
|
||||
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
|
||||
|
||||
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
|
||||
|
||||
// If --safekeepers argument is given, use only the listed safekeeper nodes.
|
||||
let safekeepers =
|
||||
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
|
||||
@@ -698,7 +700,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
_ => {}
|
||||
}
|
||||
println!("Starting existing endpoint {endpoint_id}...");
|
||||
endpoint.start(&auth_token, safekeepers)?;
|
||||
endpoint.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
} else {
|
||||
let branch_name = sub_args
|
||||
.get_one::<String>("branch-name")
|
||||
@@ -742,7 +744,7 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
||||
pg_version,
|
||||
mode,
|
||||
)?;
|
||||
ep.start(&auth_token, safekeepers)?;
|
||||
ep.start(&auth_token, safekeepers, remote_ext_config)?;
|
||||
}
|
||||
}
|
||||
"stop" => {
|
||||
@@ -1002,6 +1004,12 @@ fn cli() -> Command {
|
||||
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
|
||||
.required(false);
|
||||
|
||||
let remote_ext_config_args = Arg::new("remote-ext-config")
|
||||
.long("remote-ext-config")
|
||||
.num_args(1)
|
||||
.help("Configure the S3 bucket that we search for extensions in.")
|
||||
.required(false);
|
||||
|
||||
let lsn_arg = Arg::new("lsn")
|
||||
.long("lsn")
|
||||
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
||||
@@ -1152,6 +1160,7 @@ fn cli() -> Command {
|
||||
.arg(pg_version_arg)
|
||||
.arg(hot_standby_arg)
|
||||
.arg(safekeepers_arg)
|
||||
.arg(remote_ext_config_args)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("stop")
|
||||
|
||||
@@ -68,7 +68,6 @@ pub struct EndpointConf {
|
||||
http_port: u16,
|
||||
pg_version: u32,
|
||||
skip_pg_catalog_updates: bool,
|
||||
skip_sync_safekeepers: Option<utils::lsn::Lsn>,
|
||||
}
|
||||
|
||||
//
|
||||
@@ -138,7 +137,6 @@ impl ComputeControlPlane {
|
||||
tenant_id,
|
||||
pg_version,
|
||||
skip_pg_catalog_updates: false,
|
||||
skip_sync_safekeepers: None,
|
||||
});
|
||||
|
||||
ep.create_endpoint_dir()?;
|
||||
@@ -153,7 +151,6 @@ impl ComputeControlPlane {
|
||||
pg_port,
|
||||
pg_version,
|
||||
skip_pg_catalog_updates: false,
|
||||
skip_sync_safekeepers: None,
|
||||
})?,
|
||||
)?;
|
||||
std::fs::write(
|
||||
@@ -192,7 +189,6 @@ pub struct Endpoint {
|
||||
|
||||
// Optimizations
|
||||
skip_pg_catalog_updates: bool,
|
||||
skip_sync_safekeepers: Option<utils::lsn::Lsn>,
|
||||
}
|
||||
|
||||
impl Endpoint {
|
||||
@@ -227,7 +223,6 @@ impl Endpoint {
|
||||
tenant_id: conf.tenant_id,
|
||||
pg_version: conf.pg_version,
|
||||
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
|
||||
skip_sync_safekeepers: conf.skip_sync_safekeepers,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -413,7 +408,12 @@ impl Endpoint {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn start(&self, auth_token: &Option<String>, safekeepers: Vec<NodeId>) -> Result<()> {
|
||||
pub fn start(
|
||||
&self,
|
||||
auth_token: &Option<String>,
|
||||
safekeepers: Vec<NodeId>,
|
||||
remote_ext_config: Option<&String>,
|
||||
) -> Result<()> {
|
||||
if self.status() == "running" {
|
||||
anyhow::bail!("The endpoint is already running");
|
||||
}
|
||||
@@ -462,7 +462,6 @@ impl Endpoint {
|
||||
|
||||
// Create spec file
|
||||
let spec = ComputeSpec {
|
||||
skip_sync_safekeepers: self.skip_sync_safekeepers,
|
||||
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
|
||||
format_version: 1.0,
|
||||
operation_uuid: None,
|
||||
@@ -513,6 +512,9 @@ impl Endpoint {
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
if let Some(remote_ext_config) = remote_ext_config {
|
||||
cmd.args(["--remote-ext-config", remote_ext_config]);
|
||||
}
|
||||
let _child = cmd.spawn()?;
|
||||
|
||||
// Wait for it to start
|
||||
|
||||
301
docs/rfcs/024-extension-loading.md
Normal file
301
docs/rfcs/024-extension-loading.md
Normal file
@@ -0,0 +1,301 @@
|
||||
# Supporting custom user Extensions
|
||||
|
||||
Created 2023-05-03
|
||||
|
||||
## Motivation
|
||||
|
||||
There are many extensions in the PostgreSQL ecosystem, and not all extensions
|
||||
are of a quality that we can confidently support them. Additionally, our
|
||||
current extension inclusion mechanism has several problems because we build all
|
||||
extensions into the primary Compute image: We build the extensions every time
|
||||
we build the compute image regardless of whether we actually need to rebuild
|
||||
the image, and the inclusion of these extensions in the image adds a hard
|
||||
dependency on all supported extensions - thus increasing the image size, and
|
||||
with it the time it takes to download that image - increasing first start
|
||||
latency.
|
||||
|
||||
This RFC proposes a dynamic loading mechanism that solves most of these
|
||||
problems.
|
||||
|
||||
## Summary
|
||||
|
||||
`compute_ctl` is made responsible for loading extensions on-demand into
|
||||
the container's file system for dynamically loaded extensions, and will also
|
||||
make sure that the extensions in `shared_preload_libraries` are downloaded
|
||||
before the compute node starts.
|
||||
|
||||
## Components
|
||||
|
||||
compute_ctl, PostgreSQL, neon (extension), Compute Host Node, Extension Store
|
||||
|
||||
## Requirements
|
||||
|
||||
Compute nodes with no extra extensions should not be negatively impacted by
|
||||
the existence of support for many extensions.
|
||||
|
||||
Installing an extension into PostgreSQL should be easy.
|
||||
|
||||
Non-preloaded extensions shouldn't impact startup latency.
|
||||
|
||||
Uninstalled extensions shouldn't impact query latency.
|
||||
|
||||
A small latency penalty for dynamically loaded extensions is acceptable in
|
||||
the first seconds of compute startup, but not in steady-state operations.
|
||||
|
||||
## Proposed implementation
|
||||
|
||||
### On-demand, JIT-loading of extensions
|
||||
|
||||
TLDR; we download extensions as soon as we need them, or when we have spare
|
||||
time.
|
||||
|
||||
That means, we first download the extensions required to start the PostMaster
|
||||
(`shared_preload_libraries` and their dependencies), then the libraries required
|
||||
before a backend can start processing user input (`preload_libraries` and
|
||||
dependencies), and then (with network limits applied) the remainder of the
|
||||
configured extensions, with prioritization for installed extensions.
|
||||
|
||||
If PostgreSQL tries to load a library that is not yet fully on disk, it will
|
||||
ask `compute_ctl` first if the extension has been downloaded yet, and will wait
|
||||
for `compute_ctl` to finish downloading that extension. `compute_ctl` will
|
||||
prioritize downloading that extension over other extensions that were not yet
|
||||
requested.
|
||||
|
||||
#### Workflow
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant EX as External (control plane, ...)
|
||||
participant CTL as compute_ctl
|
||||
participant ST as extension store
|
||||
actor PG as PostgreSQL
|
||||
|
||||
EX ->>+ CTL: Start compute with config X
|
||||
|
||||
note over CTL: The configuration contains a list of all <br/>extensions available to that compute node, etc.
|
||||
|
||||
par Optionally parallel or concurrent
|
||||
loop Available extensions
|
||||
CTL ->>+ ST: Download control file of extension
|
||||
activate CTL
|
||||
ST ->>- CTL: Finish downloading control file
|
||||
CTL ->>- CTL: Put control file in extensions directory
|
||||
end
|
||||
|
||||
loop For each extension in shared_preload_libraries
|
||||
CTL ->>+ ST: Download extension's data
|
||||
activate CTL
|
||||
ST ->>- CTL: Finish downloading
|
||||
CTL ->>- CTL: Put extension's files in the right place
|
||||
end
|
||||
end
|
||||
|
||||
CTL ->>+ PG: Start PostgreSQL
|
||||
|
||||
note over CTL: PostgreSQL can now start accepting <br/>connections. However, users may still need to wait <br/>for preload_libraries extensions to get downloaded.
|
||||
|
||||
par Load preload_libraries
|
||||
loop For each extension in preload_libraries
|
||||
CTL ->>+ ST: Download extension's data
|
||||
activate CTL
|
||||
ST ->>- CTL: Finish downloading
|
||||
CTL ->>- CTL: Put extension's files in the right place
|
||||
end
|
||||
end
|
||||
|
||||
note over CTL: After this, connections don't have any hard <br/>waits for extension files left, except for those <br/>connections that override preload_libraries <br/>in their startup packet
|
||||
|
||||
par PG's internal_load_library(library)
|
||||
alt Library is not yet loaded
|
||||
PG ->>+ CTL: Load library X
|
||||
CTL ->>+ ST: Download the extension that provides X
|
||||
ST ->>- CTL: Finish downloading
|
||||
CTL ->> CTL: Put extension's files in the right place
|
||||
CTL ->>- PG: Ready
|
||||
else Library is already loaded
|
||||
note over PG: No-op
|
||||
end
|
||||
and Download all remaining extensions
|
||||
loop Extension X
|
||||
CTL ->>+ ST: Download not-yet-downloaded extension X
|
||||
activate CTL
|
||||
ST ->>- CTL: Finish downloading
|
||||
CTL ->>- CTL: Put extension's files in the right place
|
||||
end
|
||||
end
|
||||
|
||||
deactivate PG
|
||||
deactivate CTL
|
||||
```
|
||||
|
||||
#### Summary
|
||||
|
||||
Pros:
|
||||
- Startup is only as slow as it takes to load all (shared_)preload_libraries
|
||||
- Supports BYO Extension
|
||||
|
||||
Cons:
|
||||
- O(sizeof(extensions)) IO requirement for loading all extensions.
|
||||
|
||||
### Alternative solutions
|
||||
|
||||
1. Allow users to add their extensions to the base image
|
||||
|
||||
Pros:
|
||||
- Easy to deploy
|
||||
|
||||
Cons:
|
||||
- Doesn't scale - first start size is dependent on image size;
|
||||
- All extensions are shared across all users: It doesn't allow users to
|
||||
bring their own restrictive-licensed extensions
|
||||
|
||||
2. Bring Your Own compute image
|
||||
|
||||
Pros:
|
||||
- Still easy to deploy
|
||||
- User can bring own patched version of PostgreSQL
|
||||
|
||||
Cons:
|
||||
- First start latency is O(sizeof(extensions image))
|
||||
- Warm instance pool for skipping pod schedule latency is not feasible with
|
||||
O(n) custom images
|
||||
- Support channels are difficult to manage
|
||||
|
||||
3. Download all user extensions in bulk on compute start
|
||||
|
||||
Pros:
|
||||
- Easy to deploy
|
||||
- No startup latency issues for "clean" users.
|
||||
- Warm instance pool for skipping pod schedule latency is possible
|
||||
|
||||
Cons:
|
||||
- Downloading all extensions in advance takes a lot of time, thus startup
|
||||
latency issues
|
||||
|
||||
4. Store user's extensions in persistent storage
|
||||
|
||||
Pros:
|
||||
- Easy to deploy
|
||||
- No startup latency issues
|
||||
- Warm instance pool for skipping pod schedule latency is possible
|
||||
|
||||
Cons:
|
||||
- EC2 instances have only limited number of attachments shared between EBS
|
||||
volumes, direct-attached NVMe drives, and ENIs.
|
||||
- Compute instance migration isn't trivially solved for EBS mounts (e.g.
|
||||
the device is unavailable whilst moving the mount between instances).
|
||||
- EBS can only mount on one instance at a time (except the expensive IO2
|
||||
device type).
|
||||
|
||||
5. Store user's extensions in network drive
|
||||
|
||||
Pros:
|
||||
- Easy to deploy
|
||||
- Few startup latency issues
|
||||
- Warm instance pool for skipping pod schedule latency is possible
|
||||
|
||||
Cons:
|
||||
- We'd need networked drives, and a lot of them, which would store many
|
||||
duplicate extensions.
|
||||
- **UNCHECKED:** Compute instance migration may not work nicely with
|
||||
networked IOs
|
||||
|
||||
|
||||
### Idea extensions
|
||||
|
||||
The extension store does not have to be S3 directly, but could be a Node-local
|
||||
caching service on top of S3. This would reduce the load on the network for
|
||||
popular extensions.
|
||||
|
||||
## Extension Store implementation
|
||||
|
||||
Extension Store in our case is a private S3 bucket.
|
||||
Extensions are stored as tarballs in the bucket. The tarball contains the extension's control file and all the files that the extension needs to run.
|
||||
|
||||
We may also store the control file separately from the tarball to speed up the extension loading.
|
||||
|
||||
`s3://<the-bucket>/extensions/ext-name/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar`
|
||||
|
||||
where `ext-name` is an extension name and `sha-256+1234abcd1234abcd1234abcd1234abcd` is a hash of a specific extension version tarball.
|
||||
|
||||
To ensure security, there is no direct access to the S3 bucket from compute node.
|
||||
|
||||
Control plane forms a list of extensions available to the compute node
|
||||
and forms a short-lived [pre-signed URL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html)
|
||||
for each extension that is available to the compute node.
|
||||
|
||||
so, `compute_ctl` receives spec in the following format
|
||||
|
||||
```
|
||||
"extensions": [{
|
||||
"meta_format": 1,
|
||||
"extension_name": "postgis",
|
||||
"link": "https://<the-bucket>/extensions/sha-256+1234abcd1234abcd1234abcd1234abcd/bundle.tar?AWSAccessKeyId=1234abcd1234abcd1234abcd1234abcd&Expires=1234567890&Signature=1234abcd1234abcd1234abcd1234abcd",
|
||||
...
|
||||
}]
|
||||
```
|
||||
|
||||
`compute_ctl` then downloads the extension from the link and unpacks it to the right place.
|
||||
|
||||
### How do we handle private extensions?
|
||||
|
||||
Private and public extensions are treated equally from the Extension Store perspective.
|
||||
The only difference is that the private extensions are not listed in the user UI (managed by control plane).
|
||||
|
||||
### How to add new extension to the Extension Store?
|
||||
|
||||
Since we need to verify that the extension is compatible with the compute node and doesn't contain any malicious code,
|
||||
we need to review the extension before adding it to the Extension Store.
|
||||
|
||||
I do not expect that we will have a lot of extensions to review, so we can do it manually for now.
|
||||
|
||||
Some admin UI may be added later to automate this process.
|
||||
|
||||
The list of extensions available to a compute node is stored in the console database.
|
||||
|
||||
### How is the list of available extensions managed?
|
||||
|
||||
We need to add new tables to the console database to store the list of available extensions, their versions and access rights.
|
||||
|
||||
something like this:
|
||||
|
||||
```
|
||||
CREATE TABLE extensions (
|
||||
id SERIAL PRIMARY KEY,
|
||||
name VARCHAR(255) NOT NULL,
|
||||
version VARCHAR(255) NOT NULL,
|
||||
hash VARCHAR(255) NOT NULL, // this is the path to the extension in the Extension Store
|
||||
supported_postgres_versions integer[] NOT NULL,
|
||||
is_public BOOLEAN NOT NULL, // public extensions are available to all users
|
||||
is_shared_preload BOOLEAN NOT NULL, // these extensions require postgres restart
|
||||
is_preload BOOLEAN NOT NULL,
|
||||
license VARCHAR(255) NOT NULL,
|
||||
);
|
||||
|
||||
CREATE TABLE user_extensions (
|
||||
user_id INTEGER NOT NULL,
|
||||
extension_id INTEGER NOT NULL,
|
||||
FOREIGN KEY (user_id) REFERENCES users (id),
|
||||
FOREIGN KEY (extension_id) REFERENCES extensions (id)
|
||||
);
|
||||
```
|
||||
|
||||
When new extension is added to the Extension Store, we add a new record to the table and set permissions.
|
||||
|
||||
In UI, user may select the extensions that they want to use with their compute node.
|
||||
|
||||
NOTE: Extensions that require postgres restart will not be available until the next compute restart.
|
||||
Also, currently user cannot force postgres restart. We should add this feature later.
|
||||
|
||||
For other extensions, we must communicate updates to `compute_ctl` and they will be downloaded in the background.
|
||||
|
||||
### How can user update the extension?
|
||||
|
||||
User can update the extension by selecting the new version of the extension in the UI.
|
||||
|
||||
### Alternatives
|
||||
|
||||
For extensions written on trusted languages we can also adopt
|
||||
`dbdev` PostgreSQL Package Manager based on `pg_tle` by Supabase.
|
||||
This will increase the amount supported extensions and decrease the amount of work required to support them.
|
||||
@@ -33,15 +33,6 @@ pub struct ComputeSpec {
|
||||
#[serde(default)] // Default false
|
||||
pub skip_pg_catalog_updates: bool,
|
||||
|
||||
/// An optinal hint that can be passed to speed up startup time if we know
|
||||
/// that safekeepers have already been synced at the given LSN.
|
||||
///
|
||||
/// NOTE: If there's any possibility that the safekeepers could have advanced
|
||||
/// (e.g. if we started compute, and it crashed) we should stay on the
|
||||
/// safe side and provide None.
|
||||
#[serde(default)]
|
||||
pub skip_sync_safekeepers: Option<Lsn>,
|
||||
|
||||
// Information needed to connect to the storage layer.
|
||||
//
|
||||
// `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
|
||||
|
||||
@@ -23,7 +23,6 @@ use prometheus::{Registry, Result};
|
||||
pub mod launch_timestamp;
|
||||
mod wrappers;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
pub mod metric_vec_duration;
|
||||
|
||||
pub type UIntGauge = GenericGauge<AtomicU64>;
|
||||
pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
|
||||
|
||||
@@ -1,23 +0,0 @@
|
||||
//! Helpers for observing duration on HistogramVec / CounterVec / GaugeVec / MetricVec<T>.
|
||||
|
||||
use std::{future::Future, time::Instant};
|
||||
|
||||
pub trait DurationResultObserver {
|
||||
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration);
|
||||
}
|
||||
|
||||
pub async fn observe_async_block_duration_by_result<
|
||||
T,
|
||||
E,
|
||||
F: Future<Output = Result<T, E>>,
|
||||
O: DurationResultObserver,
|
||||
>(
|
||||
observer: &O,
|
||||
block: F,
|
||||
) -> Result<T, E> {
|
||||
let start = Instant::now();
|
||||
let result = block.await;
|
||||
let duration = start.elapsed();
|
||||
observer.observe_result(&result, duration);
|
||||
result
|
||||
}
|
||||
@@ -184,6 +184,20 @@ pub enum GenericRemoteStorage {
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
// A function for listing all the files in a "directory"
|
||||
// Example:
|
||||
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
|
||||
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.list_files(folder).await,
|
||||
Self::AwsS3(s) => s.list_files(folder).await,
|
||||
Self::Unreliable(s) => s.list_files(folder).await,
|
||||
}
|
||||
}
|
||||
|
||||
// lists common *prefixes*, if any of files
|
||||
// Example:
|
||||
// list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"]
|
||||
pub async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
@@ -195,14 +209,6 @@ impl GenericRemoteStorage {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.list_files(folder).await,
|
||||
Self::AwsS3(s) => s.list_files(folder).await,
|
||||
Self::Unreliable(s) => s.list_files(folder).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn upload(
|
||||
&self,
|
||||
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
|
||||
@@ -349,6 +349,7 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
/// See the doc for `RemoteStorage::list_files`
|
||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||
// TODO: if bucket prefix is empty, folder is prefixed with a "/" I think. Is this desired?
|
||||
let folder_name = folder
|
||||
.map(|p| self.relative_path_to_s3_object(p))
|
||||
.or_else(|| self.prefix_in_bucket.clone());
|
||||
|
||||
@@ -173,15 +173,10 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
|
||||
let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix,)))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let path3 = RemotePath::new(&PathBuf::from(format!("{}/path3", ctx.base_prefix,)))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let data1 = "remote blob data1".as_bytes();
|
||||
let data1_len = data1.len();
|
||||
let data2 = "remote blob data2".as_bytes();
|
||||
let data2_len = data2.len();
|
||||
let data3 = "remote blob data3".as_bytes();
|
||||
let data3_len = data3.len();
|
||||
ctx.client
|
||||
.upload(std::io::Cursor::new(data1), data1_len, &path1, None)
|
||||
.await?;
|
||||
@@ -190,18 +185,8 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
|
||||
.upload(std::io::Cursor::new(data2), data2_len, &path2, None)
|
||||
.await?;
|
||||
|
||||
ctx.client
|
||||
.upload(std::io::Cursor::new(data3), data3_len, &path3, None)
|
||||
.await?;
|
||||
|
||||
ctx.client.delete_objects(&[path1, path2]).await?;
|
||||
|
||||
let prefixes = ctx.client.list_prefixes(None).await?;
|
||||
|
||||
assert_eq!(prefixes.len(), 1);
|
||||
|
||||
ctx.client.delete_objects(&[path3]).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -495,50 +495,50 @@ fn start_pageserver(
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
|
||||
let background_jobs_barrier = background_jobs_barrier;
|
||||
let metrics_ctx = RequestContext::todo_child(
|
||||
TaskKind::MetricsCollection,
|
||||
// This task itself shouldn't download anything.
|
||||
// The actual size calculation does need downloads, and
|
||||
// creates a child context with the right DownloadBehavior.
|
||||
DownloadBehavior::Error,
|
||||
);
|
||||
task_mgr::spawn(
|
||||
crate::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::MetricsCollection,
|
||||
None,
|
||||
None,
|
||||
"consumption metrics collection",
|
||||
true,
|
||||
async move {
|
||||
// first wait until background jobs are cleared to launch.
|
||||
//
|
||||
// this is because we only process active tenants and timelines, and the
|
||||
// Timeline::get_current_logical_size will spawn the logical size calculation,
|
||||
// which will not be rate-limited.
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
|
||||
let background_jobs_barrier = background_jobs_barrier;
|
||||
let metrics_ctx = RequestContext::todo_child(
|
||||
TaskKind::MetricsCollection,
|
||||
// This task itself shouldn't download anything.
|
||||
// The actual size calculation does need downloads, and
|
||||
// creates a child context with the right DownloadBehavior.
|
||||
DownloadBehavior::Error,
|
||||
);
|
||||
task_mgr::spawn(
|
||||
MGMT_REQUEST_RUNTIME.handle(),
|
||||
TaskKind::MetricsCollection,
|
||||
None,
|
||||
None,
|
||||
"consumption metrics collection",
|
||||
true,
|
||||
async move {
|
||||
// first wait until background jobs are cleared to launch.
|
||||
//
|
||||
// this is because we only process active tenants and timelines, and the
|
||||
// Timeline::get_current_logical_size will spawn the logical size calculation,
|
||||
// which will not be rate-limited.
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()); },
|
||||
_ = background_jobs_barrier.wait() => {}
|
||||
};
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()); },
|
||||
_ = background_jobs_barrier.wait() => {}
|
||||
};
|
||||
|
||||
pageserver::consumption_metrics::collect_metrics(
|
||||
metric_collection_endpoint,
|
||||
conf.metric_collection_interval,
|
||||
conf.cached_metric_collection_interval,
|
||||
conf.synthetic_size_calculation_interval,
|
||||
conf.id,
|
||||
metrics_ctx,
|
||||
)
|
||||
.instrument(info_span!("metrics_collection"))
|
||||
.await?;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
pageserver::consumption_metrics::collect_metrics(
|
||||
metric_collection_endpoint,
|
||||
conf.metric_collection_interval,
|
||||
conf.cached_metric_collection_interval,
|
||||
conf.synthetic_size_calculation_interval,
|
||||
conf.id,
|
||||
metrics_ctx,
|
||||
)
|
||||
.instrument(info_span!("metrics_collection"))
|
||||
.await?;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn a task to listen for libpq connections. It will spawn further tasks
|
||||
|
||||
@@ -96,12 +96,12 @@ pub mod defaults {
|
||||
|
||||
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
|
||||
|
||||
[tenant_config]
|
||||
# [tenant_config]
|
||||
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
|
||||
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
|
||||
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
|
||||
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
|
||||
#compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}
|
||||
#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
|
||||
|
||||
#gc_period = '{DEFAULT_GC_PERIOD}'
|
||||
#gc_horizon = {DEFAULT_GC_HORIZON}
|
||||
@@ -111,8 +111,7 @@ pub mod defaults {
|
||||
#min_resident_size_override = .. # in bytes
|
||||
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
|
||||
#gc_feedback = false
|
||||
|
||||
[remote_storage]
|
||||
# [remote_storage]
|
||||
|
||||
"###
|
||||
);
|
||||
|
||||
@@ -186,8 +186,10 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
delete:
|
||||
description: "Attempts to delete specified timeline. 500 and 409 errors should be retried"
|
||||
description: "Attempts to delete specified timeline. On 500 errors should be retried"
|
||||
responses:
|
||||
"200":
|
||||
description: Ok
|
||||
"400":
|
||||
description: Error when no tenant id found in path or no timeline id
|
||||
content:
|
||||
@@ -212,12 +214,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/NotFoundError"
|
||||
"409":
|
||||
description: Deletion is already in progress, continue polling
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ConflictError"
|
||||
"412":
|
||||
description: Tenant is missing, or timeline has children
|
||||
content:
|
||||
|
||||
@@ -187,7 +187,6 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
|
||||
format!("Cannot delete timeline which has child timelines: {children:?}")
|
||||
.into_boxed_str(),
|
||||
),
|
||||
a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
|
||||
Other(e) => ApiError::InternalServerError(e),
|
||||
}
|
||||
}
|
||||
@@ -1129,6 +1128,8 @@ async fn disk_usage_eviction_run(
|
||||
freed_bytes: 0,
|
||||
};
|
||||
|
||||
use crate::task_mgr::MGMT_REQUEST_RUNTIME;
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
|
||||
let state = get_state(&r);
|
||||
@@ -1146,7 +1147,7 @@ async fn disk_usage_eviction_run(
|
||||
let _g = cancel.drop_guard();
|
||||
|
||||
crate::task_mgr::spawn(
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
MGMT_REQUEST_RUNTIME.handle(),
|
||||
TaskKind::DiskUsageEviction,
|
||||
None,
|
||||
None,
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use metrics::metric_vec_duration::DurationResultObserver;
|
||||
use metrics::{
|
||||
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
@@ -425,27 +424,6 @@ pub static SMGR_QUERY_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub struct BasebackupQueryTime(HistogramVec);
|
||||
pub static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|| {
|
||||
BasebackupQueryTime({
|
||||
register_histogram_vec!(
|
||||
"pageserver_basebackup_query_seconds",
|
||||
"Histogram of basebackup queries durations, by result type",
|
||||
&["result"],
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
})
|
||||
});
|
||||
|
||||
impl DurationResultObserver for BasebackupQueryTime {
|
||||
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
|
||||
let label_value = if res.is_ok() { "ok" } else { "error" };
|
||||
let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap();
|
||||
metric.observe(duration.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_live_connections",
|
||||
@@ -845,6 +823,11 @@ impl TimelineMetrics {
|
||||
let evictions_with_low_residence_duration =
|
||||
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
|
||||
|
||||
// TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter
|
||||
// and integration test will error.
|
||||
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
|
||||
MATERIALIZED_PAGE_CACHE_HIT.get();
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
@@ -1319,8 +1302,4 @@ pub fn preinitialize_metrics() {
|
||||
|
||||
// Same as above for this metric, but, it's a Vec-type metric for which we don't know all the labels.
|
||||
BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT.reset();
|
||||
|
||||
// Python tests need these.
|
||||
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
|
||||
MATERIALIZED_PAGE_CACHE_HIT.get();
|
||||
}
|
||||
|
||||
@@ -913,24 +913,10 @@ where
|
||||
None
|
||||
};
|
||||
|
||||
metrics::metric_vec_duration::observe_async_block_duration_by_result(
|
||||
&*crate::metrics::BASEBACKUP_QUERY_TIME,
|
||||
async move {
|
||||
self.handle_basebackup_request(
|
||||
pgb,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
lsn,
|
||||
None,
|
||||
false,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
anyhow::Ok(())
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
// Check that the timeline exists
|
||||
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx)
|
||||
.await?;
|
||||
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
}
|
||||
// return pair of prev_lsn and last_lsn
|
||||
else if query_string.starts_with("get_last_record_rlsn ") {
|
||||
|
||||
@@ -440,13 +440,8 @@ pub enum GetTimelineError {
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("NotFound")]
|
||||
NotFound,
|
||||
|
||||
#[error("HasChildren")]
|
||||
HasChildren(Vec<TimelineId>),
|
||||
|
||||
#[error("Timeline deletion is already in progress")]
|
||||
AlreadyInProgress,
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
@@ -1760,11 +1755,14 @@ impl Tenant {
|
||||
timeline = Arc::clone(timeline_entry.get());
|
||||
|
||||
// Prevent two tasks from trying to delete the timeline at the same time.
|
||||
delete_lock_guard = DeletionGuard(
|
||||
Arc::clone(&timeline.delete_lock)
|
||||
.try_lock_owned()
|
||||
.map_err(|_| DeleteTimelineError::AlreadyInProgress)?,
|
||||
);
|
||||
delete_lock_guard =
|
||||
DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err(
|
||||
|_| {
|
||||
DeleteTimelineError::Other(anyhow::anyhow!(
|
||||
"timeline deletion is already in progress"
|
||||
))
|
||||
},
|
||||
)?);
|
||||
|
||||
// If another task finished the deletion just before we acquired the lock,
|
||||
// return success.
|
||||
|
||||
@@ -862,8 +862,10 @@ impl RemoteTimelineClient {
|
||||
"Found {} files not bound to index_file.json, proceeding with their deletion",
|
||||
remaining.len()
|
||||
);
|
||||
warn!("About to remove {} files", remaining.len());
|
||||
self.storage_impl.delete_objects(&remaining).await?;
|
||||
for file in remaining {
|
||||
warn!("Removing {}", file.object_name().unwrap_or_default());
|
||||
self.storage_impl.delete(&file).await?;
|
||||
}
|
||||
}
|
||||
|
||||
let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
|
||||
|
||||
@@ -129,7 +129,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(crate) layers: Arc<tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>>,
|
||||
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -1418,7 +1418,7 @@ impl Timeline {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())),
|
||||
layers: tokio::sync::RwLock::new(LayerMap::default()),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
|
||||
walredo_mgr,
|
||||
@@ -3370,14 +3370,14 @@ struct CompactLevel0Phase1StatsBuilder {
|
||||
version: Option<u64>,
|
||||
tenant_id: Option<TenantId>,
|
||||
timeline_id: Option<TimelineId>,
|
||||
read_lock_acquisition_micros: DurationRecorder,
|
||||
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
|
||||
read_lock_held_prerequisites_micros: DurationRecorder,
|
||||
read_lock_held_compute_holes_micros: DurationRecorder,
|
||||
read_lock_drop_micros: DurationRecorder,
|
||||
prepare_iterators_micros: DurationRecorder,
|
||||
write_layer_files_micros: DurationRecorder,
|
||||
first_read_lock_acquisition_micros: DurationRecorder,
|
||||
get_level0_deltas_plus_drop_lock_micros: DurationRecorder,
|
||||
level0_deltas_count: Option<usize>,
|
||||
time_spent_between_locks: DurationRecorder,
|
||||
second_read_lock_acquisition_micros: DurationRecorder,
|
||||
second_read_lock_held_micros: DurationRecorder,
|
||||
sort_holes_micros: DurationRecorder,
|
||||
write_layer_files_micros: DurationRecorder,
|
||||
new_deltas_count: Option<usize>,
|
||||
new_deltas_size: Option<u64>,
|
||||
}
|
||||
@@ -3390,14 +3390,14 @@ struct CompactLevel0Phase1Stats {
|
||||
tenant_id: TenantId,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
timeline_id: TimelineId,
|
||||
read_lock_acquisition_micros: RecordedDuration,
|
||||
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
|
||||
read_lock_held_prerequisites_micros: RecordedDuration,
|
||||
read_lock_held_compute_holes_micros: RecordedDuration,
|
||||
read_lock_drop_micros: RecordedDuration,
|
||||
prepare_iterators_micros: RecordedDuration,
|
||||
write_layer_files_micros: RecordedDuration,
|
||||
first_read_lock_acquisition_micros: RecordedDuration,
|
||||
get_level0_deltas_plus_drop_lock_micros: RecordedDuration,
|
||||
level0_deltas_count: usize,
|
||||
time_spent_between_locks: RecordedDuration,
|
||||
second_read_lock_acquisition_micros: RecordedDuration,
|
||||
second_read_lock_held_micros: RecordedDuration,
|
||||
sort_holes_micros: RecordedDuration,
|
||||
write_layer_files_micros: RecordedDuration,
|
||||
new_deltas_count: usize,
|
||||
new_deltas_size: u64,
|
||||
}
|
||||
@@ -3406,51 +3406,54 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
version: value.version.ok_or_else(|| anyhow!("version not set"))?,
|
||||
tenant_id: value
|
||||
.tenant_id
|
||||
.ok_or_else(|| anyhow!("tenant_id not set"))?,
|
||||
timeline_id: value
|
||||
.timeline_id
|
||||
.ok_or_else(|| anyhow!("timeline_id not set"))?,
|
||||
read_lock_acquisition_micros: value
|
||||
.read_lock_acquisition_micros
|
||||
let CompactLevel0Phase1StatsBuilder {
|
||||
version,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
first_read_lock_acquisition_micros,
|
||||
get_level0_deltas_plus_drop_lock_micros,
|
||||
level0_deltas_count,
|
||||
time_spent_between_locks,
|
||||
second_read_lock_acquisition_micros,
|
||||
second_read_lock_held_micros,
|
||||
sort_holes_micros,
|
||||
write_layer_files_micros,
|
||||
new_deltas_count,
|
||||
new_deltas_size,
|
||||
} = value;
|
||||
Ok(CompactLevel0Phase1Stats {
|
||||
version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?,
|
||||
tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?,
|
||||
timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?,
|
||||
first_read_lock_acquisition_micros: first_read_lock_acquisition_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
|
||||
read_lock_held_spawn_blocking_startup_micros: value
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?,
|
||||
get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
|
||||
read_lock_held_prerequisites_micros: value
|
||||
.read_lock_held_prerequisites_micros
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set")
|
||||
})?,
|
||||
level0_deltas_count: level0_deltas_count
|
||||
.ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?,
|
||||
time_spent_between_locks: time_spent_between_locks
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
|
||||
read_lock_held_compute_holes_micros: value
|
||||
.read_lock_held_compute_holes_micros
|
||||
.ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?,
|
||||
second_read_lock_acquisition_micros: second_read_lock_acquisition_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
|
||||
read_lock_drop_micros: value
|
||||
.read_lock_drop_micros
|
||||
.ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?,
|
||||
second_read_lock_held_micros: second_read_lock_held_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
|
||||
prepare_iterators_micros: value
|
||||
.prepare_iterators_micros
|
||||
.ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?,
|
||||
sort_holes_micros: sort_holes_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?,
|
||||
write_layer_files_micros: value
|
||||
.write_layer_files_micros
|
||||
.ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?,
|
||||
write_layer_files_micros: write_layer_files_micros
|
||||
.into_recorded()
|
||||
.ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
|
||||
level0_deltas_count: value
|
||||
.level0_deltas_count
|
||||
.ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
|
||||
new_deltas_count: value
|
||||
.new_deltas_count
|
||||
.ok_or_else(|| anyhow!("new_deltas_count not set"))?,
|
||||
new_deltas_size: value
|
||||
.new_deltas_size
|
||||
.ok_or_else(|| anyhow!("new_deltas_size not set"))?,
|
||||
.ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?,
|
||||
new_deltas_count: new_deltas_count
|
||||
.ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?,
|
||||
new_deltas_size: new_deltas_size
|
||||
.ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -3461,18 +3464,30 @@ impl Timeline {
|
||||
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
|
||||
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
|
||||
/// start of level0 files compaction, the on-demand download should be revisited as well.
|
||||
fn compact_level0_phase1(
|
||||
self: Arc<Self>,
|
||||
async fn compact_level0_phase1(
|
||||
&self,
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layers: tokio::sync::OwnedRwLockReadGuard<LayerMap<dyn PersistentLayer>>,
|
||||
mut stats: CompactLevel0Phase1StatsBuilder,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
stats.read_lock_held_spawn_blocking_startup_micros =
|
||||
stats.read_lock_acquisition_micros.till_now(); // set by caller
|
||||
let mut stats = CompactLevel0Phase1StatsBuilder {
|
||||
version: Some(1),
|
||||
tenant_id: Some(self.tenant_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let begin = tokio::time::Instant::now();
|
||||
let layers = self.layers.read().await;
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.first_read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
stats.level0_deltas_count = Some(level0_deltas.len());
|
||||
stats.get_level0_deltas_plus_drop_lock_micros =
|
||||
stats.first_read_lock_acquisition_micros.till_now();
|
||||
|
||||
// Only compact if enough layers have accumulated.
|
||||
let threshold = self.get_compaction_threshold();
|
||||
if level0_deltas.is_empty() || level0_deltas.len() < threshold {
|
||||
@@ -3550,53 +3565,6 @@ impl Timeline {
|
||||
// we don't accidentally use it later in the function.
|
||||
drop(level0_deltas);
|
||||
|
||||
stats.read_lock_held_prerequisites_micros = stats
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.till_now();
|
||||
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
for (next_key, _next_lsn, _size) in itertools::process_results(
|
||||
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|
||||
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
|
||||
)? {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
stats.read_lock_held_compute_holes_micros =
|
||||
stats.read_lock_held_prerequisites_micros.till_now();
|
||||
drop(layers);
|
||||
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
|
||||
// This iterator walks through all key-value pairs from all the layers
|
||||
// we're compacting, in key, LSN order.
|
||||
let all_values_iter = itertools::process_results(
|
||||
@@ -3636,7 +3604,50 @@ impl Timeline {
|
||||
},
|
||||
)?;
|
||||
|
||||
stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now();
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now();
|
||||
let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here?
|
||||
stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
for (next_key, _next_lsn, _size) in itertools::process_results(
|
||||
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|
||||
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
|
||||
)? {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
drop(layers);
|
||||
stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now();
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now();
|
||||
|
||||
// Merge the contents of all the input delta layers into a new set
|
||||
// of delta layers, based on the current partitioning.
|
||||
@@ -3796,7 +3807,7 @@ impl Timeline {
|
||||
layer_paths.pop().unwrap();
|
||||
}
|
||||
|
||||
stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now();
|
||||
stats.write_layer_files_micros = stats.sort_holes_micros.till_now();
|
||||
stats.new_deltas_count = Some(new_layers.len());
|
||||
stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum());
|
||||
|
||||
@@ -3835,36 +3846,9 @@ impl Timeline {
|
||||
let CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact,
|
||||
} = {
|
||||
let phase1_span = info_span!("compact_level0_phase1");
|
||||
let myself = Arc::clone(self);
|
||||
let ctx = ctx.attached_child(); // technically, the spawn_blocking can outlive this future
|
||||
let mut stats = CompactLevel0Phase1StatsBuilder {
|
||||
version: Some(2),
|
||||
tenant_id: Some(self.tenant_id),
|
||||
timeline_id: Some(self.timeline_id),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let begin = tokio::time::Instant::now();
|
||||
let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
|
||||
let now = tokio::time::Instant::now();
|
||||
stats.read_lock_acquisition_micros =
|
||||
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
|
||||
let layer_removal_cs = layer_removal_cs.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let _entered = phase1_span.enter();
|
||||
myself.compact_level0_phase1(
|
||||
layer_removal_cs,
|
||||
phase1_layers_locked,
|
||||
stats,
|
||||
target_file_size,
|
||||
&ctx,
|
||||
)
|
||||
})
|
||||
.await
|
||||
.context("spawn_blocking")??
|
||||
};
|
||||
} = self
|
||||
.compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
|
||||
.await?;
|
||||
|
||||
if new_layers.is_empty() && deltas_to_compact.is_empty() {
|
||||
// nothing to do
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
MODULE_big = neon
|
||||
OBJS = \
|
||||
$(WIN32RES) \
|
||||
extension_server.o \
|
||||
file_cache.o \
|
||||
libpagestore.o \
|
||||
libpqwalproposer.o \
|
||||
|
||||
91
pgxn/neon/extension_server.c
Normal file
91
pgxn/neon/extension_server.c
Normal file
@@ -0,0 +1,91 @@
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* extension_server.c
|
||||
* Request compute_ctl to download extension files.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* contrib/neon/extension_server.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
#include "tcop/pquery.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "access/xact.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "miscadmin.h"
|
||||
#include "utils/acl.h"
|
||||
#include "fmgr.h"
|
||||
#include "utils/guc.h"
|
||||
#include "port.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include <curl/curl.h>
|
||||
|
||||
static int extension_server_port = 0;
|
||||
|
||||
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
|
||||
|
||||
// curl -X POST http://localhost:8080/extension_server/postgis-3.so
|
||||
static bool
|
||||
neon_download_extension_file_http(const char *filename)
|
||||
{
|
||||
CURL *curl;
|
||||
CURLcode res;
|
||||
char *compute_ctl_url;
|
||||
char *postdata;
|
||||
bool ret = false;
|
||||
|
||||
if ((curl = curl_easy_init()) == NULL)
|
||||
{
|
||||
elog(ERROR, "Failed to initialize curl handle");
|
||||
}
|
||||
|
||||
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s", extension_server_port, filename);
|
||||
|
||||
elog(LOG, "curl_easy_perform() url: %s", compute_ctl_url);
|
||||
|
||||
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "POST");
|
||||
curl_easy_setopt(curl, CURLOPT_URL, compute_ctl_url);
|
||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3L /* seconds */);
|
||||
|
||||
if (curl)
|
||||
{
|
||||
/* Perform the request, res will get the return code */
|
||||
res = curl_easy_perform(curl);
|
||||
/* Check for errors */
|
||||
if (res == CURLE_OK)
|
||||
{
|
||||
elog(LOG, "curl_easy_perform() succeeded");
|
||||
ret = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(WARNING, "curl_easy_perform() failed: %s\n", curl_easy_strerror(res));
|
||||
}
|
||||
|
||||
/* always cleanup */
|
||||
curl_easy_cleanup(curl);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void pg_init_extension_server()
|
||||
{
|
||||
DefineCustomIntVariable("neon.extension_server_port",
|
||||
"connection string to the compute_ctl",
|
||||
NULL,
|
||||
&extension_server_port,
|
||||
0, 0, INT_MAX,
|
||||
PGC_POSTMASTER,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
// set download_extension_file_hook
|
||||
prev_download_extension_file_hook = download_extension_file_hook;
|
||||
download_extension_file_hook = neon_download_extension_file_http;
|
||||
}
|
||||
1
pgxn/neon/extension_server.h
Normal file
1
pgxn/neon/extension_server.h
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
@@ -35,8 +35,11 @@ _PG_init(void)
|
||||
{
|
||||
pg_init_libpagestore();
|
||||
pg_init_walproposer();
|
||||
|
||||
InitControlPlaneConnector();
|
||||
|
||||
pg_init_extension_server();
|
||||
|
||||
// Important: This must happen after other parts of the extension
|
||||
// are loaded, otherwise any settings to GUCs that were set before
|
||||
// the extension was loaded will be removed.
|
||||
|
||||
@@ -21,6 +21,8 @@ extern char *neon_tenant;
|
||||
extern void pg_init_libpagestore(void);
|
||||
extern void pg_init_walproposer(void);
|
||||
|
||||
extern void pg_init_extension_server(void);
|
||||
|
||||
/*
|
||||
* Returns true if we shouldn't do REDO on that block in record indicated by
|
||||
* block_id; false otherwise.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.70.0"
|
||||
channel = "1.68.2"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -62,7 +62,6 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_getpage_reconstruct_seconds_bucket",
|
||||
"pageserver_getpage_reconstruct_seconds_count",
|
||||
"pageserver_getpage_reconstruct_seconds_sum",
|
||||
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
|
||||
)
|
||||
|
||||
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
|
||||
@@ -600,6 +600,8 @@ class NeonEnvBuilder:
|
||||
self.rust_log_override = rust_log_override
|
||||
self.port_distributor = port_distributor
|
||||
self.remote_storage = remote_storage
|
||||
self.ext_remote_storage: Optional[Any] = None
|
||||
self.remote_storage_client: Optional[Any] = None
|
||||
self.remote_storage_users = remote_storage_users
|
||||
self.broker = broker
|
||||
self.run_id = run_id
|
||||
@@ -651,13 +653,18 @@ class NeonEnvBuilder:
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
test_name: str,
|
||||
force_enable: bool = True,
|
||||
enable_remote_extensions: bool = False,
|
||||
):
|
||||
if remote_storage_kind == RemoteStorageKind.NOOP:
|
||||
return
|
||||
elif remote_storage_kind == RemoteStorageKind.LOCAL_FS:
|
||||
self.enable_local_fs_remote_storage(force_enable=force_enable)
|
||||
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
|
||||
self.enable_mock_s3_remote_storage(
|
||||
bucket_name=test_name,
|
||||
force_enable=force_enable,
|
||||
enable_remote_extensions=enable_remote_extensions,
|
||||
)
|
||||
elif remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
|
||||
else:
|
||||
@@ -673,11 +680,15 @@ class NeonEnvBuilder:
|
||||
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
|
||||
self.remote_storage = LocalFsStorage(Path(self.repo_dir / "local_fs_remote_storage"))
|
||||
|
||||
def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable: bool = True):
|
||||
def enable_mock_s3_remote_storage(
|
||||
self, bucket_name: str, force_enable: bool = True, enable_remote_extensions: bool = False
|
||||
):
|
||||
"""
|
||||
Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
|
||||
Starts up the mock server, if that does not run yet.
|
||||
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
|
||||
|
||||
Also creates the bucket for extensions, self.ext_remote_storage bucket
|
||||
"""
|
||||
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
|
||||
mock_endpoint = self.mock_s3_server.endpoint()
|
||||
@@ -700,6 +711,17 @@ class NeonEnvBuilder:
|
||||
secret_key=self.mock_s3_server.secret_key(),
|
||||
)
|
||||
|
||||
if enable_remote_extensions:
|
||||
ext_bucket_name = f"ext_{bucket_name}"
|
||||
self.remote_storage_client.create_bucket(Bucket=ext_bucket_name)
|
||||
self.ext_remote_storage = S3Storage(
|
||||
bucket_name=ext_bucket_name,
|
||||
endpoint=mock_endpoint,
|
||||
bucket_region=mock_region,
|
||||
access_key=self.mock_s3_server.access_key(),
|
||||
secret_key=self.mock_s3_server.secret_key(),
|
||||
)
|
||||
|
||||
def enable_real_s3_remote_storage(self, test_name: str, force_enable: bool = True):
|
||||
"""
|
||||
Sets up configuration to use real s3 endpoint without mock server
|
||||
@@ -740,6 +762,17 @@ class NeonEnvBuilder:
|
||||
prefix_in_bucket=self.remote_storage_prefix,
|
||||
)
|
||||
|
||||
ext_bucket_name = os.getenv("EXT_REMOTE_STORAGE_S3_BUCKET")
|
||||
if ext_bucket_name is not None:
|
||||
ext_bucket_name = f"ext_{ext_bucket_name}"
|
||||
self.ext_remote_storage = S3Storage(
|
||||
bucket_name=ext_bucket_name,
|
||||
bucket_region=region,
|
||||
access_key=access_key,
|
||||
secret_key=secret_key,
|
||||
prefix_in_bucket=self.remote_storage_prefix,
|
||||
)
|
||||
|
||||
def cleanup_local_storage(self):
|
||||
if self.preserve_database_files:
|
||||
return
|
||||
@@ -773,6 +806,7 @@ class NeonEnvBuilder:
|
||||
# `self.remote_storage_prefix` is coupled with `S3Storage` storage type,
|
||||
# so this line effectively a no-op
|
||||
assert isinstance(self.remote_storage, S3Storage)
|
||||
assert self.remote_storage_client is not None
|
||||
|
||||
if self.keep_remote_storage_contents:
|
||||
log.info("keep_remote_storage_contents skipping remote storage cleanup")
|
||||
@@ -902,6 +936,8 @@ class NeonEnv:
|
||||
self.neon_binpath = config.neon_binpath
|
||||
self.pg_distrib_dir = config.pg_distrib_dir
|
||||
self.endpoint_counter = 0
|
||||
self.remote_storage_client = config.remote_storage_client
|
||||
self.ext_remote_storage = config.ext_remote_storage
|
||||
|
||||
# generate initial tenant ID here instead of letting 'neon init' generate it,
|
||||
# so that we don't need to dig it out of the config file afterwards.
|
||||
@@ -1488,6 +1524,7 @@ class NeonCli(AbstractNeonCli):
|
||||
safekeepers: Optional[List[int]] = None,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1497,6 +1534,8 @@ class NeonCli(AbstractNeonCli):
|
||||
"--pg-version",
|
||||
self.env.pg_version,
|
||||
]
|
||||
if remote_ext_config is not None:
|
||||
args.extend(["--remote-ext-config", remote_ext_config])
|
||||
if lsn is not None:
|
||||
args.append(f"--lsn={lsn}")
|
||||
args.extend(["--pg-port", str(pg_port)])
|
||||
@@ -2358,7 +2397,7 @@ class Endpoint(PgProtocol):
|
||||
|
||||
return self
|
||||
|
||||
def start(self) -> "Endpoint":
|
||||
def start(self, remote_ext_config: Optional[str] = None) -> "Endpoint":
|
||||
"""
|
||||
Start the Postgres instance.
|
||||
Returns self.
|
||||
@@ -2374,6 +2413,7 @@ class Endpoint(PgProtocol):
|
||||
http_port=self.http_port,
|
||||
tenant_id=self.tenant_id,
|
||||
safekeepers=self.active_safekeepers,
|
||||
remote_ext_config=remote_ext_config,
|
||||
)
|
||||
self.running = True
|
||||
|
||||
@@ -2463,6 +2503,7 @@ class Endpoint(PgProtocol):
|
||||
hot_standby: bool = False,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
) -> "Endpoint":
|
||||
"""
|
||||
Create an endpoint, apply config, and start Postgres.
|
||||
@@ -2477,7 +2518,7 @@ class Endpoint(PgProtocol):
|
||||
config_lines=config_lines,
|
||||
hot_standby=hot_standby,
|
||||
lsn=lsn,
|
||||
).start()
|
||||
).start(remote_ext_config=remote_ext_config)
|
||||
|
||||
log.info(f"Postgres startup took {time.time() - started_at} seconds")
|
||||
|
||||
@@ -2511,6 +2552,7 @@ class EndpointFactory:
|
||||
lsn: Optional[Lsn] = None,
|
||||
hot_standby: bool = False,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
remote_ext_config: Optional[str] = None,
|
||||
) -> Endpoint:
|
||||
ep = Endpoint(
|
||||
self.env,
|
||||
@@ -2527,6 +2569,7 @@ class EndpointFactory:
|
||||
hot_standby=hot_standby,
|
||||
config_lines=config_lines,
|
||||
lsn=lsn,
|
||||
remote_ext_config=remote_ext_config,
|
||||
)
|
||||
|
||||
def create(
|
||||
|
||||
@@ -30,18 +30,7 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_startup")
|
||||
|
||||
def get_synced_lsn():
|
||||
"""Assert safekeepers are synced and get the LSN."""
|
||||
commit_lsns = [
|
||||
sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn
|
||||
for sk in env.safekeepers
|
||||
]
|
||||
assert len(commit_lsns) == 3
|
||||
assert len(set(commit_lsns)) == 1
|
||||
return commit_lsns[0]
|
||||
env.neon_cli.create_branch("test_startup")
|
||||
|
||||
endpoint = None
|
||||
|
||||
@@ -74,8 +63,7 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
|
||||
endpoint.stop()
|
||||
|
||||
# Imitate optimizations that console would do for the second start
|
||||
lsn = get_synced_lsn()
|
||||
endpoint.respec(skip_pg_catalog_updates=True, skip_sync_safekeepers=lsn.lsn_int)
|
||||
endpoint.respec(skip_pg_catalog_updates=True)
|
||||
|
||||
|
||||
# This test sometimes runs for longer than the global 5 minute timeout.
|
||||
|
||||
@@ -2,7 +2,6 @@ import copy
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
@@ -449,7 +448,7 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
|
||||
"""
|
||||
|
||||
with output.open("w") as stdout:
|
||||
res = subprocess.run(
|
||||
rv = subprocess.run(
|
||||
[
|
||||
"diff",
|
||||
"--unified", # Make diff output more readable
|
||||
@@ -461,53 +460,4 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
|
||||
stdout=stdout,
|
||||
)
|
||||
|
||||
differs = res.returncode != 0
|
||||
|
||||
# TODO: Remove after https://github.com/neondatabase/neon/pull/4425 is merged, and a couple of releases are made
|
||||
if differs:
|
||||
with tempfile.NamedTemporaryFile(mode="w") as tmp:
|
||||
tmp.write(PR4425_ALLOWED_DIFF)
|
||||
tmp.flush()
|
||||
|
||||
allowed = subprocess.run(
|
||||
[
|
||||
"diff",
|
||||
"--unified", # Make diff output more readable
|
||||
r"--ignore-matching-lines=^---", # Ignore diff headers
|
||||
r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers
|
||||
"--ignore-matching-lines=^@@", # Ignore diff blocks location
|
||||
"--ignore-matching-lines=^ *$", # Ignore lines with only spaces
|
||||
"--ignore-matching-lines=^ --.*", # Ignore the " --" lines for compatibility with PG14
|
||||
"--ignore-blank-lines",
|
||||
str(output),
|
||||
str(tmp.name),
|
||||
],
|
||||
)
|
||||
|
||||
differs = allowed.returncode != 0
|
||||
|
||||
return differs
|
||||
|
||||
|
||||
PR4425_ALLOWED_DIFF = """
|
||||
--- /tmp/test_output/test_backward_compatibility[release-pg15]/compatibility_snapshot/dump.sql 2023-06-08 18:12:45.000000000 +0000
|
||||
+++ /tmp/test_output/test_backward_compatibility[release-pg15]/dump.sql 2023-06-13 07:25:35.211733653 +0000
|
||||
@@ -13,12 +13,20 @@
|
||||
|
||||
CREATE ROLE cloud_admin;
|
||||
ALTER ROLE cloud_admin WITH SUPERUSER INHERIT CREATEROLE CREATEDB LOGIN REPLICATION BYPASSRLS;
|
||||
+CREATE ROLE neon_superuser;
|
||||
+ALTER ROLE neon_superuser WITH NOSUPERUSER INHERIT CREATEROLE CREATEDB NOLOGIN NOREPLICATION NOBYPASSRLS;
|
||||
|
||||
--
|
||||
-- User Configurations
|
||||
--
|
||||
|
||||
|
||||
+--
|
||||
+-- Role memberships
|
||||
+--
|
||||
+
|
||||
+GRANT pg_read_all_data TO neon_superuser GRANTED BY cloud_admin;
|
||||
+GRANT pg_write_all_data TO neon_superuser GRANTED BY cloud_admin;
|
||||
"""
|
||||
return rv.returncode != 0
|
||||
|
||||
90
test_runner/regress/test_download_extensions.py
Normal file
90
test_runner/regress/test_download_extensions.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import json
|
||||
import os
|
||||
from contextlib import closing
|
||||
from io import BytesIO
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
RemoteStorageKind,
|
||||
)
|
||||
|
||||
|
||||
def test_file_download(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Tests we can download a file
|
||||
First we set up the mock s3 bucket by uploading test_ext.control to the bucket
|
||||
Then, we download test_ext.control from the bucket to pg_install/v15/share/postgresql/extension/
|
||||
Finally, we list available extensions and assert that test_ext is present
|
||||
"""
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
test_name="test_file_download",
|
||||
enable_remote_extensions=True,
|
||||
)
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
assert env.ext_remote_storage is not None
|
||||
assert env.remote_storage_client is not None
|
||||
|
||||
TEST_EXT_PATH = "v14/share/postgresql/extension/test_ext.control"
|
||||
BUCKET_PREFIX = "5314225671" # this is the build number
|
||||
|
||||
# 4. Upload test_ext.control file to the bucket
|
||||
# In the non-mock version this is done by CI/CD
|
||||
|
||||
test_ext_file = BytesIO(
|
||||
b"""# mock extension
|
||||
comment = 'This is a mock extension'
|
||||
default_version = '1.0'
|
||||
module_pathname = '$libdir/test_ext'
|
||||
relocatable = true
|
||||
"""
|
||||
)
|
||||
env.remote_storage_client.upload_fileobj(
|
||||
test_ext_file,
|
||||
env.ext_remote_storage.bucket_name,
|
||||
os.path.join(BUCKET_PREFIX, TEST_EXT_PATH),
|
||||
)
|
||||
|
||||
# 5. Download file from the bucket to correct local location
|
||||
# Later this will be replaced by our rust code
|
||||
# resp = env.remote_storage_client.get_object(
|
||||
# Bucket=env.ext_remote_storage.bucket_name, Key=os.path.join(BUCKET_PREFIX, TEST_EXT_PATH)
|
||||
# )
|
||||
# response = resp["Body"]
|
||||
# fname = f"pg_install/{TEST_EXT_PATH}"
|
||||
# with open(fname, "wb") as f:
|
||||
# f.write(response.read())
|
||||
|
||||
tenant, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_file_download", tenant_id=tenant)
|
||||
|
||||
remote_ext_config = json.dumps(
|
||||
{
|
||||
"bucket": env.ext_remote_storage.bucket_name,
|
||||
"region": "us-east-1",
|
||||
"endpoint": env.ext_remote_storage.endpoint,
|
||||
"prefix": BUCKET_PREFIX,
|
||||
}
|
||||
)
|
||||
|
||||
# 6. Start endpoint and ensure that test_ext is present in select * from pg_available_extensions
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_file_download", tenant_id=tenant, remote_ext_config=remote_ext_config
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# test query: insert some values and select them
|
||||
cur.execute("CREATE TABLE t(key int primary key, value text)")
|
||||
for i in range(100):
|
||||
cur.execute(f"insert into t values({i}, {2*i})")
|
||||
cur.execute("select * from t")
|
||||
log.info(cur.fetchall())
|
||||
|
||||
# the real test query: check that test_ext is present
|
||||
cur.execute("SELECT * FROM pg_available_extensions")
|
||||
all_extensions = [x[0] for x in cur.fetchall()]
|
||||
log.info(all_extensions)
|
||||
assert "test_ext" in all_extensions
|
||||
@@ -1,5 +1,3 @@
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
@@ -12,10 +10,9 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
) as primary:
|
||||
time.sleep(1)
|
||||
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
|
||||
primary_lsn = None
|
||||
caught_up = False
|
||||
cought_up = False
|
||||
queries = [
|
||||
"SHOW neon.timeline_id",
|
||||
"SHOW neon.tenant_id",
|
||||
@@ -59,7 +56,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
res = s_cur.fetchone()
|
||||
assert res is not None
|
||||
|
||||
while not caught_up:
|
||||
while not cought_up:
|
||||
with s_con.cursor() as secondary_cursor:
|
||||
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
|
||||
res = secondary_cursor.fetchone()
|
||||
@@ -69,7 +66,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
# due to e.g. autovacuum, but that shouldn't impact the content
|
||||
# of the tables, so we check whether we've replayed up to at
|
||||
# least after the commit of the `test` table.
|
||||
caught_up = secondary_lsn >= primary_lsn
|
||||
cought_up = secondary_lsn >= primary_lsn
|
||||
|
||||
# Explicit commit to flush any transient transaction-level state.
|
||||
s_con.commit()
|
||||
|
||||
@@ -16,7 +16,6 @@ from fixtures.pg_version import PgVersion, xfail_on_postgres
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
env = neon_simple_env
|
||||
(tenant_id, _) = env.neon_cli.create_tenant()
|
||||
@@ -45,16 +44,12 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
# we've disabled the autovacuum and checkpoint
|
||||
# so background processes should not change the size.
|
||||
# If this test will flake we should probably loosen the check
|
||||
assert (
|
||||
size == initial_size
|
||||
), f"starting idle compute should not change the tenant size (Currently {size}, expected {initial_size})"
|
||||
assert size == initial_size, "starting idle compute should not change the tenant size"
|
||||
|
||||
# the size should be the same, until we increase the size over the
|
||||
# gc_horizon
|
||||
size, inputs = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
assert (
|
||||
size == initial_size
|
||||
), f"tenant_size should not be affected by shutdown of compute (Currently {size}, expected {initial_size})"
|
||||
assert size == initial_size, "tenant_size should not be affected by shutdown of compute"
|
||||
|
||||
expected_inputs = {
|
||||
"segments": [
|
||||
@@ -323,7 +318,6 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
|
||||
size_debug_file.write(size_debug)
|
||||
|
||||
|
||||
@pytest.mark.xfail
|
||||
def test_single_branch_get_tenant_size_grows(
|
||||
neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion
|
||||
):
|
||||
@@ -339,13 +333,13 @@ def test_single_branch_get_tenant_size_grows(
|
||||
# inserts is larger than gc_horizon. for example 0x20000 here hid the fact
|
||||
# that there next_gc_cutoff could be smaller than initdb_lsn, which will
|
||||
# obviously lead to issues when calculating the size.
|
||||
gc_horizon = 0x3BA00
|
||||
gc_horizon = 0x38000
|
||||
|
||||
# it's a bit of a hack, but different versions of postgres have different
|
||||
# amount of WAL generated for the same amount of data. so we need to
|
||||
# adjust the gc_horizon accordingly.
|
||||
if pg_version == PgVersion.V14:
|
||||
gc_horizon = 0x4A000
|
||||
gc_horizon = 0x40000
|
||||
|
||||
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
|
||||
|
||||
@@ -366,11 +360,11 @@ def test_single_branch_get_tenant_size_grows(
|
||||
if current_lsn - initdb_lsn >= gc_horizon:
|
||||
assert (
|
||||
size >= prev_size
|
||||
), f"tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
|
||||
), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
|
||||
else:
|
||||
assert (
|
||||
size > prev_size
|
||||
), f"tenant_size should grow, because we continue to add WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
|
||||
), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
|
||||
|
||||
def get_current_consistent_size(
|
||||
env: NeonEnv,
|
||||
|
||||
@@ -275,6 +275,7 @@ def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str]
|
||||
assert isinstance(neon_env_builder.remote_storage, S3Storage)
|
||||
|
||||
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
|
||||
assert neon_env_builder.remote_storage_client is not None
|
||||
response = neon_env_builder.remote_storage_client.list_objects_v2(
|
||||
Bucket=neon_env_builder.remote_storage.bucket_name,
|
||||
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
|
||||
@@ -463,10 +464,10 @@ def test_concurrent_timeline_delete_stuck_on(
|
||||
|
||||
# make the second call and assert behavior
|
||||
log.info("second call start")
|
||||
error_msg_re = "Timeline deletion is already in progress"
|
||||
error_msg_re = "timeline deletion is already in progress"
|
||||
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
|
||||
assert second_call_err.value.status_code == 409
|
||||
assert second_call_err.value.status_code == 500
|
||||
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
|
||||
# the second call will try to transition the timeline into Stopping state as well
|
||||
env.pageserver.allowed_errors.append(
|
||||
@@ -518,9 +519,9 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*{child_timeline_id}.*Timeline deletion is already in progress.*"
|
||||
f".*{child_timeline_id}.*timeline deletion is already in progress.*"
|
||||
)
|
||||
with pytest.raises(PageserverApiException, match="Timeline deletion is already in progress"):
|
||||
with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"):
|
||||
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
|
||||
|
||||
# make sure the timeout was due to the failpoint
|
||||
@@ -628,7 +629,7 @@ def test_timeline_delete_works_for_remote_smoke(
|
||||
)
|
||||
|
||||
# for some reason the check above doesnt immediately take effect for the below.
|
||||
# Assume it is mock server incosistency and check twice.
|
||||
# Assume it is mock server inconsistency and check twice.
|
||||
wait_until(
|
||||
2,
|
||||
0.5,
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: a2daebc6b4...5adfb36043
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 2df2ce3744...ff7b85cd8a
Reference in New Issue
Block a user