Compare commits

..

6 Commits

Author SHA1 Message Date
BodoBolero
e808179cb2 allow all users in role pg_monitor to monitor their LFC working set 2024-02-26 15:01:50 +01:00
Konstantin Knizhnik
cf73c3c79e Remove unneeded header include 2023-04-05 21:47:15 +03:00
Konstantin Knizhnik
31950850c4 Use standard Postgres HyperLogLog algorithm for estimation of working set size 2023-03-31 19:52:09 +03:00
Konstantin Knizhnik
77d113518c Fix pow_2_32 definition 2023-03-31 17:14:24 +03:00
Konstantin Knizhnik
cb9ac4ccca Use standard Postgres hash function for HyperLogLog 2023-03-31 10:01:26 +03:00
Konstantin Knizhnik
bf9de03865 Calculate approximate number of accessed unique pages to estimate working set size at compute 2023-03-30 16:55:39 +03:00
20 changed files with 157 additions and 129 deletions

View File

@@ -30,9 +30,10 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: dev
neon_region: eu-west-1
zenith_service: proxy-scram
zenith_env: dev
zenith_region: eu-west-1
zenith_region_slug: eu-west-1
exposedService:
annotations:

View File

@@ -15,9 +15,10 @@ settings:
# -- Additional labels for neon-proxy-link pods
podLabels:
neon_service: proxy
neon_env: dev
neon_region: us-east-2
zenith_service: proxy
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
service:
type: LoadBalancer

View File

@@ -15,9 +15,10 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram-legacy
neon_env: dev
neon_region: us-east-2
zenith_service: proxy-scram-legacy
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
exposedService:
annotations:

View File

@@ -30,9 +30,10 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: dev
neon_region: us-east-2
zenith_service: proxy-scram
zenith_env: dev
zenith_region: us-east-2
zenith_region_slug: us-east-2
exposedService:
annotations:

View File

@@ -31,9 +31,10 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: ap-southeast-1
zenith_service: proxy-scram
zenith_env: prod
zenith_region: ap-southeast-1
zenith_region_slug: ap-southeast-1
exposedService:
annotations:

View File

@@ -31,9 +31,10 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: eu-central-1
zenith_service: proxy-scram
zenith_env: prod
zenith_region: eu-central-1
zenith_region_slug: eu-central-1
exposedService:
annotations:

View File

@@ -13,9 +13,10 @@ settings:
# -- Additional labels for zenith-proxy pods
podLabels:
neon_service: proxy
neon_env: production
neon_region: us-east-2
zenith_service: proxy
zenith_env: production
zenith_region: us-east-2
zenith_region_slug: us-east-2
service:
type: LoadBalancer

View File

@@ -31,9 +31,10 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-east-2
zenith_service: proxy-scram
zenith_env: prod
zenith_region: us-east-2
zenith_region_slug: us-east-2
exposedService:
annotations:

View File

@@ -31,9 +31,10 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-west-2
zenith_service: proxy-scram
zenith_env: prod
zenith_region: us-west-2
zenith_region_slug: us-west-2
exposedService:
annotations:

View File

@@ -31,9 +31,10 @@ settings:
# -- Additional labels for neon-proxy pods
podLabels:
neon_service: proxy-scram
neon_env: prod
neon_region: us-west-2
zenith_service: proxy-scram
zenith_env: prod
zenith_region: us-west-2
zenith_region_slug: us-west-2
exposedService:
annotations:

View File

@@ -203,14 +203,13 @@ fn main() -> Result<()> {
if delay_exit {
info!("giving control plane 30s to collect the error before shutdown");
thread::sleep(Duration::from_secs(30));
info!("shutting down");
}
info!("shutting down tracing");
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
tracing_utils::shutdown_tracing();
info!("shutting down");
exit(exit_code.unwrap_or(1))
}

View File

@@ -618,9 +618,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.copied()
.context("Failed to parse postgres version from the argument string")?;
let node =
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
println!("{}", node.pgdata().display());
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();

View File

