Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 09:22:55 +00:00
mock and real tests both working
@@ -16,6 +16,7 @@ Specifically, ext_index.json has a list of public extensions, and a list of
extensions enabled for specific tenant-ids.
*/
use crate::compute::ComputeNode;
use anyhow::Context;
use anyhow::{self, bail, Result};
use remote_storage::*;
use serde_json::{self, Value};
@@ -31,7 +32,7 @@ use std::thread;
use tokio::io::AsyncReadExt;
use tracing::info;

// TODO: use these, it's better
// TODO: use these crates for untarring, it's better
// use tar::Archive;
// use flate2::read::GzDecoder;

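The commented-out crates above hint at replacing the `tar xzvf` subprocess used later in download_extension. A minimal sketch of that replacement, assuming the tar and flate2 crates named in the TODO (the helper name and signature are hypothetical):

// Hypothetical helper; crate choice taken from the TODO above.
// fn unpack_tar_gz(archive: &Path, dest: &Path) -> anyhow::Result<()> {
//     let file = std::fs::File::open(archive)?;
//     // GzDecoder decompresses the stream; Archive::unpack recreates
//     // the tarball's directory tree under `dest`.
//     tar::Archive::new(flate2::read::GzDecoder::new(file)).unpack(dest)?;
//     Ok(())
// }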
@@ -70,6 +71,7 @@ pub async fn get_available_extensions(
    custom_ext_prefixes: &Vec<String>,
) -> Result<HashSet<String>> {
    // TODO: in this function change expect's to pass the error instead of panic-ing
    // TODO: figure out why it's calling library loads

    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");

@@ -147,22 +149,21 @@ pub async fn download_extension(
    pg_version: &str,
) -> Result<()> {
    let ext_name = ext_name.replace(".so", "");

    info!("DOWNLOAD EXTENSION {:?}", ext_name);
    let ext_name_targz = ext_name.to_owned() + ".tar.gz";
    if Path::new(&ext_name_targz).exists() {
        info!("extension {:?} already exists", ext_name_targz);
        return Ok(());
    }
    let ext_path = RemotePath::new(
        &Path::new(pg_version)
            .join("extensions")
            .join(ext_name_targz.clone()),
    )?;
    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
    let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql");
    info!(
        "Start downloading extension {:?} from {:?}",
        ext_name, ext_path
    );
    let mut download = remote_storage.download(&ext_path).await?;
    // TODO: skip download if files already exist
    let mut write_data_buffer = Vec::new();
    download
        .download_stream
@@ -173,15 +174,13 @@ pub async fn download_extension(
    output_file.write_all(&write_data_buffer)?;
    info!("Download {:?} completed successfully", &ext_path);
    info!("Unzipping extension {:?}", zip_name);

    // TODO unzip and place files in appropriate locations using the library suggested by some ppl
    info!("unzip {zip_name:?}");
    std::process::Command::new("tar")
        .arg("xzvf")
        .arg(zip_name)
        .spawn()?
        .wait()?;

    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
    let zip_sharedir = format!("extensions/{ext_name}/share/extension");
    info!("mv {zip_sharedir:?}/* {local_sharedir:?}");
    for file in std::fs::read_dir(zip_sharedir)? {
@@ -190,6 +189,7 @@ pub async fn download_extension(
            Path::new(&local_sharedir).join(old_file.file_name().expect("error parsing file"));
        std::fs::rename(old_file, new_file)?;
    }
    let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql");
    let zip_libdir = format!("extensions/{ext_name}/lib");
    info!("mv {zip_libdir:?}/* {local_libdir:?}");
    for file in std::fs::read_dir(zip_libdir)? {
@@ -208,31 +208,25 @@ pub fn init_remote_storage(
) -> anyhow::Result<GenericRemoteStorage> {
    let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;

    let remote_ext_bucket = match &remote_ext_config["bucket"] {
        Value::String(x) => x,
        _ => bail!("remote_ext_config missing bucket"),
    };
    let remote_ext_region = match &remote_ext_config["region"] {
        Value::String(x) => x,
        _ => bail!("remote_ext_config missing region"),
    };
    let remote_ext_endpoint = match &remote_ext_config["endpoint"] {
        Value::String(x) => Some(x.clone()),
        _ => None,
    };
    let remote_ext_prefix = match &remote_ext_config["prefix"] {
        Value::String(x) => Some(x.clone()),
        // if prefix is not provided, use default, which is the build_tag
        _ => Some(default_prefix.to_string()),
    };
    let remote_ext_bucket = remote_ext_config["bucket"]
        .as_str()
        .context("config parse error")?;
    let remote_ext_region = remote_ext_config["region"]
        .as_str()
        .context("config parse error")?;
    let remote_ext_endpoint = remote_ext_config["endpoint"].as_str();
    let remote_ext_prefix = remote_ext_config["prefix"]
        .as_str()
        .unwrap_or(default_prefix)
        .to_string();

    // TODO: is this a valid assumption? some extensions are quite large
    // load should not be large, so default parameters are fine
    let config = S3Config {
        bucket_name: remote_ext_bucket.to_string(),
        bucket_region: remote_ext_region.to_string(),
        prefix_in_bucket: remote_ext_prefix,
        endpoint: remote_ext_endpoint,
        prefix_in_bucket: Some(remote_ext_prefix),
        endpoint: remote_ext_endpoint.map(|x| x.to_string()),
        concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
        max_keys_per_list_response: None,
    };

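For reference, the remote_ext_config string parsed above is a JSON document of roughly this shape (values here are placeholders, not real settings): bucket and region are required, endpoint is optional, and prefix falls back to default_prefix, i.e. the build tag.

{
    "bucket": "my-extension-bucket",
    "region": "us-east-2",
    "endpoint": "http://localhost:9000",
    "prefix": "0123456789"
}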
@@ -805,11 +805,11 @@ class NeonEnvBuilder:

        if enable_remote_extensions:
            self.ext_remote_storage = S3Storage(
                bucket_name="neon-dev-extensions-us-east-2",
                bucket_region="us-east-2",
                bucket_name="neon-dev-extensions",
                bucket_region="eu-central-1",
                access_key=access_key,
                secret_key=secret_key,
                prefix_in_bucket="5412197734",
                prefix_in_bucket="5555",
            )

    def cleanup_local_storage(self):

@@ -1,3 +1,5 @@
import os
import shutil
from contextlib import closing

import pytest
@@ -24,10 +26,9 @@ def test_remote_extensions(
    remote_storage_kind: RemoteStorageKind,
    pg_version: PgVersion,
):
    # TODO: SKIP for now, infra not ready yet
    if remote_storage_kind == RemoteStorageKind.REAL_S3 or pg_version == "14":
    if pg_version != "15":
        # TODO: for right now we only have test files for v15
        return None

    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        test_name="test_remote_extensions",
@@ -38,31 +39,27 @@ def test_remote_extensions(
    tenant_id, _ = env.neon_cli.create_tenant()
    env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)

    assert env.ext_remote_storage is not None
    assert env.remote_storage_client is not None

    # For MOCK_S3 we upload some test files. for REAL_S3 we use the files created in CICD
    # For MOCK_S3 we upload test files.
    # For REAL_S3 we use the files already in the bucket
    if remote_storage_kind == RemoteStorageKind.MOCK_S3:
        log.info("Uploading test files to mock bucket")
        with open("test_runner/regress/data/extension_test/ext_index.json", "rb") as f:
            env.remote_storage_client.upload_fileobj(
                f,
                env.ext_remote_storage.bucket_name,
                f"ext/v{pg_version}/ext_index.json",
            )
        with open("test_runner/regress/data/extension_test/anon.tar.gz", "rb") as f:
            env.remote_storage_client.upload_fileobj(
                f,
                env.ext_remote_storage.bucket_name,
                f"ext/v{pg_version}/extensions/anon.tar.gz",
            )
        with open("test_runner/regress/data/extension_test/embedding.tar.gz", "rb") as f:
            env.remote_storage_client.upload_fileobj(
                f,
                env.ext_remote_storage.bucket_name,
                f"ext/v{pg_version}/extensions/embedding.tar.gz",
            )

        def upload_test_file(from_path, to_path):
            assert env.ext_remote_storage is not None # satisfy mypy
            assert env.remote_storage_client is not None # satisfy mypy
            with open(f"test_runner/regress/data/extension_test/{from_path}", "rb") as f:
                env.remote_storage_client.upload_fileobj(
                    f,
                    env.ext_remote_storage.bucket_name,
                    f"ext/v{pg_version}/{to_path}",
                )

        upload_test_file("ext_index.json", "ext_index.json")
        upload_test_file("anon.tar.gz", "extensions/anon.tar.gz")
        upload_test_file("embedding.tar.gz", "extensions/embedding.tar.gz")
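With pg_version 15, these uploads leave the mock bucket containing the following keys:

ext/v15/ext_index.json
ext/v15/extensions/anon.tar.gz
ext/v15/extensions/embedding.tar.gz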

    assert env.ext_remote_storage is not None # satisfy mypy
    assert env.remote_storage_client is not None # satisfy mypy
    try:
        # Start a compute node and check that it can download the extensions
        # and use them to CREATE EXTENSION and LOAD
@@ -80,7 +77,7 @@ def test_remote_extensions(
            log.info(all_extensions)
            assert "anon" in all_extensions
            assert "embedding" in all_extensions
            # TODO: check that we don't have download custom extensions for other tenant ids
            # TODO: check that we can't download custom extensions for other tenant ids
            # TODO: not sure how private extension will work with REAL_S3 test. can we rig the tenant id?

            # check that we can download public extension
@@ -89,16 +86,26 @@ def test_remote_extensions(
            assert "embedding" in [x[0] for x in cur.fetchall()]

            # check that we can download private extension
            # TODO: this will fail locally because we don't have the required dependencies
            cur.execute("CREATE EXTENSION anon")
            cur.execute("SELECT extname FROM pg_extension")
            assert "anon" in [x[0] for x in cur.fetchall()]
            try:
                cur.execute("CREATE EXTENSION anon")
            except Exception as err:
                log.info("error creating anon extension")
                assert "pgcrypto" in str(err), "unexpected error creating anon extension"

            # TODO: try to load libraries as well

    finally:
        cleanup_files = ["embedding.tar.gz", "anon.tar.gz"]
        _cleanup_folders = ["extensions"]
        # for file in cleanup_files:
        #     os.remove(file)
        log.info(f"For now, please manually cleanup {cleanup_files}")
        cleanup_folders = ["extensions"]
        for file in cleanup_files:
            try:
                os.remove(file)
                log.info(f"removed file {file}")
            except Exception as err:
                log.info(f"error removing file {file}: {err}")
        for folder in cleanup_folders:
            try:
                shutil.rmtree(folder)
                log.info(f"removed folder {folder}")
            except Exception as err:
                log.info(f"error removing folder {folder}: {err}")