Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-08 21:20:38 +00:00

Compare commits: release-39 ... conf-from-

3 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 7e04ee76e4 | |
| | 3895829bda | |
| | ffd146c3e5 | |
@@ -124,21 +124,8 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
    apt install -y ninja-build python3-dev libncurses5 binutils clang

RUN case "${PG_VERSION}" in \
    "v14" | "v15") \
        export PLV8_VERSION=3.1.5 \
        export PLV8_CHECKSUM=1e108d5df639e4c189e1c5bdfa2432a521c126ca89e7e5a969d46899ca7bf106 \
        ;; \
    "v16") \
        export PLV8_VERSION=3.1.8 \
        export PLV8_CHECKSUM=92b10c7db39afdae97ff748c9ec54713826af222c459084ad002571b79eb3f49 \
        ;; \
    *) \
        echo "Export the valid PG_VERSION variable" && exit 1 \
        ;; \
    esac && \
    wget https://github.com/plv8/plv8/archive/refs/tags/v${PLV8_VERSION}.tar.gz -O plv8.tar.gz && \
    echo "${PLV8_CHECKSUM} plv8.tar.gz" | sha256sum --check && \
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.8.tar.gz -O plv8.tar.gz && \
    echo "92b10c7db39afdae97ff748c9ec54713826af222c459084ad002571b79eb3f49 plv8.tar.gz" | sha256sum --check && \
    mkdir plv8-src && cd plv8-src && tar xvzf ../plv8.tar.gz --strip-components=1 -C . && \
    export PATH="/usr/local/pgsql/bin:$PATH" && \
    make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -429,7 +416,7 @@ RUN case "${PG_VERSION}" in \
        ;; \
    "v16") \
        export PG_HINT_PLAN_VERSION=16_1_6_0 \
        export PG_HINT_PLAN_CHECKSUM=fc85a9212e7d2819d4ae4ac75817481101833c3cfa9f0fe1f980984e12347d00 \
        export PG_HINT_PLAN_CHECKSUM=ce6a8040c78012000f5da7240caf6a971401412f41d33f930f09291e6c304b99 \
        ;; \
    *) \
        echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \
@@ -45,6 +45,7 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::GenericOption;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId, TimelineId};
@@ -478,29 +479,192 @@ impl Endpoint {
        }
    }

    // Create spec file
    let spec = ComputeSpec {
        skip_pg_catalog_updates: self.skip_pg_catalog_updates,
        format_version: 1.0,
        operation_uuid: None,
        cluster: Cluster {
            cluster_id: None, // project ID: not used
            name: None, // project name: not used
            state: None,
            roles: vec![],
            databases: vec![],
            settings: None,
            postgresql_conf: Some(postgresql_conf),
    let raw_spec = r#"
    {
        "cluster": {
            "cluster_id": "young-forest-08365916",
            "name": "young-forest-08365916",
            "settings": [
                {
                    "name": "listen_addresses",
                    "value": "*",
                    "vartype": "string"
                },
                {
                    "name": "wal_level",
                    "value": "replica",
                    "vartype": "enum"
                },
                {
                    "name": "max_wal_size",
                    "value": "1024",
                    "vartype": "integer"
                },
                {
                    "name": "max_parallel_workers",
                    "value": "8",
                    "vartype": "integer"
                },
                {
                    "name": "max_connections",
                    "value": "112",
                    "vartype": "integer"
                },
                {
                    "name": "wal_sender_timeout",
                    "value": "10000",
                    "vartype": "integer"
                },
                {
                    "name": "synchronous_standby_names",
                    "value": "walproposer",
                    "vartype": "string"
                },
                {
                    "name": "effective_io_concurrency",
                    "value": "100",
                    "vartype": "integer"
                },
                {
                    "name": "shared_preload_libraries",
                    "value": "neon",
                    "vartype": "string"
                },
                {
                    "name": "max_replication_slots",
                    "value": "10",
                    "vartype": "integer"
                },
                {
                    "name": "neon.max_cluster_size",
                    "value": "204800",
                    "vartype": "integer"
                },
                {
                    "name": "fsync",
                    "value": "off",
                    "vartype": "bool"
                },
                {
                    "name": "max_replication_write_lag",
                    "value": "15",
                    "vartype": "integer"
                },
                {
                    "name": "wal_log_hints",
                    "value": "off",
                    "vartype": "bool"
                },
                {
                    "name": "maintenance_io_concurrency",
                    "value": "100",
                    "vartype": "integer"
                },
                {
                    "name": "max_worker_processes",
                    "value": "8",
                    "vartype": "integer"
                },
                {
                    "name": "idle_in_transaction_session_timeout",
                    "value": "300000",
                    "vartype": "integer"
                },
                {
                    "name": "password_encryption",
                    "value": "scram-sha-256",
                    "vartype": "enum"
                },
                {
                    "name": "max_replication_flush_lag",
                    "value": "10240",
                    "vartype": "integer"
                },
                {
                    "name": "max_wal_senders",
                    "value": "10",
                    "vartype": "integer"
                },
                {
                    "name": "restart_after_crash",
                    "value": "off",
                    "vartype": "bool"
                },
                {
                    "name": "shared_buffers",
                    "value": "65536",
                    "vartype": "integer"
                },
                {
                    "name": "hot_standby",
                    "value": "off",
                    "vartype": "bool"
                },
                {
                    "name": "superuser_reserved_connections",
                    "value": "4",
                    "vartype": "integer"
                },
                {
                    "name": "maintenance_work_mem",
                    "value": "65536",
                    "vartype": "integer"
                }
            ],
            "roles": [
                {
                    "name": "arthur",
                    "encrypted_password": "SCRAM-SHA-256$4096:goIf+DVnIR1HglUPCerpKw==$cCdVfO1J81t5q6Ry54Eu9wJyjtm/Q5gu281MHbV/0r8=:1vyoQkj0Z/rMEzZ96QcoSECrYckSvQNgWWIL7x2Lcyw=",
                    "options": []
                }
            ],
            "databases": [
                {
                    "name": "neondb",
                    "owner": "arthur",
                    "options": null
                }
            ]
        },
        delta_operations: None,
        tenant_id: Some(self.tenant_id),
        timeline_id: Some(self.timeline_id),
        mode: self.mode,
        pageserver_connstring: Some(pageserver_connstring),
        safekeeper_connstrings,
        storage_auth_token: auth_token.clone(),
        remote_extensions: None,
    };
        "skip_pg_catalog_updates": false,
        "mode": "Primary",
        "delta_operations": null,
        "format_version": 1,
        "operation_uuid": "9eaaac44-1d3b-405f-abd9-3562f633c6b5",
        "timestamp": "2023-09-14T16:10:56.936947442Z",
        "storage_auth_token": ""
    }
    "#;

    let mut spec: ComputeSpec = serde_json::from_str(raw_spec)?;
    if let Some(ref mut settings) = spec.cluster.settings {
        settings.push(GenericOption{
            name: "neon.safekeepers".to_owned(),
            value: Some(safekeeper_connstrings.join(",")),
            vartype: "string".to_owned(),
        });
        settings.push(GenericOption{
            name: "neon.pageserver_connstring".to_owned(),
            value: Some(pageserver_connstring),
            vartype: "string".to_owned(),
        });
        settings.push(GenericOption{
            name: "neon.tenant_id".to_owned(),
            value: Some(self.tenant_id.to_string()),
            vartype: "string".to_owned(),
        });
        settings.push(GenericOption{
            name: "neon.timeline_id".to_owned(),
            value: Some(self.timeline_id.to_string()),
            vartype: "string".to_owned(),
        });
        settings.push(GenericOption{
            name: "port".to_owned(),
            value: Some(self.pg_address.port().to_string()),
            vartype: "string".to_owned(),
        });
    }

    let spec_path = self.endpoint_path().join("spec.json");
    std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -470,14 +470,14 @@ impl<'a> WalIngest<'a> {
                // we can't validate the remaining number of bytes without parsing
                // the tuple data.
                if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                    old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
                    old_heap_blkno = Some(decoded.blocks[0].blkno);
                }
                if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
                    // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
                    // non-HOT update where the new tuple goes to different page than
                    // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
                    // set.
                    new_heap_blkno = Some(decoded.blocks[0].blkno);
                    new_heap_blkno = Some(decoded.blocks[1].blkno);
                }
            }
        } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -526,14 +526,14 @@ impl<'a> WalIngest<'a> {
                // we can't validate the remaining number of bytes without parsing
                // the tuple data.
                if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                    old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
                    old_heap_blkno = Some(decoded.blocks[0].blkno);
                }
                if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
                    // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
                    // non-HOT update where the new tuple goes to different page than
                    // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
                    // set.
                    new_heap_blkno = Some(decoded.blocks[0].blkno);
                    new_heap_blkno = Some(decoded.blocks[1].blkno);
                }
            }
        } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -582,14 +582,14 @@ impl<'a> WalIngest<'a> {
                // we can't validate the remaining number of bytes without parsing
                // the tuple data.
                if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                    old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
                    old_heap_blkno = Some(decoded.blocks[0].blkno);
                }
                if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
                    // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
                    // non-HOT update where the new tuple goes to different page than
                    // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
                    // set.
                    new_heap_blkno = Some(decoded.blocks[0].blkno);
                    new_heap_blkno = Some(decoded.blocks[1].blkno);
                }
            }
        } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -745,14 +745,14 @@ impl<'a> WalIngest<'a> {
                // we can't validate the remaining number of bytes without parsing
                // the tuple data.
                if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
                    old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
                    old_heap_blkno = Some(decoded.blocks[0].blkno);
                }
                if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
                    // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
                    // non-HOT update where the new tuple goes to different page than
                    // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
                    // set.
                    new_heap_blkno = Some(decoded.blocks[0].blkno);
                    new_heap_blkno = Some(decoded.blocks[1].blkno);
                }
            }
            pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
@@ -1,6 +1,7 @@
pytest_plugins = (
    "fixtures.pg_version",
    "fixtures.parametrize",
    "fixtures.httpserver",
    "fixtures.neon_fixtures",
    "fixtures.benchmark_fixture",
    "fixtures.pg_stats",
test_runner/fixtures/httpserver.py (new file, 45 lines)
@@ -0,0 +1,45 @@
from typing import Tuple

import pytest
from pytest_httpserver import HTTPServer

# TODO: mypy fails with:
# Module "fixtures.neon_fixtures" does not explicitly export attribute "PortDistributor" [attr-defined]
# from fixtures.neon_fixtures import PortDistributor

# compared to the fixtures from pytest_httpserver with same names, these are
# always function scoped, so you can check and stop the server in tests.


@pytest.fixture(scope="function")
def httpserver_ssl_context():
    return None


@pytest.fixture(scope="function")
def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
    host, port = httpserver_listen_address
    if not host:
        host = HTTPServer.DEFAULT_LISTEN_HOST
    if not port:
        port = HTTPServer.DEFAULT_LISTEN_PORT

    server = HTTPServer(host=host, port=port, ssl_context=httpserver_ssl_context)
    server.start()
    yield server
    server.clear()
    if server.is_running():
        server.stop()


@pytest.fixture(scope="function")
def httpserver(make_httpserver):
    server = make_httpserver
    yield server
    server.clear()


@pytest.fixture(scope="function")
def httpserver_listen_address(port_distributor) -> Tuple[str, int]:
    port = port_distributor.get_port()
    return ("localhost", port)
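For reference, a minimal sketch of how a test might consume these function-scoped wrappers around pytest_httpserver; the endpoint path and the use of the `requests` client are illustrative assumptions, not part of this change.

```python
# Sketch: exercise the function-scoped `httpserver` fixture defined above.
# Assumes pytest_httpserver and requests are installed; the path is arbitrary.
import requests
from pytest_httpserver import HTTPServer


def test_httpserver_sketch(httpserver: HTTPServer):
    # Register a canned JSON response, similar to respond_with_handler() used below.
    httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_json(
        {"status": "ok"}
    )

    resp = requests.post(httpserver.url_for("/billing/api/v1/usage_events"), json={"events": []})
    assert resp.status_code == 200

    # Function scope means the server can be checked (and even stopped)
    # inside the test without leaking state into other tests.
    httpserver.check()
```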
@@ -223,12 +223,6 @@ def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistrib
    return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)


@pytest.fixture(scope="session")
def httpserver_listen_address(port_distributor: PortDistributor):
    port = port_distributor.get_port()
    return ("localhost", port)


@pytest.fixture(scope="function")
def default_broker(
    port_distributor: PortDistributor,
@@ -2314,7 +2308,7 @@ class Endpoint(PgProtocol):
        http_port: int,
        check_stop_result: bool = True,
    ):
        super().__init__(host="localhost", port=pg_port, user="cloud_admin", dbname="postgres")
        super().__init__(host="localhost", port=pg_port, user="arthur", dbname="neondb", password="I2GH6gtFBpCz")
        self.env = env
        self.running = False
        self.branch_name: Optional[str] = None  # dubious
@@ -42,12 +42,11 @@ def handle_role(dbs, roles, operation):
        raise ValueError("Invalid op")


fail = False


def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
def ddl_forward_handler(
    request: Request, dbs: Dict[str, str], roles: Dict[str, str], ddl: "DdlForwardingContext"
) -> Response:
    log.info(f"Received request with data {request.get_data(as_text=True)}")
    if fail:
    if ddl.fail:
        log.info("FAILING")
        return Response(status=500, response="Failed just cuz")
    if request.json is None:
@@ -72,6 +71,7 @@ class DdlForwardingContext:
        self.port = port
        self.dbs: Dict[str, str] = {}
        self.roles: Dict[str, str] = {}
        self.fail = False
        endpoint = "/management/api/v2/roles_and_databases"
        ddl_url = f"http://{host}:{port}{endpoint}"
        self.pg.configure(
@@ -82,7 +82,7 @@ class DdlForwardingContext:
        )
        log.info(f"Listening on {ddl_url}")
        self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
            lambda request: ddl_forward_handler(request, self.dbs, self.roles)
            lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)
        )

    def __enter__(self):
@@ -103,6 +103,9 @@ class DdlForwardingContext:
    def wait(self, timeout=3):
        self.server.wait(timeout=timeout)

    def failures(self, bool):
        self.fail = bool

    def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
        res = self.send(query)
        self.wait(timeout=timeout)
@@ -203,9 +206,9 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
    assert ddl.dbs == {"stork": "cork"}

    with pytest.raises(psycopg2.InternalError):
        global fail
        fail = True
        ddl.failures(True)
        cur.execute("CREATE DATABASE failure WITH OWNER=cork")
        ddl.wait()

    ddl.failures(False)
    conn.close()
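The change above replaces the module-level `fail` flag with a `fail` attribute on `DdlForwardingContext`, threaded into the handler through the registration lambda. A minimal, self-contained sketch of that pattern (class and handler names here are illustrative, not the test's real ones):

```python
# Sketch: per-context failure injection instead of a module-level flag.
from dataclasses import dataclass

from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response


@dataclass
class HandlerContext:
    fail: bool = False

    def failures(self, value: bool) -> None:
        self.fail = value


def forward_handler(request: Request, ctx: HandlerContext) -> Response:
    # Mirrors `if ddl.fail:` in the real handler.
    if ctx.fail:
        return Response(status=500, response="Failed just cuz")
    return Response(status=200)


ctx = HandlerContext()
# The registration closes over the context, as in
# `lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)`.
handler = lambda request: forward_handler(request, ctx)
ctx.failures(True)  # subsequent requests now fail with 500 until failures(False)
```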
@@ -15,45 +15,45 @@ from fixtures.types import TimelineId

# Test configuration
#
# Create a table with {num_rows} rows, and perform {updates_to_perform} random
# UPDATEs on it, using {num_connections} separate connections.
num_connections = 10
num_rows = 100000
updates_to_perform = 10000

updates_performed = 0


# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
    global updates_performed
    pg_conn = await endpoint.connect_async()

    while updates_performed < updates_to_perform:
        updates_performed += 1
        id = random.randrange(1, num_rows)
        await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")


# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
    pageserver_http = env.pageserver.http_client()

    loop = asyncio.get_running_loop()

    def do_gc():
        pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
        pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)

    with concurrent.futures.ThreadPoolExecutor() as pool:
        while updates_performed < updates_to_perform:
            await loop.run_in_executor(pool, do_gc)
# Create a table with {NUM_ROWS} rows, and perform {UPDATES_TO_PERFORM} random
# UPDATEs on it, using {NUM_CONNECTIONS} separate connections.
NUM_CONNECTIONS = 10
NUM_ROWS = 100000
UPDATES_TO_PERFORM = 10000


# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
    workers = []
    for _ in range(num_connections):
    updates_performed = 0

    # Perform aggressive GC with 0 horizon
    async def gc(env: NeonEnv, timeline: TimelineId):
        pageserver_http = env.pageserver.http_client()
        nonlocal updates_performed
        global UPDATES_TO_PERFORM

        loop = asyncio.get_running_loop()

        def do_gc():
            pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
            pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)

        with concurrent.futures.ThreadPoolExecutor() as pool:
            while updates_performed < UPDATES_TO_PERFORM:
                await loop.run_in_executor(pool, do_gc)

    # Run random UPDATEs on test table
    async def update_table(endpoint: Endpoint):
        pg_conn = await endpoint.connect_async()
        nonlocal updates_performed

        while updates_performed < UPDATES_TO_PERFORM:
            updates_performed += 1
            id = random.randrange(1, NUM_ROWS)
            await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")

    for _ in range(NUM_CONNECTIONS):
        workers.append(asyncio.create_task(update_table(endpoint)))
    workers.append(asyncio.create_task(gc(env, timeline)))

@@ -81,7 +81,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
        f"""
        INSERT INTO foo
            SELECT g, 0, 'long string to consume some space' || g
            FROM generate_series(1, {num_rows}) g
            FROM generate_series(1, {NUM_ROWS}) g
        """
    )
    cur.execute("CREATE INDEX ON foo(id)")
@@ -91,7 +91,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
    cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
    r = cur.fetchone()
    assert r is not None
    assert r == (num_rows, updates_to_perform)
    assert r == (NUM_ROWS, UPDATES_TO_PERFORM)


#
@@ -99,6 +99,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
    # Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
    neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
    num_index_uploads = 0

    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)

@@ -160,5 +161,5 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
        log.info(f"{num_index_uploads} index uploads after GC iteration {i}")

    after = num_index_uploads
    log.info(f"{after-before} new index uploads during test")
    log.info(f"{after - before} new index uploads during test")
    assert after - before < 5
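In the rewritten `update_and_gc` above, the shared progress counter moves from module scope into the enclosing coroutine and the workers reach it with `nonlocal`, so runs no longer share mutable state through globals. A minimal sketch of that pattern in isolation (the names and the tiny target constant are illustrative):

```python
# Sketch: a nonlocal counter shared by asyncio worker tasks.
import asyncio

UPDATES_TO_PERFORM = 100  # illustrative, far smaller than the test's 10000


async def run_updates(num_workers: int = 4) -> int:
    updates_performed = 0

    async def worker() -> None:
        nonlocal updates_performed
        while updates_performed < UPDATES_TO_PERFORM:
            updates_performed += 1
            await asyncio.sleep(0)  # stand-in for the real UPDATE round trip

    await asyncio.gather(*(asyncio.create_task(worker()) for _ in range(num_workers)))
    return updates_performed


assert asyncio.run(run_updates()) == UPDATES_TO_PERFORM
```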
@@ -3,9 +3,9 @@
# Use mock HTTP server to receive metrics and verify that they look sane.
#

import time
from pathlib import Path
from typing import Iterator
from queue import SimpleQueue
from typing import Any, Iterator, Set

import pytest
from fixtures.log_helper import log
@@ -18,56 +18,10 @@ from fixtures.neon_fixtures import (
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

# ==============================================================================
# Storage metrics tests
# ==============================================================================

initial_tenant = TenantId.generate()
remote_uploaded = 0
checks = {
    "written_size": lambda value: value > 0,
    "resident_size": lambda value: value >= 0,
    # >= 0 check here is to avoid race condition when we receive metrics before
    # remote_uploaded is updated
    "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0,
    # logical size may lag behind the actual size, so allow 0 here
    "timeline_logical_size": lambda value: value >= 0,
}

metric_kinds_checked = set([])


#
# verify that metrics look minilally sane
#
def metrics_handler(request: Request) -> Response:
    if request.json is None:
        return Response(status=400)

    events = request.json["events"]
    log.info("received events:")
    log.info(events)

    for event in events:
        assert event["tenant_id"] == str(
            initial_tenant
        ), "Expecting metrics only from the initial tenant"
        metric_name = event["metric"]

        check = checks.get(metric_name)
        # calm down mypy
        if check is not None:
            assert check(event["value"]), f"{metric_name} isn't valid"
            global metric_kinds_checked
            metric_kinds_checked.add(metric_name)

    return Response(status=200)


@pytest.mark.parametrize(
    "remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
@@ -81,6 +35,18 @@ def test_metric_collection(
    (host, port) = httpserver_listen_address
    metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"

    metric_kinds_checked: Set[str] = set([])

    uploads: SimpleQueue[Any] = SimpleQueue()

    def metrics_handler(request: Request) -> Response:
        if request.json is None:
            return Response(status=400)

        events = request.json["events"]
        uploads.put(events)
        return Response(status=200)

    # Require collecting metrics frequently, since we change
    # the timeline and want something to be logged about it.
    #
@@ -90,6 +56,7 @@ def test_metric_collection(
        f"""
        metric_collection_interval="1s"
        metric_collection_endpoint="{metric_collection_endpoint}"
        cached_metric_collection_interval="0s"
        """
        + "tenant_config={pitr_interval = '0 sec'}"
    )
@@ -98,9 +65,6 @@ def test_metric_collection(

    log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")

    # Set initial tenant of the test, that we expect the logs from
    global initial_tenant
    initial_tenant = neon_env_builder.initial_tenant
    # mock http server that returns OK for the metrics
    httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
        metrics_handler
@@ -108,8 +72,7 @@ def test_metric_collection(

    # spin up neon, after http server is ready
    env = neon_env_builder.init_start()
    # Order of fixtures shutdown is not specified, and if http server gets down
    # before pageserver, pageserver log might contain such errors in the end.
    # httpserver is shut down before pageserver during passing run
    env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
    tenant_id = env.initial_tenant
    timeline_id = env.neon_cli.create_branch("test_metric_collection")
@@ -141,29 +104,74 @@ def test_metric_collection(
            total += sample[2]
        return int(total)

    remote_uploaded = 0

    # upload some data to remote storage
    if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
        wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
        pageserver_http = env.pageserver.http_client()
        pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
        pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
        global remote_uploaded

        remote_uploaded = get_num_remote_ops("index", "upload")
        assert remote_uploaded > 0

    # wait longer than collecting interval and check that all requests are served
    time.sleep(3)
    httpserver.check()
    global metric_kinds_checked, checks
    # we expect uploads at 1Hz, on busy runners this could be too optimistic,
    # so give 5s we only want to get the following upload after "ready" value.
    # later tests will be added to ensure that the timeseries are sane.
    timeout = 5
    uploads.put("ready")

    while True:
        # discard earlier than "ready"
        log.info("waiting for upload")
        events = uploads.get(timeout=timeout)
        import json

        if events == "ready":
            events = uploads.get(timeout=timeout)
            httpserver.check()
            httpserver.stop()
            # if anything comes after this, we'll just ignore it
            stringified = json.dumps(events, indent=2)
            log.info(f"inspecting: {stringified}")
            break
        else:
            stringified = json.dumps(events, indent=2)
            log.info(f"discarding: {stringified}")

    # verify that metrics look minimally sane
    checks = {
        "written_size": lambda value: value > 0,
        "resident_size": lambda value: value >= 0,
        "remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value == 0,
        # logical size may lag behind the actual size, so allow 0 here
        "timeline_logical_size": lambda value: value >= 0,
        # this can also be zero, depending on when we get the value
        "written_data_bytes_delta": lambda value: value >= 0,
    }

    metric_kinds_checked = set()
    metric_kinds_seen = set()

    for event in events:
        assert event["tenant_id"] == str(tenant_id)
        metric_name = event["metric"]
        metric_kinds_seen.add(metric_name)

        check = checks.get(metric_name)
        # calm down mypy
        if check is not None:
            value = event["value"]
            log.info(f"checking {metric_name} value {value}")
            assert check(value), f"{metric_name} isn't valid"
            metric_kinds_checked.add(metric_name)

    expected_checks = set(checks.keys())
    assert len(metric_kinds_checked) == len(
        checks
    assert (
        metric_kinds_checked == checks.keys()
    ), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered"


# ==============================================================================
# Proxy metrics tests
# ==============================================================================
    assert metric_kinds_seen == metric_kinds_checked


def proxy_metrics_handler(request: Request) -> Response:
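The rewritten test above drops the module-level `checks` and `metric_kinds_checked` globals and instead has the HTTP handler push every payload onto a `SimpleQueue`; the test then enqueues a "ready" sentinel and inspects the first upload that arrives after it. A minimal sketch of that handshake, detached from the neon fixtures (the payloads are illustrative):

```python
# Sketch: queue-based handoff between an HTTP handler and the test body.
from queue import SimpleQueue
from typing import Any

uploads: SimpleQueue[Any] = SimpleQueue()


def handler(payload: Any) -> None:
    # In the real test this runs inside the pytest_httpserver request handler.
    uploads.put(payload)


# Two uploads arrive before the marker, one after.
handler({"events": ["early-1"]})
handler({"events": ["early-2"]})
uploads.put("ready")
handler({"events": ["late"]})

while True:
    item = uploads.get(timeout=5)
    if item == "ready":
        # The next item is guaranteed to have been enqueued after "ready".
        item = uploads.get(timeout=5)
        assert item == {"events": ["late"]}
        break
    # anything before the sentinel is discarded
```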
@@ -301,6 +301,7 @@ def test_ondemand_download_timetravel(
    # they are present only in the remote storage, only locally, or both.
    # It should not change.
    assert filled_current_physical == get_api_current_physical_size()
    endpoint_old.stop()


#
@@ -119,65 +119,6 @@ def test_tenant_reattach(

num_connections = 10
num_rows = 100000
updates_to_perform = 0

updates_started = 0
updates_finished = 0


# Run random UPDATEs on test table. On failure, try again.
async def update_table(pg_conn: asyncpg.Connection):
    global updates_started, updates_finished, updates_to_perform

    while updates_started < updates_to_perform or updates_to_perform == 0:
        updates_started += 1
        id = random.randrange(1, num_rows)

        # Loop to retry until the UPDATE succeeds
        while True:
            try:
                await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
                updates_finished += 1
                if updates_finished % 1000 == 0:
                    log.info(f"update {updates_finished} / {updates_to_perform}")
                break
            except asyncpg.PostgresError as e:
                # Received error from Postgres. Log it, sleep a little, and continue
                log.info(f"UPDATE error: {e}")
                await asyncio.sleep(0.1)


async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
    global updates_started, updates_finished, updates_to_perform

    # Wait until we have performed some updates
    wait_until(20, 0.5, lambda: updates_finished > 500)

    log.info("Detaching tenant")
    pageserver_http.tenant_detach(tenant_id)
    await asyncio.sleep(1)
    log.info("Re-attaching tenant")
    pageserver_http.tenant_attach(tenant_id)
    log.info("Re-attach finished")

    # Continue with 5000 more updates
    updates_to_perform = updates_started + 5000


# async guts of test_tenant_reattach_while_bysy test
async def reattach_while_busy(
    env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
    workers = []
    for _ in range(num_connections):
        pg_conn = await endpoint.connect_async()
        workers.append(asyncio.create_task(update_table(pg_conn)))

    workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
    await asyncio.gather(*workers)

    assert updates_finished == updates_to_perform


# Detach and re-attach tenant, while compute is busy running queries.
#
@@ -226,6 +167,62 @@ def test_tenant_reattach_while_busy(
    neon_env_builder: NeonEnvBuilder,
    remote_storage_kind: RemoteStorageKind,
):
    updates_started = 0
    updates_finished = 0
    updates_to_perform = 0

    # Run random UPDATEs on test table. On failure, try again.
    async def update_table(pg_conn: asyncpg.Connection):
        nonlocal updates_started, updates_finished, updates_to_perform

        while updates_started < updates_to_perform or updates_to_perform == 0:
            updates_started += 1
            id = random.randrange(1, num_rows)

            # Loop to retry until the UPDATE succeeds
            while True:
                try:
                    await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
                    updates_finished += 1
                    if updates_finished % 1000 == 0:
                        log.info(f"update {updates_finished} / {updates_to_perform}")
                    break
                except asyncpg.PostgresError as e:
                    # Received error from Postgres. Log it, sleep a little, and continue
                    log.info(f"UPDATE error: {e}")
                    await asyncio.sleep(0.1)

    async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
        nonlocal updates_started, updates_finished, updates_to_perform

        # Wait until we have performed some updates
        wait_until(20, 0.5, lambda: updates_finished > 500)

        log.info("Detaching tenant")
        pageserver_http.tenant_detach(tenant_id)
        await asyncio.sleep(1)
        log.info("Re-attaching tenant")
        pageserver_http.tenant_attach(tenant_id)
        log.info("Re-attach finished")

        # Continue with 5000 more updates
        updates_to_perform = updates_started + 5000

    # async guts of test_tenant_reattach_while_bysy test
    async def reattach_while_busy(
        env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
    ):
        nonlocal updates_to_perform, updates_finished
        workers = []
        for _ in range(num_connections):
            pg_conn = await endpoint.connect_async()
            workers.append(asyncio.create_task(update_table(pg_conn)))

        workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
        await asyncio.gather(*workers)

        assert updates_finished == updates_to_perform

    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
    env = neon_env_builder.init_start()
test_runner/regress/test_tmp.py (new file, 59 lines)
@@ -0,0 +1,59 @@
import random
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import print_gc_result, query_scalar
import pytest


@pytest.mark.parametrize('execution_number', range(10000))
def test_tmp(neon_env_builder: NeonEnvBuilder, execution_number):
    neon_env_builder.num_safekeepers = 3
    env = neon_env_builder.init_start()
    try:
        endpoint = env.endpoints.create_start("main")
    except Exception as e:
        log.error(f"failed to create endpoint: {e}")
        time.sleep(100)
        raise e

    log.info("postgres is running on main branch")

    def exec(sql):
        before_ts = time.time()
        res = endpoint.safe_psql(sql)
        after_ts = time.time()
        log.info(f"executed in {after_ts - before_ts:.2f}s, result {res}")
        if after_ts - before_ts > 1:
            raise Exception(f"executed in {after_ts - before_ts:.2f}s, result {res}")
        return res

    time.sleep(random.random() * 4)

    exec("SELECT 1")
    exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
    exec("INSERT INTO activity_v1(nonce,val) SELECT 5122547621334681000 AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")

    time.sleep(random.random() * 4)

    exec("SELECT 1")
    exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
    exec("INSERT INTO activity_v1(nonce,val) SELECT 2137073174395130327 AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")

    exec("SELECT 1")
    exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
    exec("INSERT INTO activity_v1(nonce,val) SELECT (random() * 100)::int AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")

    exec("SELECT 1")
    exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
    exec("INSERT INTO activity_v1(nonce,val) SELECT (random() * 100)::int AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")

    exec("SELECT 1")
    exec("CREATE TABLE IF NOT EXISTS activity_v1 (\n\t\tid SERIAL PRIMARY KEY,\n\t\tnonce BIGINT,\n\t\tval FLOAT,\n\t\tcreated_at TIMESTAMP DEFAULT NOW()\n\t )")
    exec("INSERT INTO activity_v1(nonce,val) SELECT (random() * 100)::int AS nonce, avg(id) AS val FROM activity_v1 RETURNING *")

    # ERROR: could not read relation existence of rel 1663/16386/16427.1 from page server at lsn 0/014A0888
    # I2GH6gtFBpCz
    #
@@ -19,40 +19,18 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
    # Install extension containing function needed for test
    cur.execute("CREATE EXTENSION neon_test_utils")

    # Create a test table for a few different scenarios and freeze it to set the VM bits.
    # Create a test table and freeze it to set the VM bit.
    cur.execute("CREATE TABLE vmtest_delete (id integer PRIMARY KEY)")
    cur.execute("INSERT INTO vmtest_delete VALUES (1)")
    cur.execute("VACUUM FREEZE vmtest_delete")

    cur.execute("CREATE TABLE vmtest_hot_update (id integer PRIMARY KEY, filler text)")
    cur.execute("INSERT INTO vmtest_hot_update VALUES (1, 'x')")
    cur.execute("VACUUM FREEZE vmtest_hot_update")

    cur.execute("CREATE TABLE vmtest_cold_update (id integer PRIMARY KEY)")
    cur.execute("INSERT INTO vmtest_cold_update SELECT g FROM generate_series(1, 1000) g")
    cur.execute("VACUUM FREEZE vmtest_cold_update")

    cur.execute(
        "CREATE TABLE vmtest_cold_update2 (id integer PRIMARY KEY, filler text) WITH (fillfactor=100)"
    )
    cur.execute("INSERT INTO vmtest_cold_update2 SELECT g, '' FROM generate_series(1, 1000) g")
    cur.execute("VACUUM FREEZE vmtest_cold_update2")
    cur.execute("CREATE TABLE vmtest_update (id integer PRIMARY KEY)")
    cur.execute("INSERT INTO vmtest_update SELECT g FROM generate_series(1, 1000) g")
    cur.execute("VACUUM FREEZE vmtest_update")

    # DELETE and UPDATE the rows.
    cur.execute("DELETE FROM vmtest_delete WHERE id = 1")
    cur.execute("UPDATE vmtest_hot_update SET filler='x' WHERE id = 1")
    cur.execute("UPDATE vmtest_cold_update SET id = 5000 WHERE id = 1")

    # Clear the VM bit on the last page with an INSERT. Then clear the VM bit on
    # the page where row 1 is (block 0), by doing an UPDATE. The UPDATE is a
    # cold update, and the new tuple goes to the last page, which already had
    # its VM bit cleared. The point is that the UPDATE *only* clears the VM bit
    # on the page containing the old tuple. We had a bug where we got the old
    # and new pages mixed up, and that only shows up when one of the bits is
    # cleared, but not the other one.
    cur.execute("INSERT INTO vmtest_cold_update2 VALUES (9999, 'x')")
    # Clears the VM bit on the old page
    cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
    cur.execute("UPDATE vmtest_update SET id = 5000 WHERE id = 1")

    # Branch at this point, to test that later
    fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "test_vm_bit_clear")
@@ -72,13 +50,9 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
        """
    )

    cur.execute("SELECT id FROM vmtest_delete WHERE id = 1")
    cur.execute("SELECT * FROM vmtest_delete WHERE id = 1")
    assert cur.fetchall() == []
    cur.execute("SELECT id FROM vmtest_hot_update WHERE id = 1")
    assert cur.fetchall() == [(1,)]
    cur.execute("SELECT id FROM vmtest_cold_update WHERE id = 1")
    assert cur.fetchall() == []
    cur.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
    cur.execute("SELECT * FROM vmtest_update WHERE id = 1")
    assert cur.fetchall() == []

    cur.close()
@@ -103,11 +77,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
        """
    )

    cur_new.execute("SELECT id FROM vmtest_delete WHERE id = 1")
    cur_new.execute("SELECT * FROM vmtest_delete WHERE id = 1")
    assert cur_new.fetchall() == []
    cur_new.execute("SELECT id FROM vmtest_hot_update WHERE id = 1")
    assert cur_new.fetchall() == [(1,)]
    cur_new.execute("SELECT id FROM vmtest_cold_update WHERE id = 1")
    assert cur_new.fetchall() == []
    cur_new.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
    cur_new.execute("SELECT * FROM vmtest_update WHERE id = 1")
    assert cur_new.fetchall() == []
@@ -14,6 +14,8 @@ from pathlib import Path
from typing import Any, List, Optional

import psycopg2
import psycopg2.errors
import psycopg2.extras
import pytest
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
@@ -260,7 +262,7 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
    else:
        failed_node.start()
        failed_node = None
    assert query_scalar(cur, "SELECT sum(key) FROM t") == 500500
    assert query_scalar(cur, "SELECT sum(key) FROM t") == (n_inserts * (n_inserts + 1)) // 2


# Test that safekeepers push their info to the broker and learn peer status from it
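The new assertion replaces the hardcoded 500500 with the closed-form sum of the inserted keys. A quick check of the identity, assuming the test's default of 1000 inserts (the value implied by the old literal):

```python
# Sketch: the closed-form sum matches the previous hardcoded expectation.
n_inserts = 1000  # assumption inferred from the old literal 500500
assert (n_inserts * (n_inserts + 1)) // 2 == 500500 == sum(range(1, n_inserts + 1))
```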