diff --git a/Cargo.lock b/Cargo.lock index a75209b8db..d605169986 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1126,6 +1126,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-compression", + "bytes", "cfg-if", "chrono", "clap", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 6c93befaa3..47378f1910 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -38,3 +38,4 @@ toml_edit.workspace = true remote_storage = { version = "0.1", path = "../libs/remote_storage/" } vm_monitor = { version = "0.1", path = "../libs/vm_monitor/" } zstd = "0.12.4" +bytes = "1.0" diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 7f22bda13e..36e9ca0731 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -31,7 +31,7 @@ //! -C 'postgresql://cloud_admin@localhost/postgres' \ //! -S /var/db/postgres/specs/current.json \ //! -b /usr/local/bin/postgres \ -//! -r {"bucket": "neon-dev-extensions-eu-central-1", "region": "eu-central-1"} +//! -r http://pg-ext-s3-gateway //! ``` //! use std::collections::HashMap; @@ -51,7 +51,7 @@ use compute_api::responses::ComputeStatus; use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_tools::configurator::launch_configurator; -use compute_tools::extension_server::{get_pg_version, init_remote_storage}; +use compute_tools::extension_server::get_pg_version; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -60,7 +60,7 @@ use compute_tools::spec::*; // this is an arbitrary build tag. 
Fine as a default / for testing purposes // in-case of not-set environment var -const BUILD_TAG_DEFAULT: &str = "5670669815"; +const BUILD_TAG_DEFAULT: &str = "latest"; fn main() -> Result<()> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; @@ -74,10 +74,18 @@ fn main() -> Result<()> { let pgbin_default = String::from("postgres"); let pgbin = matches.get_one::("pgbin").unwrap_or(&pgbin_default); - let remote_ext_config = matches.get_one::("remote-ext-config"); - let ext_remote_storage = remote_ext_config.map(|x| { - init_remote_storage(x).expect("cannot initialize remote extension storage from config") - }); + let ext_remote_storage = matches + .get_one::("remote-ext-config") + // Compatibility hack: if the control plane specified any remote-ext-config + // use the default value for extension storage proxy gateway. + // Remove this once the control plane is updated to pass the gateway URL + .map(|conf| { + if conf.starts_with("http") { + conf.trim_end_matches('/') + } else { + "http://pg-ext-s3-gateway" + } + }); let http_port = *matches .get_one::("http-port") @@ -198,7 +206,7 @@ fn main() -> Result<()> { live_config_allowed, state: Mutex::new(new_state), state_changed: Condvar::new(), - ext_remote_storage, + ext_remote_storage: ext_remote_storage.map(|s| s.to_string()), ext_download_progress: RwLock::new(HashMap::new()), build_tag, }; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 5ace8ca1d2..0dfacb615c 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -25,7 +25,7 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus}; use compute_api::spec::{ComputeMode, ComputeSpec}; use utils::measured_stream::MeasuredReader; -use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; +use remote_storage::{DownloadError, RemotePath}; use crate::checker::create_availability_check_data; use crate::pg_helpers::*; @@ -59,8 +59,8 @@ pub struct ComputeNode { pub state: Mutex, /// `Condvar` to allow 
notifying waiters about state changes. pub state_changed: Condvar, - /// the S3 bucket that we search for extensions in - pub ext_remote_storage: Option, + /// the address of extension storage proxy gateway + pub ext_remote_storage: Option, // key: ext_archive_name, value: started download time, download_completed? pub ext_download_progress: RwLock, bool)>>, pub build_tag: String, @@ -957,12 +957,12 @@ LIMIT 100", real_ext_name: String, ext_path: RemotePath, ) -> Result { - let remote_storage = self - .ext_remote_storage - .as_ref() - .ok_or(DownloadError::BadInput(anyhow::anyhow!( - "Remote extensions storage is not configured", - )))?; + let ext_remote_storage = + self.ext_remote_storage + .as_ref() + .ok_or(DownloadError::BadInput(anyhow::anyhow!( + "Remote extensions storage is not configured", + )))?; let ext_archive_name = ext_path.object_name().expect("bad path"); @@ -1018,7 +1018,7 @@ LIMIT 100", let download_size = extension_server::download_extension( &real_ext_name, &ext_path, - remote_storage, + ext_remote_storage, &self.pgbin, ) .await diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index 9732d8adea..2cec12119f 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -71,18 +71,16 @@ More specifically, here is an example ext_index.json } } */ -use anyhow::Context; use anyhow::{self, Result}; +use anyhow::{bail, Context}; +use bytes::Bytes; use compute_api::spec::RemoteExtSpec; use regex::Regex; use remote_storage::*; -use serde_json; -use std::io::Read; -use std::num::NonZeroUsize; +use reqwest::StatusCode; use std::path::Path; use std::str; use tar::Archive; -use tokio::io::AsyncReadExt; use tracing::info; use tracing::log::warn; use zstd::stream::read::Decoder; @@ -138,23 +136,31 @@ fn parse_pg_version(human_version: &str) -> &str { pub async fn download_extension( ext_name: &str, ext_path: &RemotePath, - remote_storage: &GenericRemoteStorage, + ext_remote_storage: 
&str, pgbin: &str, ) -> Result { info!("Download extension {:?} from {:?}", ext_name, ext_path); - let mut download = remote_storage.download(ext_path).await?; - let mut download_buffer = Vec::new(); - download - .download_stream - .read_to_end(&mut download_buffer) - .await?; + + // TODO add retry logic + let download_buffer = + match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await { + Ok(buffer) => buffer, + Err(error_message) => { + return Err(anyhow::anyhow!( + "error downloading extension {:?}: {:?}", + ext_name, + error_message + )); + } + }; + let download_size = download_buffer.len() as u64; + info!("Download size {:?}", download_size); // it's unclear whether it is more performant to decompress into memory or not // TODO: decompressing into memory can be avoided - let mut decoder = Decoder::new(download_buffer.as_slice())?; - let mut decompress_buffer = Vec::new(); - decoder.read_to_end(&mut decompress_buffer)?; - let mut archive = Archive::new(decompress_buffer.as_slice()); + let decoder = Decoder::new(download_buffer.as_ref())?; + let mut archive = Archive::new(decoder); + let unzip_dest = pgbin .strip_suffix("/bin/postgres") .expect("bad pgbin") @@ -222,29 +228,32 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) { } } -// This function initializes the necessary structs to use remote storage -pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result { - #[derive(Debug, serde::Deserialize)] - struct RemoteExtJson { - bucket: String, - region: String, - endpoint: Option, - prefix: Option, - } - let remote_ext_json = serde_json::from_str::(remote_ext_config)?; +// Do request to extension storage proxy, i.e. 
+// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst +// using HTTP GET +// and return the response body as bytes +// +async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result { + let uri = format!("{}/{}", ext_remote_storage, ext_path); - let config = S3Config { - bucket_name: remote_ext_json.bucket, - bucket_region: remote_ext_json.region, - prefix_in_bucket: remote_ext_json.prefix, - endpoint: remote_ext_json.endpoint, - concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"), - max_keys_per_list_response: None, - }; - let config = RemoteStorageConfig { - storage: RemoteStorageKind::AwsS3(config), - }; - GenericRemoteStorage::from_config(&config) + info!("Download extension {:?} from uri {:?}", ext_path, uri); + + let resp = reqwest::get(uri).await?; + + match resp.status() { + StatusCode::OK => match resp.bytes().await { + Ok(resp) => { + info!("Download extension {:?} completed successfully", ext_path); + Ok(resp) + } + Err(e) => bail!("could not deserialize remote extension response: {}", e), + }, + StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"), + _ => bail!( + "unexpected remote extension response status code: {}", + resp.status() + ), + } } #[cfg(test)] diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 8851be1ec1..ef6ca6eee3 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -123,7 +123,7 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving {:?} POST request", route); info!("req.uri {:?}", req.uri()); diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 384c4ee56d..3053122f6a 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -1252,7 +1252,7 @@ fn cli() -> Command { let remote_ext_config_args = Arg::new("remote-ext-config") .long("remote-ext-config") .num_args(1) - .help("Configure the S3 bucket that we 
search for extensions in.") + .help("Configure the remote extensions storage proxy gateway to request for extensions.") .required(false); let lsn_arg = Arg::new("lsn") diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 4443fd8704..ae45746925 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -45,6 +45,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, bail, Context, Result}; +use compute_api::spec::RemoteExtSpec; use serde::{Deserialize, Serialize}; use utils::id::{NodeId, TenantId, TimelineId}; @@ -476,6 +477,18 @@ impl Endpoint { } } + // check for file remote_extensions_spec.json + // if it is present, read it and pass to compute_ctl + let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json"); + let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path); + let remote_extensions: Option; + + if let Ok(spec_file) = remote_extensions_spec { + remote_extensions = serde_json::from_reader(spec_file).ok(); + } else { + remote_extensions = None; + }; + // Create spec file let spec = ComputeSpec { skip_pg_catalog_updates: self.skip_pg_catalog_updates, @@ -497,7 +510,7 @@ impl Endpoint { pageserver_connstring: Some(pageserver_connstring), safekeeper_connstrings, storage_auth_token: auth_token.clone(), - remote_extensions: None, + remote_extensions, }; let spec_path = self.endpoint_path().join("spec.json"); std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index fc7e834bd2..08e1c5eacb 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -434,8 +434,6 @@ class NeonEnvBuilder: # Pageserver remote storage self.pageserver_remote_storage = pageserver_remote_storage - # Extensions remote storage - self.ext_remote_storage: Optional[S3Storage] = None # Safekeepers remote storage 
self.sk_remote_storage: Optional[RemoteStorage] = None @@ -534,24 +532,6 @@ class NeonEnvBuilder: ) self.pageserver_remote_storage = ret - def enable_extensions_remote_storage(self, kind: RemoteStorageKind): - assert self.ext_remote_storage is None, "already configured extensions remote storage" - - # there is an assumption that REAL_S3 for extensions is never - # cleaned up these are also special in that they have a hardcoded - # bucket and region, which is most likely the same as our normal - ext = self._configure_and_create_remote_storage( - kind, - RemoteStorageUser.EXTENSIONS, - bucket_name="neon-dev-extensions-eu-central-1", - bucket_region="eu-central-1", - ) - assert isinstance( - ext, S3Storage - ), "unsure why, but only MOCK_S3 and REAL_S3 are currently supported for extensions" - ext.cleanup = False - self.ext_remote_storage = ext - def enable_safekeeper_remote_storage(self, kind: RemoteStorageKind): assert self.sk_remote_storage is None, "sk_remote_storage already configured" @@ -608,8 +588,7 @@ class NeonEnvBuilder: directory_to_clean.rmdir() def cleanup_remote_storage(self): - # extensions are currently not cleaned up, disabled when creating - for x in [self.pageserver_remote_storage, self.ext_remote_storage, self.sk_remote_storage]: + for x in [self.pageserver_remote_storage, self.sk_remote_storage]: if isinstance(x, S3Storage): x.do_cleanup() @@ -713,7 +692,6 @@ class NeonEnv: self.pageservers: List[NeonPageserver] = [] self.broker = config.broker self.pageserver_remote_storage = config.pageserver_remote_storage - self.ext_remote_storage = config.ext_remote_storage self.safekeepers_remote_storage = config.sk_remote_storage self.pg_version = config.pg_version # Binary path for pageserver, safekeeper, etc @@ -1469,12 +1447,7 @@ class NeonCli(AbstractNeonCli): if pageserver_id is not None: args.extend(["--pageserver-id", str(pageserver_id)]) - storage = self.env.ext_remote_storage - s3_env_vars = None - if isinstance(storage, S3Storage): - s3_env_vars 
= storage.access_env_vars() - - res = self.raw_cli(args, extra_env_vars=s3_env_vars) + res = self.raw_cli(args) res.check_returncode() return res @@ -2582,6 +2555,17 @@ class Endpoint(PgProtocol): with open(config_path, "w") as file: json.dump(dict(data_dict, **kwargs), file, indent=4) + # Mock the extension part of spec passed from control plane for local testing + # endpoint.rs adds content of this file as a part of the spec.json + def create_remote_extension_spec(self, spec: dict[str, Any]): + """Create a remote extension spec file for the endpoint.""" + remote_extensions_spec_path = os.path.join( + self.endpoint_path(), "remote_extensions_spec.json" + ) + + with open(remote_extensions_spec_path, "w") as file: + json.dump(spec, file, indent=4) + def stop(self) -> "Endpoint": """ Stop the Postgres instance if it's running. diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 98f6677c00..9a2980280c 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -411,7 +411,6 @@ def check_neon_works( config.initial_tenant = snapshot_config["default_tenant_id"] config.pg_distrib_dir = pg_distrib_dir config.remote_storage = None - config.ext_remote_storage = None config.sk_remote_storage = None # Use the "target" binaries to launch the storage nodes diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index 775ad10241..a975947704 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -1,316 +1,165 @@ import os -import shutil -import threading from contextlib import closing from pathlib import Path +from typing import Any, Dict, List import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, ) -from fixtures.pg_version import PgVersion, skip_on_postgres -from fixtures.remote_storage import ( - RemoteStorageKind, 
- S3Storage, - available_s3_storages, -) +from fixtures.pg_version import PgVersion +from pytest_httpserver import HTTPServer +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response -# Cleaning up downloaded files is important for local tests -# or else one test could reuse the files from another test or another test run -def cleanup(pg_version): - PGDIR = Path(f"pg_install/v{pg_version}") +# Check that the extension is not already in the share_dir_path_ext +# if it is, skip the test +# +# After the test is done, cleanup the control file and the extension directory +@pytest.fixture(scope="function") +def ext_file_cleanup(pg_bin): + out = pg_bin.run_capture("pg_config --sharedir".split()) + share_dir_path = Path(f"{out}.stdout").read_text().strip() + log.info(f"share_dir_path: {share_dir_path}") + share_dir_path_ext = os.path.join(share_dir_path, "extension") - LIB_DIR = PGDIR / Path("lib/postgresql") - cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"] - cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs] + log.info(f"share_dir_path_ext: {share_dir_path_ext}") - SHARE_DIR = PGDIR / Path("share/postgresql/extension") - cleanup_ext_globs = [ - "anon*", - "address_standardizer*", - "postgis*", - "pageinspect*", - "pg_buffercache*", - "pgrouting*", - ] - cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs] + # if file is already in the share_dir_path_ext, skip the test + if os.path.isfile(os.path.join(share_dir_path_ext, "anon.control")): + log.info("anon.control is already in the share_dir_path_ext, skipping the test") + yield False + return + else: + yield True - all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths - all_cleanup_files = [] - for file_glob in all_glob_paths: - for file in file_glob: - all_cleanup_files.append(file) + # cleanup the control file + if os.path.isfile(os.path.join(share_dir_path_ext, "anon.control")): + 
os.unlink(os.path.join(share_dir_path_ext, "anon.control")) + log.info("anon.control was removed from the share_dir_path_ext") - for file in all_cleanup_files: - try: - os.remove(file) - log.info(f"removed file {file}") - except Exception as err: - log.info( - f"skipping remove of file {file} because it doesn't exist.\ - this may be expected or unexpected depending on the test {err}" - ) + # remove the extension directory recursively + if os.path.isdir(os.path.join(share_dir_path_ext, "anon")): + directories_to_clean: List[Path] = [] + for f in Path(os.path.join(share_dir_path_ext, "anon")).iterdir(): + if f.is_file(): + log.info(f"Removing file {f}") + f.unlink() + elif f.is_dir(): + directories_to_clean.append(f) - cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")] - for folder in cleanup_folders: - try: - shutil.rmtree(folder) - log.info(f"removed folder {folder}") - except Exception as err: - log.info( - f"skipping remove of folder {folder} because it doesn't exist.\ - this may be expected or unexpected depending on the test {err}" - ) + for directory_to_clean in reversed(directories_to_clean): + if not os.listdir(directory_to_clean): + log.info(f"Removing empty directory {directory_to_clean}") + directory_to_clean.rmdir() + + os.rmdir(os.path.join(share_dir_path_ext, "anon")) + log.info("anon directory was removed from the share_dir_path_ext") -def upload_files(env): - log.info("Uploading test files to mock bucket") - os.chdir("test_runner/regress/data/extension_test") - for path in os.walk("."): - prefix, _, files = path - for file in files: - # the [2:] is to remove the leading "./" - full_path = os.path.join(prefix, file)[2:] - - with open(full_path, "rb") as f: - log.info(f"UPLOAD {full_path} to ext/{full_path}") - assert isinstance(env.pageserver_remote_storage, S3Storage) - env.pageserver_remote_storage.client.upload_fileobj( - f, - env.ext_remote_storage.bucket_name, - f"ext/{full_path}", - ) - os.chdir("../../../..") - - 
-# Test downloading remote extension. -@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building") -@pytest.mark.parametrize("remote_storage_kind", available_s3_storages()) -@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949") def test_remote_extensions( + httpserver: HTTPServer, neon_env_builder: NeonEnvBuilder, - remote_storage_kind: RemoteStorageKind, - pg_version: PgVersion, + httpserver_listen_address, + pg_version, + ext_file_cleanup, ): - neon_env_builder.enable_extensions_remote_storage(remote_storage_kind) + if ext_file_cleanup is False: + log.info("test_remote_extensions skipped") + return + + if pg_version == PgVersion.V16: + pytest.skip("TODO: PG16 extension building") + + # setup mock http server + # that expects request for anon.tar.zst + # and returns the requested file + (host, port) = httpserver_listen_address + extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway" + + archive_path = f"latest/v{pg_version}/extensions/anon.tar.zst" + + def endpoint_handler_build_tag(request: Request) -> Response: + log.info(f"request: {request}") + + file_name = "anon.tar.zst" + file_path = f"test_runner/regress/data/extension_test/5670669815/v{pg_version}/extensions/anon.tar.zst" + file_size = os.path.getsize(file_path) + fh = open(file_path, "rb") + return Response( + fh, + mimetype="application/octet-stream", + headers=[ + ("Content-Length", str(file_size)), + ("Content-Disposition", 'attachment; filename="%s"' % file_name), + ], + direct_passthrough=True, + ) + + httpserver.expect_request( + f"/pg-ext-s3-gateway/{archive_path}", method="GET" + ).respond_with_handler(endpoint_handler_build_tag) + + # Start a compute node with remote_extension spec + # and check that it can download the extensions and use them to CREATE EXTENSION. 
env = neon_env_builder.init_start() tenant_id, _ = env.neon_cli.create_tenant() env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id) - - assert env.ext_remote_storage is not None # satisfy mypy - - # For MOCK_S3 we upload test files. - # For REAL_S3 we use the files already in the bucket - if remote_storage_kind == RemoteStorageKind.MOCK_S3: - upload_files(env) - - # Start a compute node and check that it can download the extensions - # and use them to CREATE EXTENSION and LOAD - endpoint = env.endpoints.create_start( + endpoint = env.endpoints.create( "test_remote_extensions", tenant_id=tenant_id, - remote_ext_config=env.ext_remote_storage.to_string(), - # config_lines=["log_min_messages=debug3"], + config_lines=["log_min_messages=debug3"], ) + + # mock remote_extensions spec + spec: Dict[str, Any] = { + "library_index": { + "anon": "anon", + }, + "extension_data": { + "anon": { + "archive_path": "", + "control_data": { + "anon.control": "# PostgreSQL Anonymizer (anon) extension\ncomment = 'Data anonymization tools'\ndefault_version = '1.1.0'\ndirectory='extension/anon'\nrelocatable = false\nrequires = 'pgcrypto'\nsuperuser = false\nmodule_pathname = '$libdir/anon'\ntrusted = true\n" + }, + }, + }, + } + spec["extension_data"]["anon"]["archive_path"] = archive_path + + endpoint.create_remote_extension_spec(spec) + + endpoint.start( + remote_ext_config=extensions_endpoint, + ) + + # this is expected to fail if there's no pgcrypto extension, that's ok + # we just want to check that the extension was downloaded try: with closing(endpoint.connect()) as conn: with conn.cursor() as cur: - # Check that appropriate control files were downloaded - cur.execute("SELECT * FROM pg_available_extensions") - all_extensions = [x[0] for x in cur.fetchall()] - log.info(all_extensions) - assert "anon" in all_extensions + # Check that appropriate files were downloaded + cur.execute("CREATE EXTENSION anon") + res = [x[0] for x in cur.fetchall()] + log.info(res) + 
except Exception as err: + assert "pgcrypto" in str(err), f"unexpected error creating anon extension {err}" - # postgis is on real s3 but not mock s3. - # it's kind of a big file, would rather not upload to github - if remote_storage_kind == RemoteStorageKind.REAL_S3: - assert "postgis" in all_extensions - # this may fail locally if dependency is missing - # we don't really care about the error, - # we just want to make sure it downloaded - try: - cur.execute("CREATE EXTENSION postgis") - except Exception as err: - log.info(f"(expected) error creating postgis extension: {err}") - # we do not check the error, so this is basically a NO-OP - # however checking the log you can make sure that it worked - # and also get valuable information about how long loading the extension took - - # this is expected to fail on my computer because I don't have the pgcrypto extension - try: - cur.execute("CREATE EXTENSION anon") - except Exception as err: - log.info("error creating anon extension") - assert "pgcrypto" in str(err), "unexpected error creating anon extension" - finally: - cleanup(pg_version) + httpserver.check() -# Test downloading remote library. -@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building") -@pytest.mark.parametrize("remote_storage_kind", available_s3_storages()) -@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949") -def test_remote_library( - neon_env_builder: NeonEnvBuilder, - remote_storage_kind: RemoteStorageKind, - pg_version: PgVersion, -): - neon_env_builder.enable_extensions_remote_storage(remote_storage_kind) - env = neon_env_builder.init_start() - tenant_id, _ = env.neon_cli.create_tenant() - env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id) - - assert env.ext_remote_storage is not None # satisfy mypy - - # For MOCK_S3 we upload test files. 
- # For REAL_S3 we use the files already in the bucket - if remote_storage_kind == RemoteStorageKind.MOCK_S3: - upload_files(env) - - # and use them to run LOAD library - endpoint = env.endpoints.create_start( - "test_remote_library", - tenant_id=tenant_id, - remote_ext_config=env.ext_remote_storage.to_string(), - # config_lines=["log_min_messages=debug3"], - ) - try: - with closing(endpoint.connect()) as conn: - with conn.cursor() as cur: - # try to load library - try: - cur.execute("LOAD 'anon'") - except Exception as err: - log.info(f"error loading anon library: {err}") - raise AssertionError("unexpected error loading anon library") from err - - # test library which name is different from extension name - # this may fail locally if dependency is missing - # however, it does successfully download the postgis archive - if remote_storage_kind == RemoteStorageKind.REAL_S3: - try: - cur.execute("LOAD 'postgis_topology-3'") - except Exception as err: - log.info("error loading postgis_topology-3") - assert "No such file or directory" in str( - err - ), "unexpected error loading postgis_topology-3" - finally: - cleanup(pg_version) - - -# Here we test a complex extension -# which has multiple extensions in one archive -# using postgis as an example -# @pytest.mark.skipif( -# RemoteStorageKind.REAL_S3 not in available_s3_storages(), -# reason="skipping test because real s3 not enabled", -# ) -@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building") -@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949") -def test_multiple_extensions_one_archive( - neon_env_builder: NeonEnvBuilder, - pg_version: PgVersion, -): - neon_env_builder.enable_extensions_remote_storage(RemoteStorageKind.REAL_S3) - env = neon_env_builder.init_start() - tenant_id, _ = env.neon_cli.create_tenant() - env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id) - - assert env.ext_remote_storage is not None # satisfy mypy - - endpoint = 
env.endpoints.create_start( - "test_multiple_extensions_one_archive", - tenant_id=tenant_id, - remote_ext_config=env.ext_remote_storage.to_string(), - ) - with closing(endpoint.connect()) as conn: - with conn.cursor() as cur: - cur.execute("CREATE EXTENSION address_standardizer;") - cur.execute("CREATE EXTENSION address_standardizer_data_us;") - # execute query to ensure that it works - cur.execute( - "SELECT house_num, name, suftype, city, country, state, unit \ - FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \ - 'One Rust Place, Boston, MA 02109');" - ) - res = cur.fetchall() - log.info(res) - assert len(res) > 0 - - cleanup(pg_version) - - -# Test that extension is downloaded after endpoint restart, -# when the library is used in the query. +# TODO +# 1. Test downloading remote library. # +# 2. Test a complex extension, which has multiple extensions in one archive +# using postgis as an example +# +# 3.Test that extension is downloaded after endpoint restart, +# when the library is used in the query. # Run the test with mutliple simultaneous connections to an endpoint. # to ensure that the extension is downloaded only once. # -@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949") -def test_extension_download_after_restart( - neon_env_builder: NeonEnvBuilder, - pg_version: PgVersion, -): - # TODO: PG15 + PG16 extension building - if "v14" not in pg_version: # test set only has extension built for v14 - return None - - neon_env_builder.enable_extensions_remote_storage(RemoteStorageKind.MOCK_S3) - env = neon_env_builder.init_start() - tenant_id, _ = env.neon_cli.create_tenant() - env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id) - - assert env.ext_remote_storage is not None # satisfy mypy - - # For MOCK_S3 we upload test files. 
- upload_files(env) - - endpoint = env.endpoints.create_start( - "test_extension_download_after_restart", - tenant_id=tenant_id, - remote_ext_config=env.ext_remote_storage.to_string(), - config_lines=["log_min_messages=debug3"], - ) - with closing(endpoint.connect()) as conn: - with conn.cursor() as cur: - cur.execute("CREATE extension pg_buffercache;") - cur.execute("SELECT * from pg_buffercache;") - res = cur.fetchall() - assert len(res) > 0 - log.info(res) - - # shutdown compute node - endpoint.stop() - # remove extension files locally - cleanup(pg_version) - - # spin up compute node again (there are no extension files available, because compute is stateless) - endpoint = env.endpoints.create_start( - "test_extension_download_after_restart", - tenant_id=tenant_id, - remote_ext_config=env.ext_remote_storage.to_string(), - config_lines=["log_min_messages=debug3"], - ) - - # connect to compute node and run the query - # that will trigger the download of the extension - def run_query(endpoint, thread_id: int): - log.info("thread_id {%d} starting", thread_id) - with closing(endpoint.connect()) as conn: - with conn.cursor() as cur: - cur.execute("SELECT * from pg_buffercache;") - res = cur.fetchall() - assert len(res) > 0 - log.info("thread_id {%d}, res = %s", thread_id, res) - - threads = [threading.Thread(target=run_query, args=(endpoint, i)) for i in range(2)] - - for thread in threads: - thread.start() - for thread in threads: - thread.join() - - cleanup(pg_version) +# 4. Test that private extensions are only downloaded when they are present in the spec. +#