@@ -244,12 +244,14 @@ pub(crate) async fn random_init_delay(
) -> Result<(), Cancelled> {
use rand::Rng;
if period == Duration::ZERO {
return Ok(());
}
let d = {
let mut rng = rand::thread_rng();
// gen_range asserts that the range cannot be empty, which it could be because period can
// be set to zero to disable gc or compaction, so lets set it to be at least 10s.
let period = std::cmp::max(period, Duration::from_secs(10));
// semi-ok default as the source of jitter
rng.gen_range(Duration::ZERO..=period)
};

View File

@@ -24,6 +24,8 @@
#include "pgstat.h"
#include "pagestore_client.h"
#include "access/parallel.h"
#include "common/hashfn.h"
#include "lib/hyperloglog.h"
#include "postmaster/bgworker.h"
#include "storage/relfilenode.h"
#include "storage/buf_internals.h"
@@ -65,6 +67,7 @@
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
#define HYPER_LOG_LOG_BIT_WIDTH 10
typedef struct FileCacheEntry
{
@@ -77,9 +80,11 @@ typedef struct FileCacheEntry
typedef struct FileCacheControl
{
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
dlist_head lru; /* double linked list for LRU replacement algorithm */
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
dlist_head lru; /* double linked list for LRU replacement algorithm */
hyperLogLogState wss_estimation; /* estimation of working set size */
char hyperloglog_hashes[(1 << HYPER_LOG_LOG_BIT_WIDTH) + 1];
} FileCacheControl;
static HTAB* lfc_hash;
@@ -124,6 +129,12 @@ lfc_shmem_startup(void)
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);
initHyperLogLog(&lfc_ctl->wss_estimation, HYPER_LOG_LOG_BIT_WIDTH);
/* We need hashes in shared memory */
pfree(lfc_ctl->wss_estimation.hashesArr);
memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);
lfc_ctl->wss_estimation.hashesArr = lfc_ctl->hyperloglog_hashes;
/* Remove file cache on restart */
(void)unlink(lfc_path);
@@ -396,6 +407,11 @@ lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
/* Approximate working set */
tag.blockNum = blkno;
addHyperLogLog(&lfc_ctl->wss_estimation, hash_bytes((char const*)&tag, sizeof(tag)));
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
/* Page is not cached */
@@ -698,3 +714,21 @@ local_cache_pages(PG_FUNCTION_ARGS)
else
SRF_RETURN_DONE(funcctx);
}
PG_FUNCTION_INFO_V1(approximate_working_set_size);

/*
 * approximate_working_set_size(reset bool) -> int4
 *
 * Returns the HyperLogLog estimate kept in lfc_ctl->wss_estimation, cast to
 * int32. Returns -1 when lfc_size_limit is 0.
 *
 * When the "reset" argument is true, the register array
 * (lfc_ctl->hyperloglog_hashes) is zeroed after the estimate has been read.
 * Because that path writes to the shared state, lfc_lock is taken exclusively;
 * a plain read only takes it in shared mode.
 */
Datum
approximate_working_set_size(PG_FUNCTION_ARGS)
{
	bool		reset;
	int32		estimate;

	/* No estimate is reported when the size limit is zero. */
	if (lfc_size_limit == 0)
		PG_RETURN_INT32(-1);

	reset = PG_GETARG_BOOL(0);

	if (reset)
		LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
	else
		LWLockAcquire(lfc_lock, LW_SHARED);

	/* Read the estimate first so a reset does not affect the returned value. */
	estimate = (int32) estimateHyperLogLog(&lfc_ctl->wss_estimation);

	if (reset)
		memset(lfc_ctl->hyperloglog_hashes, 0, sizeof lfc_ctl->hyperloglog_hashes);

	LWLockRelease(lfc_lock);

	PG_RETURN_INT32(estimate);
}

View File

@@ -27,6 +27,13 @@ RETURNS SETOF RECORD
AS 'MODULE_PATHNAME', 'local_cache_pages'
LANGUAGE C PARALLEL SAFE;
-- Approximate number of distinct pages accessed (working set size), as
-- estimated by the C function of the same name. Pass reset => true to clear
-- the estimator after reading it.
CREATE FUNCTION approximate_working_set_size(reset bool)
RETURNS integer
AS 'MODULE_PATHNAME', 'approximate_working_set_size'
LANGUAGE C PARALLEL SAFE;
-- The GRANT must name the function with its argument types: an empty
-- parameter list "()" would refer to a zero-argument function, which does
-- not exist, and the statement would fail.
GRANT EXECUTE ON FUNCTION approximate_working_set_size(bool) TO pg_monitor;
-- Create a view for convenient access.
CREATE VIEW local_cache AS
SELECT P.* FROM local_cache_pages() AS P

