Compare commits

..

3 Commits

Author SHA1 Message Date
Arthur Petukhovsky
7e04ee76e4 Try to reproduce cant find data for key 2023-09-14 20:00:59 +00:00
Em Sharnoff
3895829bda vm-monitor: Fix cgroup throttling (#5303)
I believe this (not actual IO problems) is the cause of the "disk speed
issue" that we've had for VMs recently. See e.g.:

1. https://neondb.slack.com/archives/C03H1K0PGKH/p1694287808046179?thread_ts=1694271790.580099&cid=C03H1K0PGKH
2. https://neondb.slack.com/archives/C03H1K0PGKH/p1694511932560659

The vm-informant (and now, the vm-monitor, its replacement) is supposed
to gradually increase the `neon-postgres` cgroup's memory.high value,
because otherwise the kernel will throttle all the processes in the
cgroup.

This PR fixes a bug with the vm-monitor's implementation of this
behavior.

---

Other references, for the vm-informant's implementation:

- Original issue: neondatabase/autoscaling#44
- Original PR: neondatabase/autoscaling#223
2023-09-14 13:21:50 +03:00
Joonas Koivunen
ffd146c3e5 refactor: globals in tests (#5298)
Refactor tests to have less globals.

This will allow to hopefully write more complex tests for our new metric
collection requirements in #5297. Includes reverted work from #4761
related to test globals.

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: MMeent <matthias@neon.tech>
2023-09-13 22:05:30 +03:00
14 changed files with 494 additions and 262 deletions

View File

@@ -124,21 +124,8 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y ninja-build python3-dev libncurses5 binutils clang
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
export PLV8_VERSION=3.1.5 \
export PLV8_CHECKSUM=1e108d5df639e4c189e1c5bdfa2432a521c126ca89e7e5a969d46899ca7bf106 \
;; \
"v16") \
export PLV8_VERSION=3.1.8 \
export PLV8_CHECKSUM=92b10c7db39afdae97ff748c9ec54713826af222c459084ad002571b79eb3f49 \
;; \
*) \
echo "Export the valid PG_VERSION variable" && exit 1 \
;; \
esac && \
wget https://github.com/plv8/plv8/archive/refs/tags/v${PLV8_VERSION}.tar.gz -O plv8.tar.gz && \
echo "${PLV8_CHECKSUM} plv8.tar.gz" | sha256sum --check && \
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.8.tar.gz -O plv8.tar.gz && \
echo "92b10c7db39afdae97ff748c9ec54713826af222c459084ad002571b79eb3f49 plv8.tar.gz" | sha256sum --check && \
mkdir plv8-src && cd plv8-src && tar xvzf ../plv8.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -429,7 +416,7 @@ RUN case "${PG_VERSION}" in \
;; \
"v16") \
export PG_HINT_PLAN_VERSION=16_1_6_0 \
export PG_HINT_PLAN_CHECKSUM=fc85a9212e7d2819d4ae4ac75817481101833c3cfa9f0fe1f980984e12347d00 \
export PG_HINT_PLAN_CHECKSUM=ce6a8040c78012000f5da7240caf6a971401412f41d33f930f09291e6c304b99 \
;; \
*) \
echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \

View File

