diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs
index bdc08fdeab..e83746e71a 100644
--- a/compute_tools/src/extension_server.rs
+++ b/compute_tools/src/extension_server.rs
@@ -16,6 +16,7 @@ Speicially, ext_index.json has a list of public extensions,
 and a list of extensions enabled for specific tenant-ids.
 */
 use crate::compute::ComputeNode;
+use anyhow::Context;
 use anyhow::{self, bail, Result};
 use remote_storage::*;
 use serde_json::{self, Value};
@@ -31,7 +32,7 @@ use std::thread;
 use tokio::io::AsyncReadExt;
 use tracing::info;
 
-// TODO: use these, it's better
+// TODO: use these crates for untarring, it's better
 // use tar::Archive;
 // use flate2::read::GzDecoder;
 
@@ -70,6 +71,7 @@ pub async fn get_available_extensions(
     custom_ext_prefixes: &Vec<String>,
 ) -> Result<Vec<String>> {
     // TODO: in this function change expect's to pass the error instead of panic-ing
+    // TODO: figure out why it's calling library loads
 
     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
@@ -147,22 +149,21 @@ pub async fn download_extension(
     pg_version: &str,
 ) -> Result<()> {
     let ext_name = ext_name.replace(".so", "");
-
-    info!("DOWNLOAD EXTENSION {:?}", ext_name);
     let ext_name_targz = ext_name.to_owned() + ".tar.gz";
+    if Path::new(&ext_name_targz).exists() {
+        info!("extension {:?} already exists", ext_name_targz);
+        return Ok(());
+    }
     let ext_path = RemotePath::new(
         &Path::new(pg_version)
             .join("extensions")
            .join(ext_name_targz.clone()),
     )?;
-    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
-    let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql");
     info!(
         "Start downloading extension {:?} from {:?}",
         ext_name, ext_path
     );
     let mut download = remote_storage.download(&ext_path).await?;
-    // TODO: skip download if files already
     let mut write_data_buffer = Vec::new();
     download
         .download_stream
@@ -173,15 +174,13 @@ pub async fn download_extension(
     output_file.write_all(&write_data_buffer)?;
     info!("Download {:?} completed successfully", &ext_path);
     info!("Unzipping extension {:?}", zip_name);
-
-    // TODO unzip and place files in appropriate locations using the library suggested by some ppl
-    info!("unzip {zip_name:?}");
     std::process::Command::new("tar")
         .arg("xzvf")
         .arg(zip_name)
         .spawn()?
         .wait()?;
+    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     let zip_sharedir = format!("extensions/{ext_name}/share/extension");
     info!("mv {zip_sharedir:?}/* {local_sharedir:?}");
     for file in std::fs::read_dir(zip_sharedir)? {
         let old_file = file?.path();
@@ -190,6 +189,7 @@ pub async fn download_extension(
         let new_file = Path::new(&local_sharedir).join(old_file.file_name().expect("error parsing file"));
         std::fs::rename(old_file, new_file)?;
     }
+    let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql");
     let zip_libdir = format!("extensions/{ext_name}/lib");
     info!("mv {zip_libdir:?}/* {local_libdir:?}");
     for file in std::fs::read_dir(zip_libdir)?
     {
@@ -208,31 +208,25 @@ pub fn init_remote_storage(
 ) -> anyhow::Result<GenericRemoteStorage> {
     let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
-    let remote_ext_bucket = match &remote_ext_config["bucket"] {
-        Value::String(x) => x,
-        _ => bail!("remote_ext_config missing bucket"),
-    };
-    let remote_ext_region = match &remote_ext_config["region"] {
-        Value::String(x) => x,
-        _ => bail!("remote_ext_config missing region"),
-    };
-    let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
-        Value::String(x) => Some(x.clone()),
-        _ => None,
-    };
-    let remote_ext_prefix = match &remote_ext_config["prefix"] {
-        Value::String(x) => Some(x.clone()),
-        // if prefix is not provided, use default, which is the build_tag
-        _ => Some(default_prefix.to_string()),
-    };
+    let remote_ext_bucket = remote_ext_config["bucket"]
+        .as_str()
+        .context("remote_ext_config missing bucket")?;
+    let remote_ext_region = remote_ext_config["region"]
+        .as_str()
+        .context("remote_ext_config missing region")?;
+    let remote_ext_endpoint = remote_ext_config["endpoint"].as_str();
+    let remote_ext_prefix = remote_ext_config["prefix"]
+        .as_str()
+        .unwrap_or(default_prefix)
+        .to_string();
 
     // TODO: is this a valid assumption? some extensions are quite large
     // load should not be large, so default parameters are fine
     let config = S3Config {
         bucket_name: remote_ext_bucket.to_string(),
         bucket_region: remote_ext_region.to_string(),
-        prefix_in_bucket: remote_ext_prefix,
-        endpoint: remote_ext_endpoint,
+        prefix_in_bucket: Some(remote_ext_prefix),
+        endpoint: remote_ext_endpoint.map(|x| x.to_string()),
         concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
         max_keys_per_list_response: None,
     };
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index bec3ba7cd7..f3bfe830ad 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -805,11 +805,11 @@ class NeonEnvBuilder:
 
         if enable_remote_extensions:
             self.ext_remote_storage = S3Storage(
-                bucket_name="neon-dev-extensions-us-east-2",
-                bucket_region="us-east-2",
+                bucket_name="neon-dev-extensions",
+                bucket_region="eu-central-1",
                 access_key=access_key,
                 secret_key=secret_key,
-                prefix_in_bucket="5412197734",
+                prefix_in_bucket="5555",
             )
 
     def cleanup_local_storage(self):
diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py
index 6ab46f0163..fa21e898f2 100644
--- a/test_runner/regress/test_download_extensions.py
+++ b/test_runner/regress/test_download_extensions.py
@@ -1,3 +1,5 @@
+import os
+import shutil
 from contextlib import closing
 
 import pytest
@@ -24,10 +26,9 @@ def test_remote_extensions(
     remote_storage_kind: RemoteStorageKind,
     pg_version: PgVersion,
 ):
-    # TODO: SKIP for now, infra not ready yet
-    if remote_storage_kind == RemoteStorageKind.REAL_S3 or pg_version == "14":
+    if pg_version != "15":
+        # TODO: for now we only have test files for v15
         return None
-
     neon_env_builder.enable_remote_storage(
         remote_storage_kind=remote_storage_kind,
         test_name="test_remote_extensions",
@@ -38,31 +39,27 @@ def test_remote_extensions(
     tenant_id, _ = env.neon_cli.create_tenant()
     env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
 
-    assert env.ext_remote_storage is not None
-    assert env.remote_storage_client is not None
-
-    # For MOCK_S3 we upload some test files. for REAL_S3 we use the files created in CICD
+    # For MOCK_S3 we upload test files.
+    # For REAL_S3 we use the files already in the bucket
     if remote_storage_kind == RemoteStorageKind.MOCK_S3:
         log.info("Uploading test files to mock bucket")
-        with open("test_runner/regress/data/extension_test/ext_index.json", "rb") as f:
-            env.remote_storage_client.upload_fileobj(
-                f,
-                env.ext_remote_storage.bucket_name,
-                f"ext/v{pg_version}/ext_index.json",
-            )
-        with open("test_runner/regress/data/extension_test/anon.tar.gz", "rb") as f:
-            env.remote_storage_client.upload_fileobj(
-                f,
-                env.ext_remote_storage.bucket_name,
-                f"ext/v{pg_version}/extensions/anon.tar.gz",
-            )
-        with open("test_runner/regress/data/extension_test/embedding.tar.gz", "rb") as f:
-            env.remote_storage_client.upload_fileobj(
-                f,
-                env.ext_remote_storage.bucket_name,
-                f"ext/v{pg_version}/extensions/embedding.tar.gz",
-            )
+
+        def upload_test_file(from_path, to_path):
+            assert env.ext_remote_storage is not None  # satisfy mypy
+            assert env.remote_storage_client is not None  # satisfy mypy
+            with open(f"test_runner/regress/data/extension_test/{from_path}", "rb") as f:
+                env.remote_storage_client.upload_fileobj(
+                    f,
+                    env.ext_remote_storage.bucket_name,
+                    f"ext/v{pg_version}/{to_path}",
+                )
+
+        upload_test_file("ext_index.json", "ext_index.json")
+        upload_test_file("anon.tar.gz", "extensions/anon.tar.gz")
+        upload_test_file("embedding.tar.gz", "extensions/embedding.tar.gz")
+
+    assert env.ext_remote_storage is not None  # satisfy mypy
+    assert env.remote_storage_client is not None  # satisfy mypy
 
     try:
         # Start a compute node and check that it can download the extensions
         # and use them to CREATE EXTENSION and LOAD
@@ -80,7 +77,7 @@ def test_remote_extensions(
             log.info(all_extensions)
             assert "anon" in all_extensions
             assert "embedding" in all_extensions
-            # TODO: check that we don't have download custom extensions for other tenant ids
+            # TODO: check that we can't download custom extensions for other tenant ids
             # TODO: not sure how private extension will work with REAL_S3 test. can we rig the tenant id?
 
             # check that we can download public extension
@@ -89,16 +86,26 @@ def test_remote_extensions(
             assert "embedding" in [x[0] for x in cur.fetchall()]
 
             # check that we can download private extension
-            # TODO: this will fail locally because we don't have the required dependencies
-            cur.execute("CREATE EXTENSION anon")
-            cur.execute("SELECT extname FROM pg_extension")
-            assert "anon" in [x[0] for x in cur.fetchall()]
+            try:
+                cur.execute("CREATE EXTENSION anon")
+            except Exception as err:
+                log.info("error creating anon extension")
+                assert "pgcrypto" in str(err), "unexpected error creating anon extension"
 
             # TODO: try to load libraries as well
     finally:
         cleanup_files = ["embedding.tar.gz", "anon.tar.gz"]
-        _cleanup_folders = ["extensions"]
-        # for file in cleanup_files:
-        #     os.remove(file)
-        log.info(f"For now, please manually cleanup {cleanup_files}")
+        cleanup_folders = ["extensions"]
+        for file in cleanup_files:
+            try:
+                os.remove(file)
+                log.info(f"removed file {file}")
+            except Exception as err:
+                log.info(f"error removing file {file}: {err}")
+        for folder in cleanup_folders:
+            try:
+                shutil.rmtree(folder)
+                log.info(f"removed folder {folder}")
+            except Exception as err:
+                log.info(f"error removing folder {folder}: {err}")
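
Note on the untarring TODO in `extension_server.rs`: the commented-out `tar::Archive` and `flate2::read::GzDecoder` imports point at doing the extraction in-process instead of shelling out to `tar xzvf` (which also silently depends on `tar` being on the compute node's PATH). A minimal sketch of what that could look like, assuming `tar = "0.4"` and `flate2 = "1"` in Cargo.toml; the function name is illustrative, not part of this PR:

```rust
use std::fs::File;

use anyhow::Result;
use flate2::read::GzDecoder;
use tar::Archive;

// Sketch: unpack e.g. "anon.tar.gz" under "extensions/" without spawning `tar`.
fn unpack_targz(targz_path: &str, dest_dir: &str) -> Result<()> {
    let tarball = File::open(targz_path)?;
    let decompressed = GzDecoder::new(tarball); // gunzip as a streaming reader
    let mut archive = Archive::new(decompressed); // tar reader over the gzip stream
    archive.unpack(dest_dir)?; // extracts all entries under dest_dir
    Ok(())
}
```

This would also surface extraction failures as `Result` errors rather than relying on the exit status of a child process.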
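Note on the config parsing in `init_remote_storage`: indexing into `serde_json::Value` with `.as_str().context(...)` works, but a typed struct would let serde itself report exactly which field is missing or mistyped. A sketch of that alternative, assuming serde's `derive` feature is enabled; the struct and function names are hypothetical, only the JSON keys (`bucket`, `region`, `endpoint`, `prefix`) come from the diff:

```rust
use anyhow::Result;
use serde::Deserialize;

// Hypothetical typed view of remote_ext_config; missing `bucket`/`region`
// would fail deserialization with a field-specific serde error.
#[derive(Deserialize)]
struct RemoteExtConfig {
    bucket: String,
    region: String,
    endpoint: Option<String>,
    prefix: Option<String>,
}

fn parse_remote_ext_config(raw: &str, default_prefix: &str) -> Result<RemoteExtConfig> {
    let mut config: RemoteExtConfig = serde_json::from_str(raw)?;
    // Fall back to the build tag when no prefix is given, as the diff does.
    config.prefix = Some(config.prefix.unwrap_or_else(|| default_prefix.to_string()));
    Ok(config)
}
```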