Compare commits

...

2 Commits

10 changed files with 238 additions and 299 deletions

View File

@@ -58,16 +58,12 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "5670669815";
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
// if there is no build tag environment variable, use a "default value"
// the use case for this "default value" is regression tests that don't require BUILD_TAG
let build_tag = option_env!("BUILD_TAG").unwrap_or("5670669815").to_string();
info!("build_tag: {build_tag}");
let matches = cli().get_matches();
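
Aside on the new fallback: `option_env!` is expanded at compile time, so the default tag is baked into the binary rather than read at startup. A minimal sketch of the equivalent runtime lookup, in Python for illustration only (not part of this change):

```
import os

# Runtime analogue of the Rust change above, for illustration only. Note the
# difference: option_env!("BUILD_TAG") is resolved when the binary is
# compiled, while os.environ.get reads the variable at run time. The
# fallback value exists for regression tests that don't set BUILD_TAG.
build_tag = os.environ.get("BUILD_TAG", "5670669815")
print(f"build_tag: {build_tag}")
```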

View File

@@ -1,24 +1,12 @@
{
"public_extensions": [
"anon",
"pg_buffercache"
],
"library_index": {
"anon": "anon",
"pg_buffercache": "pg_buffercache"
},
"public_extensions": [ "weighted_mean" ],
"library_index": { "weighted_mean": "weighted_mean" },
"extension_data": {
"pg_buffercache": {
"weighted_mean": {
"control_data": {
"pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
"weighted_mean.control": "# weighted_mean extension\ncomment = 'A weighted mean aggregate function extension'\ndefault_version = '1.0.1'\nmodule_pathname = '$libdir/weighted_mean'\nrelocatable = true\ntrusted = true\n"
},
"archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
},
"anon": {
"control_data": {
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
},
"archive_path": "5670669815/v14/extensions/anon.tar.zst"
"archive_path": "5670669815/v14/extensions/weighted_mean.tar.zst"
}
}
}
}

View File

@@ -1,17 +1,16 @@
{
"public_extensions": [
"anon"
"weighted_mean"
],
"library_index": {
"anon": "anon"
"weighted_mean": "weighted_mean"
},
"extension_data": {
"anon": {
"weighted_mean": {
"control_data": {
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
"weighted_mean.control": "# weighted_mean extension\ncomment = 'A weighted mean aggregate function extension'\ndefault_version = '1.0.1'\nmodule_pathname = '$libdir/weighted_mean'\nrelocatable = true\ntrusted = true\n"
},
"archive_path": "5670669815/v15/extensions/anon.tar.zst"
"archive_path": "5670669815/v15/extensions/weighted_mean.tar.zst"
}
}
}
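
Both index files share the same shape: `public_extensions` lists the extensions users may install, `library_index` appears to map a library name to the extension that provides it, and `extension_data` carries each extension's control file contents plus the path of its `.tar.zst` archive under the build tag. A minimal lookup sketch, with an assumed function name and error handling (not the actual compute_ctl implementation):

```
import json

# Hypothetical helper: resolve an extension against an index shaped like
# the JSON above. The function name and return shape are illustrative
# assumptions, not the real compute_ctl API.
def lookup_extension(index_text: str, name: str) -> tuple[str, str]:
    index = json.loads(index_text)
    if name not in index["public_extensions"]:
        raise KeyError(f"{name} is not a public extension")
    ext = index["extension_data"][name]
    control = ext["control_data"][f"{name}.control"]  # control file contents
    archive = ext["archive_path"]  # e.g. "5670669815/v15/extensions/weighted_mean.tar.zst"
    return control, archive
```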

View File

@@ -0,0 +1,51 @@
This is some data for testing remote extensions.
It was generated by the Docker code below,
which basically just runs `make install` for the extension.
```
#########################################################################################
#
# Layer "weighted-mean-pg-build"
# compile weighted-mean extension
# Note: please do not include this layer in the compute image;
# it is used for testing remote extension functionality,
# so the extension *must* not already be part of the compute image
#
#########################################################################################
FROM build-deps AS weighted-mean-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/Kozea/weighted_mean/archive/master.tar.gz -O weighted_mean.tar.gz &&\
mkdir weighted_mean-src && cd weighted_mean-src && tar xvzf ../weighted_mean.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/weighted_mean.control && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\
mkdir -p /extensions/weighted_mean && cp /usr/local/pgsql/share/extension/weighted_mean.control /extensions/weighted_mean && \
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/weighted_mean.tar.zst -T -
```
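The interesting part of the RUN step is how it isolates the extension's files: it snapshots the pgsql tree before and after `make install`, and `comm -13` prints only the lines unique to the second sorted listing, i.e. the freshly installed files, which are then packed into the `.tar.zst` archive. A sketch of the same set-difference idea in Python (paths illustrative, install step elided):

```
from pathlib import Path

# Sketch of the before/after trick used in the Dockerfile: list files,
# run the install, list again, and archive only what the install added.
def files_under(root: Path) -> set[str]:
    return {str(p.relative_to(root)) for p in root.rglob("*") if p.is_file()}

root = Path("/usr/local/pgsql")     # path taken from the Dockerfile above
before = files_under(root)
# ... `make install` would run here ...
after = files_under(root)
new_files = sorted(after - before)  # what `comm -13 before.txt after.txt` prints
```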
weighted_mean
Copyright (c) Ronan Dunklau 2012-2015
Permission to use, copy, modify, and distribute this software and its
documentation for any purpose, without fee, and without a written agreement
is hereby granted, provided that the above copyright notice and this
paragraph and the following two paragraphs appear in all copies.
IN NO EVENT SHALL RONAN DUNKLAU BE LIABLE TO ANY PARTY FOR
DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
DOCUMENTATION, EVEN IF RONAN DUNKLAU HAS BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
RONAN DUNKLAU SPECIFICALLY DISCLAIMS ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
ON AN "AS IS" BASIS, AND RONAN DUNKLAU HAS NO OBLIGATIONS TO
PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.

View File

@@ -1,64 +1,23 @@
import os
import shutil
import threading
import uuid
from contextlib import closing
from pathlib import Path
from typing import Optional
import pytest
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, available_s3_storages
# Cleaning up downloaded files is important for local tests
# or else one test could reuse the files from another test or another test run
def cleanup(pg_version):
PGDIR = Path(f"pg_install/v{pg_version}")
LIB_DIR = PGDIR / Path("lib/postgresql")
cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"]
cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs]
SHARE_DIR = PGDIR / Path("share/postgresql/extension")
cleanup_ext_globs = [
"anon*",
"address_standardizer*",
"postgis*",
"pageinspect*",
"pg_buffercache*",
"pgrouting*",
]
cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs]
all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths
all_cleanup_files = []
for file_glob in all_glob_paths:
for file in file_glob:
all_cleanup_files.append(file)
for file in all_cleanup_files:
try:
os.remove(file)
log.info(f"removed file {file}")
except Exception as err:
log.info(
f"skipping remove of file {file} because it doesn't exist.\
this may be expected or unexpected depending on the test {err}"
)
cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")]
for folder in cleanup_folders:
try:
shutil.rmtree(folder)
log.info(f"removed folder {folder}")
except Exception as err:
log.info(
f"skipping remove of folder {folder} because it doesn't exist.\
this may be expected or unexpected depending on the test {err}"
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
MockS3Server,
RemoteStorageKind,
available_s3_storages,
)
def upload_files(env):
@@ -80,248 +39,194 @@ def upload_files(env):
os.chdir("../../../..")
# creates the weighted_mean extension and runs test queries to verify it works
def weighted_mean_test(endpoint, message: Optional[str] = None, create: bool = True):
if message is not None:
log.info(message)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Check that appropriate control files were downloaded
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "weighted_mean" in all_extensions
# test is from: https://github.com/Kozea/weighted_mean/tree/master/test
if create:
cur.execute("CREATE extension weighted_mean")
cur.execute(
"create temp table test as (\
select a::numeric, b::numeric\
from\
generate_series(1, 100) as a(a),\
generate_series(1, 100) as b(b));"
)
cur.execute("select weighted_mean(a,b) from test;")
x = cur.fetchone()
log.info(x)
assert str(x) == "(Decimal('50.5000000000000000'),)"
cur.execute("update test set b = 0;")
cur.execute("select weighted_mean(a,b) from test;")
x = cur.fetchone()
log.info(x)
assert str(x) == "(Decimal('0'),)"
# Test downloading a remote extension.
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
def test_remote_extensions(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_version: PgVersion,
test_output_dir: Path,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
neon_binpath: Path,
pg_distrib_dir: Path,
default_broker: NeonBroker,
run_id: uuid.UUID,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_remote_extensions",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
alt_pgdir = test_output_dir / "pg_install"
log.info(f"Copying {pg_distrib_dir} (which is big) to {alt_pgdir}")
shutil.copytree(pg_distrib_dir / pg_version.v_prefixed, alt_pgdir / pg_version.v_prefixed)
assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy
with NeonEnvBuilder(
repo_dir=test_output_dir / "repo",
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=alt_pgdir,
pg_version=pg_version,
broker=default_broker,
run_id=run_id,
preserve_database_files=False,
) as neon_env_builder:
log.info(port_distributor)
log.info(mock_s3_server)
log.info(neon_binpath)
log.info(pg_distrib_dir)
log.info(pg_version)
log.info(default_broker)
log.info(run_id)
# For MOCK_S3 we upload test files.
# For REAL_S3 we use the files already in the bucket
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
test_name="test_remote_extensions",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
upload_files(env)
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD
endpoint = env.endpoints.create_start(
"test_remote_extensions",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
try:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Check that appropriate control files were downloaded
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "anon" in all_extensions
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD
endpoint = env.endpoints.create_start(
"test_remote_extensions",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
weighted_mean_test(endpoint)
# postgis is on real S3 but not on mock S3;
# it is a fairly large file, and we would rather not upload it to GitHub
if remote_storage_kind == RemoteStorageKind.REAL_S3:
assert "postgis" in all_extensions
# this may fail locally if a dependency is missing
# we don't really care about the error,
# we just want to make sure it downloaded
try:
cur.execute("CREATE EXTENSION postgis")
except Exception as err:
log.info(f"(expected) error creating postgis extension: {err}")
# we do not check the error, so this is basically a no-op;
# however, checking the log lets you verify that it worked
# and see how long loading the extension took
# # Test that the extension is downloaded after endpoint restart,
# # when the library is used in the query.
# #
# # Run the test with multiple simultaneous connections to an endpoint
# # to ensure that the extension is downloaded only once.
# this is expected to fail locally if the pgcrypto dependency is not installed
try:
cur.execute("CREATE EXTENSION anon")
except Exception as err:
log.info("error creating anon extension")
assert "pgcrypto" in str(err), "unexpected error creating anon extension"
finally:
cleanup(pg_version)
# import subprocess
# import threading
# # shutdown compute node
# endpoint.stop()
# # remove extension files locally
# SHAREDIR = subprocess.check_output(
# [alt_pgdir / f"{pg_version.v_prefixed}/bin/pg_config", "--sharedir"]
# )
# SHAREDIRPATH = Path(SHAREDIR.decode("utf-8").strip())
# log.info("SHAREDIRPATH: %s", SHAREDIRPATH)
# (SHAREDIRPATH / "extension/weighted_mean--1.0.1.sql").unlink()
# (SHAREDIRPATH / "extension/weighted_mean.control").unlink()
# # spin up compute node again (there are no extension files available, because compute is stateless)
# endpoint = env.endpoints.create_start(
# "test_remote_extensions",
# tenant_id=tenant_id,
# remote_ext_config=env.ext_remote_storage.to_string(),
# # config_lines=["log_min_messages=debug3"],
# )
# Test downloading a remote library.
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
def test_remote_library(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_version: PgVersion,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_remote_library",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy
# For MOCK_S3 we upload test files.
# For REAL_S3 we use the files already in the bucket
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
upload_files(env)
# Start a compute node and check that it can download the extensions
# and use them to run LOAD library
endpoint = env.endpoints.create_start(
"test_remote_library",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
try:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# try to load library
try:
cur.execute("LOAD 'anon'")
except Exception as err:
log.info(f"error loading anon library: {err}")
raise AssertionError("unexpected error loading anon library") from err
# test a library whose name differs from the extension name
# this may fail locally if a dependency is missing;
# however, it does successfully download the postgis archive
if remote_storage_kind == RemoteStorageKind.REAL_S3:
try:
cur.execute("LOAD 'postgis_topology-3'")
except Exception as err:
log.info("error loading postgis_topology-3")
assert "No such file or directory" in str(
err
), "unexpected error loading postgis_topology-3"
finally:
cleanup(pg_version)
# # connect to compute node and run the query
# # that will trigger the download of the extension
# threads = [
# threading.Thread(
# target=weighted_mean_test, args=(endpoint, f"this is thread {i}", False)
# )
# for i in range(2)
# ]
# for thread in threads:
# thread.start()
# for thread in threads:
# thread.join()
# Here we test a complex extension
# whose archive contains multiple extensions,
# using postgis as an example
# @pytest.mark.skipif(
# RemoteStorageKind.REAL_S3 not in available_s3_storages(),
# reason="skipping test because real s3 not enabled",
# )
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
@pytest.mark.skipif(
RemoteStorageKind.REAL_S3 not in available_s3_storages(),
reason="skipping test because real s3 not enabled",
)
def test_multiple_extensions_one_archive(
neon_env_builder: NeonEnvBuilder,
pg_version: PgVersion,
test_output_dir: Path,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
neon_binpath: Path,
pg_distrib_dir: Path,
default_broker: NeonBroker,
run_id: uuid.UUID,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.REAL_S3,
test_name="test_multiple_extensions_one_archive",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
alt_pgdir = test_output_dir / "pg_install"
log.info(f"Copying {pg_distrib_dir} (which is big) to {alt_pgdir}.")
shutil.copytree(pg_distrib_dir / pg_version.v_prefixed, alt_pgdir / pg_version.v_prefixed)
assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy
with NeonEnvBuilder(
repo_dir=test_output_dir / "repo",
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=alt_pgdir,
pg_version=pg_version,
broker=default_broker,
run_id=run_id,
preserve_database_files=False,
) as neon_env_builder:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.REAL_S3,
test_name="test_multiple_extensions_one_archive",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
endpoint = env.endpoints.create_start(
"test_multiple_extensions_one_archive",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION address_standardizer;")
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
# execute query to ensure that it works
cur.execute(
"SELECT house_num, name, suftype, city, country, state, unit \
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
'One Rust Place, Boston, MA 02109');"
)
res = cur.fetchall()
log.info(res)
assert len(res) > 0
assert env.ext_remote_storage is not None # satisfy mypy
cleanup(pg_version)
# Test that the extension is downloaded after endpoint restart,
# when the library is used in the query.
#
# Run the test with multiple simultaneous connections to an endpoint
# to ensure that the extension is downloaded only once.
#
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
def test_extension_download_after_restart(
neon_env_builder: NeonEnvBuilder,
pg_version: PgVersion,
):
if "15" in pg_version: # SKIP v15 for now because test set only has extension built for v14
return None
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
test_name="test_extension_download_after_restart",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy
# For MOCK_S3 we upload test files.
upload_files(env)
endpoint = env.endpoints.create_start(
"test_extension_download_after_restart",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
config_lines=["log_min_messages=debug3"],
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE extension pg_buffercache;")
cur.execute("SELECT * from pg_buffercache;")
res = cur.fetchall()
assert len(res) > 0
log.info(res)
# shutdown compute node
endpoint.stop()
# remove extension files locally
cleanup(pg_version)
# spin up compute node again (there are no extension files available, because compute is stateless)
endpoint = env.endpoints.create_start(
"test_extension_download_after_restart",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
config_lines=["log_min_messages=debug3"],
)
# connect to compute node and run the query
# that will trigger the download of the extension
def run_query(endpoint, thread_id: int):
log.info("thread_id {%d} starting", thread_id)
endpoint = env.endpoints.create_start(
"test_multiple_extensions_one_archive",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT * from pg_buffercache;")
cur.execute("CREATE EXTENSION address_standardizer;")
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
# execute query to ensure that it works
cur.execute(
"SELECT house_num, name, suftype, city, country, state, unit \
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
'One Rust Place, Boston, MA 02109');"
)
res = cur.fetchall()
log.info(res)
assert len(res) > 0
log.info("thread_id {%d}, res = %s", thread_id, res)
threads = [threading.Thread(target=run_query, args=(endpoint, i)) for i in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
cleanup(pg_version)