@@ -45,6 +45,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::GenericOption;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId, TimelineId};
@@ -478,29 +479,192 @@ impl Endpoint {
}
}
// Create spec file
let spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
cluster: Cluster {
cluster_id: None, // project ID: not used
name: None, // project name: not used
state: None,
roles: vec![],
databases: vec![],
settings: None,
postgresql_conf: Some(postgresql_conf),
let raw_spec = r#"
{
"cluster": {
"cluster_id": "young-forest-08365916",
"name": "young-forest-08365916",
"settings": [
{
"name": "listen_addresses",
"value": "*",
"vartype": "string"
},
{
"name": "wal_level",
"value": "replica",
"vartype": "enum"
},
{
"name": "max_wal_size",
"value": "1024",
"vartype": "integer"
},
{
"name": "max_parallel_workers",
"value": "8",
"vartype": "integer"
},
{
"name": "max_connections",
"value": "112",
"vartype": "integer"
},
{
"name": "wal_sender_timeout",
"value": "10000",
"vartype": "integer"
},
{
"name": "synchronous_standby_names",
"value": "walproposer",
"vartype": "string"
},
{
"name": "effective_io_concurrency",
"value": "100",
"vartype": "integer"
},
{
"name": "shared_preload_libraries",
"value": "neon",
"vartype": "string"
},
{
"name": "max_replication_slots",
"value": "10",
"vartype": "integer"
},
{
"name": "neon.max_cluster_size",
"value": "204800",
"vartype": "integer"
},
{
"name": "fsync",
"value": "off",
"vartype": "bool"
},
{
"name": "max_replication_write_lag",
"value": "15",
"vartype": "integer"
},
{
"name": "wal_log_hints",
"value": "off",
"vartype": "bool"
},
{
"name": "maintenance_io_concurrency",
"value": "100",
"vartype": "integer"
},
{
"name": "max_worker_processes",
"value": "8",
"vartype": "integer"
},
{
"name": "idle_in_transaction_session_timeout",
"value": "300000",
"vartype": "integer"
},
{
"name": "password_encryption",
"value": "scram-sha-256",
"vartype": "enum"
},
{
"name": "max_replication_flush_lag",
"value": "10240",
"vartype": "integer"
},
{
"name": "max_wal_senders",
"value": "10",
"vartype": "integer"
},
{
"name": "restart_after_crash",
"value": "off",
"vartype": "bool"
},
{
"name": "shared_buffers",
"value": "65536",
"vartype": "integer"
},
{
"name": "hot_standby",
"value": "off",
"vartype": "bool"
},
{
"name": "superuser_reserved_connections",
"value": "4",
"vartype": "integer"
},
{
"name": "maintenance_work_mem",
"value": "65536",
"vartype": "integer"
}
],
"roles": [
{
"name": "arthur",
"encrypted_password": "SCRAM-SHA-256$4096:goIf+DVnIR1HglUPCerpKw==$cCdVfO1J81t5q6Ry54Eu9wJyjtm/Q5gu281MHbV/0r8=:1vyoQkj0Z/rMEzZ96QcoSECrYckSvQNgWWIL7x2Lcyw=",
"options": []
}
],
"databases": [
{
"name": "neondb",
"owner": "arthur",
"options": null
}
]
},
delta_operations: None,
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions: None,
};
"skip_pg_catalog_updates": false,
"mode": "Primary",
"delta_operations": null,
"format_version": 1,
"operation_uuid": "9eaaac44-1d3b-405f-abd9-3562f633c6b5",
"timestamp": "2023-09-14T16:10:56.936947442Z",
"storage_auth_token": ""
}
"#;
let mut spec: ComputeSpec = serde_json::from_str(raw_spec)?;
if let Some(ref mut settings) = spec.cluster.settings {
settings.push(GenericOption{
name: "neon.safekeepers".to_owned(),
value: Some(safekeeper_connstrings.join(",")),
vartype: "string".to_owned(),
});
settings.push(GenericOption{
name: "neon.pageserver_connstring".to_owned(),
value: Some(pageserver_connstring),
vartype: "string".to_owned(),
});
settings.push(GenericOption{
name: "neon.tenant_id".to_owned(),
value: Some(self.tenant_id.to_string()),
vartype: "string".to_owned(),
});
settings.push(GenericOption{
name: "neon.timeline_id".to_owned(),
value: Some(self.timeline_id.to_string()),
vartype: "string".to_owned(),
});
settings.push(GenericOption{
name: "port".to_owned(),
value: Some(self.pg_address.port().to_string()),
vartype: "string".to_owned(),
});
}
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

View File