View File

@@ -598,6 +598,7 @@ class NeonEnvBuilder:
rust_log_override: Optional[str] = None,
default_branch_name: str = DEFAULT_BRANCH_NAME,
preserve_database_files: bool = False,
initial_tenant: Optional[TenantId] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -613,32 +614,39 @@ class NeonEnvBuilder:
self.safekeepers_enable_fsync = safekeepers_enable_fsync
self.auth_enabled = auth_enabled
self.default_branch_name = default_branch_name
self.env: Optional[NeonEnvWithoutInitialTenant] = None
self.env: Optional[NeonEnv] = None
self.remote_storage_prefix: Optional[str] = None
self.keep_remote_storage_contents: bool = True
self.neon_binpath = neon_binpath
self.pg_distrib_dir = pg_distrib_dir
self.pg_version = pg_version
self.preserve_database_files = preserve_database_files
self.initial_tenant = initial_tenant or TenantId.generate()
def init_configs(self) -> NeonEnvWithoutInitialTenant:
def init_configs(self) -> NeonEnv:
# Cannot create more than one environment from one builder
assert self.env is None, "environment already initialized"
self.env = NeonEnvWithoutInitialTenant(self)
self.env = NeonEnv(self)
return self.env
def start(self):
assert self.env is not None, "environment is not already initialized, call init() first"
self.env.start()
def init_start_no_initial_tenant(self) -> NeonEnvWithoutInitialTenant:
self.env = self.init_configs()
self.start()
return self.env
def init_start(self) -> NeonEnv:
env_without_initial_tenant = self.init_start_no_initial_tenant()
return NeonEnv(env_without_initial_tenant)
env = self.init_configs()
self.start()
# Prepare the default branch to start the postgres on later.
# Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API.
log.info(
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
)
initial_tenant, initial_timeline = env.neon_cli.create_tenant(tenant_id=env.initial_tenant)
env.initial_timeline = initial_timeline
log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully")
return env
def enable_remote_storage(
self,
@@ -846,7 +854,7 @@ class NeonEnvBuilder:
self.env.pageserver.assert_no_errors()
class NeonEnvWithoutInitialTenant:
class NeonEnv:
"""
An object representing the Neon runtime environment. It consists of
the page server, 0-N safekeepers, and the compute nodes.
@@ -896,11 +904,17 @@ class NeonEnvWithoutInitialTenant:
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
# self.initial_tenant: TenantId = TenantId.generate()
# self.initial_timeline: Optional[TimelineId] = None
self.initial_tenant = config.initial_tenant
self.initial_timeline: Optional[TimelineId] = None
# Create a config file corresponding to the options
toml = textwrap.dedent(
f"""
default_tenant_id = '{config.initial_tenant}'
"""
)
toml += textwrap.dedent(
f"""
[broker]
listen_addr = '{self.broker.listen_addr()}'
@@ -1001,32 +1015,6 @@ class NeonEnvWithoutInitialTenant:
return AuthKeys(pub=pub, priv=priv)
class NeonEnv(NeonEnvWithoutInitialTenant):
"""Wrapper class around NeonEnvWithoutInitialTenant that provides a default tenant & timeline"""
initial_tenant: TenantId
initial_timeline: TimelineId
def __init__(self, baseObject: NeonEnvWithoutInitialTenant):
# https://stackoverflow.com/a/1445289
self.__class__ = type(
baseObject.__class__.__name__, (self.__class__, baseObject.__class__), {}
)
self.__dict__ = baseObject.__dict__
# Prepare the default branch to start the postgres on later.
# Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API.
initial_tenant = TenantId.generate()
log.info(f"Creating initial tenant {initial_tenant} and its initial timeline")
initial_tenant2, initial_timeline = self.neon_cli.create_tenant(
tenant_id=initial_tenant, set_default=True
)
assert initial_tenant == initial_tenant2
self.initial_tenant = initial_tenant
self.initial_timeline = initial_timeline
log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully")
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(
request: FixtureRequest,
@@ -1649,7 +1637,7 @@ class AbstractNeonCli(abc.ABC):
Do not use directly, use specific subclasses instead.
"""
def __init__(self, env: NeonEnvWithoutInitialTenant):
def __init__(self, env: NeonEnv):
self.env = env
COMMAND: str = cast(str, None) # To be overwritten by the derived class.
@@ -1808,7 +1796,8 @@ class NeonCli(AbstractNeonCli):
"create",
"--branch-name",
new_branch_name,
*(["--tenant-id", str(tenant_id)] if tenant_id is not None else []),
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
]
@@ -1836,7 +1825,8 @@ class NeonCli(AbstractNeonCli):
"branch",
"--branch-name",
new_branch_name,
*(["--tenant-id", str(tenant_id)] if tenant_id is not None else []),
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
]
if ancestor_branch_name is not None:
cmd.extend(["--ancestor-branch-name", ancestor_branch_name])
@@ -1865,11 +1855,7 @@ class NeonCli(AbstractNeonCli):
# main [b49f7954224a0ad25cc0013ea107b54b]
# ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
res = self.raw_cli(
[
"timeline",
"list",
*(["--tenant-id", str(tenant_id)] if tenant_id is not None else []),
]
["timeline", "list", "--tenant-id", str(tenant_id or self.env.initial_tenant)]
)
timelines_cli = sorted(
map(
@@ -1960,7 +1946,8 @@ class NeonCli(AbstractNeonCli):
args = [
"pg",
"create",
*(["--tenant-id", str(tenant_id)] if tenant_id is not None else []),
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--branch-name",
branch_name,
"--pg-version",
@@ -1987,7 +1974,8 @@ class NeonCli(AbstractNeonCli):
args = [
"pg",
"start",
*(["--tenant-id", str(tenant_id)] if tenant_id is not None else []),
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
]
@@ -2012,7 +2000,8 @@ class NeonCli(AbstractNeonCli):
args = [
"pg",
"stop",
*(["--tenant-id", str(tenant_id)] if tenant_id is not None else []),
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
]
if destroy:
args.append("--destroy")
@@ -2061,12 +2050,7 @@ class NeonPageserver(PgProtocol):
TEMP_FILE_SUFFIX = "___temp"
def __init__(
self,
env: NeonEnvWithoutInitialTenant,
port: PageserverPort,
config_override: Optional[str] = None,
):
def __init__(self, env: NeonEnv, port: PageserverPort, config_override: Optional[str] = None):
super().__init__(host="localhost", port=port.pg, user="cloud_admin")
self.env = env
self.running = False
@@ -2678,17 +2662,13 @@ class Postgres(PgProtocol):
"""An object representing a running postgres daemon."""
def __init__(
self,
env: NeonEnvWithoutInitialTenant,
tenant_id: Optional[TenantId],
port: int,
check_stop_result: bool = True,
self, env: NeonEnv, tenant_id: TenantId, port: int, check_stop_result: bool = True
):
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
self.env = env
self.running = False
self.node_name: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[Path] = None # Path to computenode PGDATA
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id
self.port = port
self.check_stop_result = check_stop_result
@@ -2710,12 +2690,11 @@ class Postgres(PgProtocol):
config_lines = []
self.node_name = node_name or f"{branch_name}_pg_node"
output = self.env.neon_cli.pg_create(
self.env.neon_cli.pg_create(
branch_name, node_name=self.node_name, tenant_id=self.tenant_id, lsn=lsn, port=self.port
)
self.pgdata_dir = Path(output.stdout.strip())
assert self.pgdata_dir.is_dir()
assert Path(self.config_file_path()).is_file()
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
if config_lines is None:
config_lines = []
@@ -2744,7 +2723,9 @@ class Postgres(PgProtocol):
def pg_data_dir_path(self) -> str:
"""Path to data directory"""
return str(self.pgdata_dir)
assert self.node_name
path = Path("pgdatadirs") / "tenants" / str(self.tenant_id) / self.node_name
return os.path.join(self.env.repo_dir, path)
def pg_xact_dir_path(self) -> str:
"""Path to pg_xact dir"""
@@ -2867,7 +2848,7 @@ class Postgres(PgProtocol):
class PostgresFactory:
"""An object representing multiple running postgres daemons."""
def __init__(self, env: NeonEnvWithoutInitialTenant):
def __init__(self, env: NeonEnv):
self.env = env
self.num_instances: int = 0
self.instances: List[Postgres] = []
@@ -2882,7 +2863,7 @@ class PostgresFactory:
) -> Postgres:
pg = Postgres(
self.env,
tenant_id=tenant_id,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
self.num_instances += 1
@@ -2905,7 +2886,7 @@ class PostgresFactory:
) -> Postgres:
pg = Postgres(
self.env,
tenant_id=tenant_id,
tenant_id=tenant_id or self.env.initial_tenant,
port=self.env.port_distributor.get_port(),
)
@@ -2936,7 +2917,7 @@ class SafekeeperPort:
class Safekeeper:
"""An object representing a running safekeeper daemon."""
env: NeonEnvWithoutInitialTenant
env: NeonEnv
port: SafekeeperPort
id: int
running: bool = False
@@ -3299,7 +3280,6 @@ def check_restored_datadir_content(
pg: Postgres,
):
# Get the timeline ID. We need it for the 'basebackup' command
tenant = TenantId(pg.safe_psql("SHOW neon.tenant_id")[0][0])
timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0])
# stop postgres to ensure that files won't change
@@ -3316,7 +3296,7 @@ def check_restored_datadir_content(
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.pageserver.service_port.pg} \
-c 'basebackup {tenant} {timeline}' \
-c 'basebackup {pg.tenant_id} {timeline}' \
| tar -x -C {restored_dir_path}
"""
@@ -3510,7 +3490,7 @@ def wait_for_last_record_lsn(
def wait_for_last_flush_lsn(
env: NeonEnvWithoutInitialTenant, pg: Postgres, tenant: TenantId, timeline: TimelineId
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
@@ -3518,7 +3498,7 @@ def wait_for_last_flush_lsn(
def wait_for_wal_insert_lsn(
env: NeonEnvWithoutInitialTenant, pg: Postgres, tenant: TenantId, timeline: TimelineId
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
@@ -3526,7 +3506,7 @@ def wait_for_wal_insert_lsn(
def fork_at_current_lsn(
env: NeonEnvWithoutInitialTenant,
env: NeonEnv,
pg: Postgres,
new_branch_name: str,
ancestor_branch_name: str,

View File

@@ -35,7 +35,7 @@ def httpserver_listen_address(port_distributor: PortDistributor):
# Storage metrics tests
# ==============================================================================
metrics_tenant_id = TenantId.generate()
initial_tenant = TenantId.generate()
remote_uploaded = 0
checks = {
"written_size": lambda value: value > 0,
@@ -63,7 +63,7 @@ def metrics_handler(request: Request) -> Response:
for event in events:
assert event["tenant_id"] == str(
metrics_tenant_id
initial_tenant
), "Expecting metrics only from the initial tenant"
metric_name = event["metric"]
@@ -110,18 +110,15 @@ def test_metric_collection(
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# Set initial tenant of the test, that we expect the logs from
global metrics_tenant_id
metrics_tenant_id = TenantId.generate()
global initial_tenant
initial_tenant = neon_env_builder.initial_tenant
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start_no_initial_tenant()
env.neon_cli.create_tenant(tenant_id=metrics_tenant_id, set_default=True)
env = neon_env_builder.init_start()
# Order of fixtures shutdown is not specified, and if http server gets down
# before pageserver, pageserver log might contain such errors in the end.
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")

View File

@@ -5,9 +5,10 @@ from fixtures.neon_fixtures import NeonEnvBuilder, PortDistributor
# Repeats the example from README.md as close as it can
def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: PortDistributor):
env = neon_env_builder.init_configs()
# Skipping the init step that creates a local tenant in Pytest tests
try:
env.neon_cli.start()
env.neon_cli.create_tenant(set_default=True)
env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True)
env.neon_cli.pg_start(node_name="main", port=port_distributor.get_port())
env.neon_cli.create_branch(new_branch_name="migration_check")