@@ -470,14 +470,14 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
old_heap_blkno = Some(decoded.blocks[0].blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[0].blkno);
new_heap_blkno = Some(decoded.blocks[1].blkno);
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -526,14 +526,14 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
old_heap_blkno = Some(decoded.blocks[0].blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[0].blkno);
new_heap_blkno = Some(decoded.blocks[1].blkno);
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -582,14 +582,14 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
old_heap_blkno = Some(decoded.blocks[0].blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[0].blkno);
new_heap_blkno = Some(decoded.blocks[1].blkno);
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -745,14 +745,14 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
old_heap_blkno = Some(decoded.blocks[0].blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[0].blkno);
new_heap_blkno = Some(decoded.blocks[1].blkno);
}
}
pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {

View File

@@ -1,6 +1,7 @@
pytest_plugins = (
"fixtures.pg_version",
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -0,0 +1,45 @@
from typing import Tuple
import pytest
from pytest_httpserver import HTTPServer
# TODO: mypy fails with:
# Module "fixtures.neon_fixtures" does not explicitly export attribute "PortDistributor" [attr-defined]
# from fixtures.neon_fixtures import PortDistributor
# compared to the fixtures from pytest_httpserver with same names, these are
# always function scoped, so you can check and stop the server in tests.
@pytest.fixture(scope="function")
def httpserver_ssl_context():
return None
@pytest.fixture(scope="function")
def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
host, port = httpserver_listen_address
if not host:
host = HTTPServer.DEFAULT_LISTEN_HOST
if not port:
port = HTTPServer.DEFAULT_LISTEN_PORT
server = HTTPServer(host=host, port=port, ssl_context=httpserver_ssl_context)
server.start()
yield server
server.clear()
if server.is_running():
server.stop()
@pytest.fixture(scope="function")
def httpserver(make_httpserver):
server = make_httpserver
yield server
server.clear()
@pytest.fixture(scope="function")
def httpserver_listen_address(port_distributor) -> Tuple[str, int]:
port = port_distributor.get_port()
return ("localhost", port)

View File

@@ -223,12 +223,6 @@ def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistrib
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="session")
def httpserver_listen_address(port_distributor: PortDistributor):
port = port_distributor.get_port()
return ("localhost", port)
@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,
@@ -2314,7 +2308,7 @@ class Endpoint(PgProtocol):
http_port: int,
check_stop_result: bool = True,
):
super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
super().__init__(host="localhost", port=pg_port, user="arthur", dbname="neondb", password="I2GH6gtFBpCz")
self.env = env
self.running = False
self.branch_name: Optional[str] = None # dubious

View File

@@ -42,12 +42,11 @@ def handle_role(dbs, roles, operation):
raise ValueError("Invalid op")
fail = False
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
def ddl_forward_handler(
request: Request, dbs: Dict[str, str], roles: Dict[str, str], ddl: "DdlForwardingContext"
) -> Response:
log.info(f"Received request with data {request.get_data(as_text=True)}")
if fail:
if ddl.fail:
log.info("FAILING")
return Response(status=500, response="Failed just cuz")
if request.json is None:
@@ -72,6 +71,7 @@ class DdlForwardingContext:
self.port = port
self.dbs: Dict[str, str] = {}
self.roles: Dict[str, str] = {}
self.fail = False
endpoint = "/management/api/v2/roles_and_databases"
ddl_url = f"http://{host}:{port}{endpoint}"
self.pg.configure(
@@ -82,7 +82,7 @@ class DdlForwardingContext:
)
log.info(f"Listening on {ddl_url}")
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)
)
def __enter__(self):
@@ -103,6 +103,9 @@ class DdlForwardingContext:
def wait(self, timeout=3):
self.server.wait(timeout=timeout)
def failures(self, bool):
self.fail = bool
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
res = self.send(query)
self.wait(timeout=timeout)
@@ -203,9 +206,9 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
assert ddl.dbs == {"stork": "cork"}
with pytest.raises(psycopg2.InternalError):
global fail
fail = True
ddl.failures(True)
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()
ddl.failures(False)
conn.close()

View File

@@ -15,45 +15,45 @@ from fixtures.types import TimelineId
# Test configuration
#
# Create a table with {num_rows} rows, and perform {updates_to_perform} random
# UPDATEs on it, using {num_connections} separate connections.
num_connections = 10
num_rows = 100000
updates_to_perform = 10000
updates_performed = 0
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
global updates_performed
pg_conn = await endpoint.connect_async()
while updates_performed < updates_to_perform:
updates_performed += 1
id = random.randrange(1, num_rows)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
loop = asyncio.get_running_loop()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < updates_to_perform:
await loop.run_in_executor(pool, do_gc)
# Create a table with {NUM_ROWS} rows, and perform {UPDATES_TO_PERFORM} random
# UPDATEs on it, using {NUM_CONNECTIONS} separate connections.
NUM_CONNECTIONS = 10
NUM_ROWS = 100000
UPDATES_TO_PERFORM = 10000
# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
workers = []
for _ in range(num_connections):
updates_performed = 0
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
nonlocal updates_performed
global UPDATES_TO_PERFORM
loop = asyncio.get_running_loop()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < UPDATES_TO_PERFORM:
await loop.run_in_executor(pool, do_gc)
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
pg_conn = await endpoint.connect_async()
nonlocal updates_performed
while updates_performed < UPDATES_TO_PERFORM:
updates_performed += 1
id = random.randrange(1, NUM_ROWS)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
for _ in range(NUM_CONNECTIONS):
workers.append(asyncio.create_task(update_table(endpoint)))
workers.append(asyncio.create_task(gc(env, timeline)))
@@ -81,7 +81,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
f"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {num_rows}) g
FROM generate_series(1, {NUM_ROWS}) g
"""
)
cur.execute("CREATE INDEX ON foo(id)")
@@ -91,7 +91,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
r = cur.fetchone()
assert r is not None
assert r == (num_rows, updates_to_perform)
assert r == (NUM_ROWS, UPDATES_TO_PERFORM)
#
@@ -99,6 +99,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
# Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
num_index_uploads = 0
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
@@ -160,5 +161,5 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
log.info(f"{num_index_uploads} index uploads after GC iteration {i}")
after = num_index_uploads
log.info(f"{after-before} new index uploads during test")
log.info(f"{after - before} new index uploads during test")
assert after - before < 5

View File

@@ -3,9 +3,9 @@
# Use mock HTTP server to receive metrics and verify that they look sane.
#
import time
from pathlib import Path
from typing import Iterator
from queue import SimpleQueue
from typing import Any, Iterator, Set
import pytest
from fixtures.log_helper import log
@@ -18,56 +18,10 @@ from fixtures.neon_fixtures import (
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
# ==============================================================================
# Storage metrics tests
# ==============================================================================
initial_tenant = TenantId.generate()
remote_uploaded = 0
checks = {
"written_size": lambda value: value > 0,
"resident_size": lambda value: value >= 0,
# >= 0 check here is to avoid race condition when we receive metrics before
# remote_uploaded is updated
"remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0,
# logical size may lag behind the actual size, so allow 0 here
"timeline_logical_size": lambda value: value >= 0,
}
metric_kinds_checked = set([])
#
# verify that metrics look minilally sane
#
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
for event in events:
assert event["tenant_id"] == str(
initial_tenant
), "Expecting metrics only from the initial tenant"
metric_name = event["metric"]
check = checks.get(metric_name)
# calm down mypy
if check is not None:
assert check(event["value"]), f"{metric_name} isn't valid"
global metric_kinds_checked
metric_kinds_checked.add(metric_name)
return Response(status=200)
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
@@ -81,6 +35,18 @@ def test_metric_collection(
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_kinds_checked: Set[str] = set([])
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
uploads.put(events)
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
@@ -90,6 +56,7 @@ def test_metric_collection(
f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
"""
+ "tenant_config={pitr_interval = '0 sec'}"
)
@@ -98,9 +65,6 @@ def test_metric_collection(
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# Set initial tenant of the test, that we expect the logs from
global initial_tenant
initial_tenant = neon_env_builder.initial_tenant
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
@@ -108,8 +72,7 @@ def test_metric_collection(
# spin up neon, after http server is ready
env = neon_env_builder.init_start()
# Order of fixtures shutdown is not specified, and if http server gets down
# before pageserver, pageserver log might contain such errors in the end.
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_metric_collection")
@@ -141,29 +104,74 @@ def test_metric_collection(
total += sample[2]
return int(total)
remote_uploaded = 0
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
global remote_uploaded
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# wait longer than collecting interval and check that all requests are served
time.sleep(3)
httpserver.check()
global metric_kinds_checked, checks
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
# later tests will be added to ensure that the timeseries are sane.
timeout = 5
uploads.put("ready")
while True:
# discard earlier than "ready"
log.info("waiting for upload")
events = uploads.get(timeout=timeout)
import json
if events == "ready":
events = uploads.get(timeout=timeout)
httpserver.check()
httpserver.stop()
# if anything comes after this, we'll just ignore it
stringified = json.dumps(events, indent=2)
log.info(f"inspecting: {stringified}")
break
else:
stringified = json.dumps(events, indent=2)
log.info(f"discarding: {stringified}")
# verify that metrics look minimally sane
checks = {
"written_size": lambda value: value > 0,
"resident_size": lambda value: value >= 0,
"remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value == 0,
# logical size may lag behind the actual size, so allow 0 here
"timeline_logical_size": lambda value: value >= 0,
# this can also be zero, depending on when we get the value
"written_data_bytes_delta": lambda value: value >= 0,
}
metric_kinds_checked = set()
metric_kinds_seen = set()
for event in events:
assert event["tenant_id"] == str(tenant_id)
metric_name = event["metric"]
metric_kinds_seen.add(metric_name)
check = checks.get(metric_name)
# calm down mypy
if check is not None:
value = event["value"]
log.info(f"checking {metric_name} value {value}")
assert check(value), f"{metric_name} isn't valid"
metric_kinds_checked.add(metric_name)
expected_checks = set(checks.keys())
assert len(metric_kinds_checked) == len(
checks
assert (
metric_kinds_checked == checks.keys()
), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered"
# ==============================================================================
# Proxy metrics tests
# ==============================================================================
assert metric_kinds_seen == metric_kinds_checked
def proxy_metrics_handler(request: Request) -> Response:

View File

@@ -301,6 +301,7 @@ def test_ondemand_download_timetravel(
# they are present only in the remote storage, only locally, or both.
# It should not change.
assert filled_current_physical == get_api_current_physical_size()
endpoint_old.stop()
#

View File

@@ -119,65 +119,6 @@ def test_tenant_reattach(
num_connections = 10
num_rows = 100000
updates_to_perform = 0
updates_started = 0
updates_finished = 0
# Run random UPDATEs on test table. On failure, try again.
async def update_table(pg_conn: asyncpg.Connection):
global updates_started, updates_finished, updates_to_perform
while updates_started < updates_to_perform or updates_to_perform == 0:
updates_started += 1
id = random.randrange(1, num_rows)
# Loop to retry until the UPDATE succeeds
while True:
try:
await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
updates_finished += 1
if updates_finished % 1000 == 0:
log.info(f"update {updates_finished} / {updates_to_perform}")
break
except asyncpg.PostgresError as e:
# Received error from Postgres. Log it, sleep a little, and continue
log.info(f"UPDATE error: {e}")
await asyncio.sleep(0.1)
async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
global updates_started, updates_finished, updates_to_perform
# Wait until we have performed some updates
wait_until(20, 0.5, lambda: updates_finished > 500)
log.info("Detaching tenant")
pageserver_http.tenant_detach(tenant_id)
await asyncio.sleep(1)
log.info("Re-attaching tenant")
pageserver_http.tenant_attach(tenant_id)
log.info("Re-attach finished")
# Continue with 5000 more updates
updates_to_perform = updates_started + 5000
# async guts of test_tenant_reattach_while_bysy test
async def reattach_while_busy(
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
workers = []
for _ in range(num_connections):
pg_conn = await endpoint.connect_async()
workers.append(asyncio.create_task(update_table(pg_conn)))
workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
await asyncio.gather(*workers)
assert updates_finished == updates_to_perform
# Detach and re-attach tenant, while compute is busy running queries.
#
@@ -226,6 +167,62 @@ def test_tenant_reattach_while_busy(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
updates_started = 0
updates_finished = 0
updates_to_perform = 0
# Run random UPDATEs on test table. On failure, try again.
async def update_table(pg_conn: asyncpg.Connection):
nonlocal updates_started, updates_finished, updates_to_perform
while updates_started < updates_to_perform or updates_to_perform == 0:
updates_started += 1
id = random.randrange(1, num_rows)
# Loop to retry until the UPDATE succeeds
while True:
try:
await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
updates_finished += 1
if updates_finished % 1000 == 0:
log.info(f"update {updates_finished} / {updates_to_perform}")
break
except asyncpg.PostgresError as e:
# Received error from Postgres. Log it, sleep a little, and continue
log.info(f"UPDATE error: {e}")
await asyncio.sleep(0.1)
async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
nonlocal updates_started, updates_finished, updates_to_perform
# Wait until we have performed some updates
wait_until(20, 0.5, lambda: updates_finished > 500)
log.info("Detaching tenant")
pageserver_http.tenant_detach(tenant_id)
await asyncio.sleep(1)
log.info("Re-attaching tenant")
pageserver_http.tenant_attach(tenant_id)
log.info("Re-attach finished")
# Continue with 5000 more updates
updates_to_perform = updates_started + 5000
# async guts of test_tenant_reattach_while_bysy test
async def reattach_while_busy(
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
nonlocal updates_to_perform, updates_finished
workers = []
for _ in range(num_connections):
pg_conn = await endpoint.connect_async()
workers.append(asyncio.create_task(update_table(pg_conn)))
workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
await asyncio.gather(*workers)
assert updates_finished == updates_to_perform
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()

View File

@@ -0,0 +1,59 @@
import random
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import print_gc_result, query_scalar
import pytest
@pytest.mark.parametrize('execution_number', range(10000))
def test_tmp(neon_env_builder: NeonEnvBuilder, execution_number):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
try:
endpoint = env.endpoints.create_start("main")
except Exception as e:
log.error(f"failed to create endpoint: {e}")
time.sleep(100)
raise e
log.info("postgres is running on main branch")
def exec(sql):
before_ts = time.time()
res = endpoint.safe_psql(sql)
after_ts = time.time()
log.info(f"executed in {after_ts - before_ts:.2f}s, result {res}")
if after_ts - before_ts > 1:
raise Exception(f"executed in {after_ts - before_ts:.2f}s, result {res}")
return res
time.sleep(random.random() * 4)
exec("SELECT 1")
exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
exec("INSERT INTO activity_v1(nonce,val) SELECT 5122547621334681000 AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")
time.sleep(random.random() * 4)
exec("SELECT 1")
exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
exec("INSERT INTO activity_v1(nonce,val) SELECT 2137073174395130327 AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")
exec("SELECT 1")
exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
exec("INSERT INTO activity_v1(nonce,val) SELECT (random() * 100)::int AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")
exec("SELECT 1")
exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
exec("INSERT INTO activity_v1(nonce,val) SELECT (random() * 100)::int AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")
exec("SELECT 1")
exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
exec("INSERT INTO activity_v1(nonce,val) SELECT (random() * 100)::int AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")
# ERROR: could not read relation existence of rel 1663/16386/16427.1 from page server at lsn 0/014A0888
# I2GH6gtFBpCz
#

View File

@@ -19,40 +19,18 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table for a few different scenarios and freeze it to set the VM bits.
# Create a test table and freeze it to set the VM bit.
cur.execute("CREATE TABLE vmtest_delete (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_delete VALUES (1)")
cur.execute("VACUUM FREEZE vmtest_delete")
cur.execute("CREATE TABLE vmtest_hot_update (id integer PRIMARY KEY, filler text)")
cur.execute("INSERT INTO vmtest_hot_update VALUES (1, 'x')")
cur.execute("VACUUM FREEZE vmtest_hot_update")
cur.execute("CREATE TABLE vmtest_cold_update (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_cold_update SELECT g FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_cold_update")
cur.execute(
"CREATE TABLE vmtest_cold_update2 (id integer PRIMARY KEY, filler text) WITH (fillfactor=100)"
)
cur.execute("INSERT INTO vmtest_cold_update2 SELECT g, '' FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_cold_update2")
cur.execute("CREATE TABLE vmtest_update (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_update SELECT g FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_update")
# DELETE and UPDATE the rows.
cur.execute("DELETE FROM vmtest_delete WHERE id = 1")
cur.execute("UPDATE vmtest_hot_update SET filler='x' WHERE id = 1")
cur.execute("UPDATE vmtest_cold_update SET id = 5000 WHERE id = 1")
# Clear the VM bit on the last page with an INSERT. Then clear the VM bit on
# the page where row 1 is (block 0), by doing an UPDATE. The UPDATE is a
# cold update, and the new tuple goes to the last page, which already had
# its VM bit cleared. The point is that the UPDATE *only* clears the VM bit
# on the page containing the old tuple. We had a bug where we got the old
# and new pages mixed up, and that only shows up when one of the bits is
# cleared, but not the other one.
cur.execute("INSERT INTO vmtest_cold_update2 VALUES (9999, 'x')")
# Clears the VM bit on the old page
cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
cur.execute("UPDATE vmtest_update SET id = 5000 WHERE id = 1")
# Branch at this point, to test that later
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "test_vm_bit_clear")
@@ -72,13 +50,9 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
"""
)
cur.execute("SELECT id FROM vmtest_delete WHERE id = 1")
cur.execute("SELECT * FROM vmtest_delete WHERE id = 1")
assert cur.fetchall() == []
cur.execute("SELECT id FROM vmtest_hot_update WHERE id = 1")
assert cur.fetchall() == [(1,)]
cur.execute("SELECT id FROM vmtest_cold_update WHERE id = 1")
assert cur.fetchall() == []
cur.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
cur.execute("SELECT * FROM vmtest_update WHERE id = 1")
assert cur.fetchall() == []
cur.close()
@@ -103,11 +77,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
"""
)
cur_new.execute("SELECT id FROM vmtest_delete WHERE id = 1")
cur_new.execute("SELECT * FROM vmtest_delete WHERE id = 1")
assert cur_new.fetchall() == []
cur_new.execute("SELECT id FROM vmtest_hot_update WHERE id = 1")
assert cur_new.fetchall() == [(1,)]
cur_new.execute("SELECT id FROM vmtest_cold_update WHERE id = 1")
assert cur_new.fetchall() == []
cur_new.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
cur_new.execute("SELECT * FROM vmtest_update WHERE id = 1")
assert cur_new.fetchall() == []

View File

@@ -14,6 +14,8 @@ from pathlib import Path
from typing import Any, List, Optional
import psycopg2
import psycopg2.errors
import psycopg2.extras
import pytest
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
@@ -260,7 +262,7 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
else:
failed_node.start()
failed_node = None
assert query_scalar(cur, "SELECT sum(key) FROM t") == 500500
assert query_scalar(cur, "SELECT sum(key) FROM t") == (n_inserts * (n_inserts + 1)) // 2
# Test that safekeepers push their info to the broker and learn peer status from it