Support tenants

This patch adds support for tenants; it mostly touches the pageserver.
The on-disk directory layout gains a new layer of indirection: the path to a
particular repository is now <pageserver workdir>/tenants/<tenant id>. A tenant
id has the same format as a timeline id. The tenant id is included in
pageserver commands where needed, and two new pageserver commands are
available: tenant_list and tenant_create. This is also reflected in the CLI.
During init a default tenant is created and its id is saved in the CLI config,
so subsequent commands can use it without extra options. The tenant id is also
included in the compute Postgres configuration, so it can be passed via
ServerInfo to the safekeeper and in the connection string to the pageserver.
For more info see docs/multitenancy.md.
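
For orientation, here is a minimal sketch of the tenant-aware flow as the tests in this patch exercise it. It is written against the test fixtures added below (ZenithPageserver, ZenithCli); the helper name tenant_smoke and the branch name are illustrative only, not part of the patch.

import json
import uuid

def tenant_smoke(pageserver, zenith_cli):
    # init creates a default tenant; its id is available as pageserver.initial_tenant
    # and is used implicitly by CLI commands that omit --tenantid.
    conn = pageserver.connect()
    cur = conn.cursor()

    # Create a second tenant. A tenant id is a 32-character hex string,
    # the same format as a timeline id.
    tenant = uuid.uuid4().hex
    cur.execute(f'tenant_create {tenant}')

    # The pageserver returns the tenant list as a JSON array of ids.
    cur.execute('tenant_list')
    assert tenant in json.loads(cur.fetchone()[0])

    # Tenant-scoped console commands such as branch_list (and do_gc in the
    # tests below) now take the tenant id as their first argument.
    cur.execute(f'branch_list {pageserver.initial_tenant}')
    branches = json.loads(cur.fetchone()[0])  # list of dicts with branch names

    # The CLI mirrors this with tenant subcommands and a --tenantid option.
    zenith_cli.run(["branch", "sketch_branch", "main", f"--tenantid={tenant}"])
    conn.close()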
Author: Dmitry Rodionov
Date: 2021-07-15 16:30:18 +03:00
Committed by: Dmitry
Parent: d210ba5fdb
Commit: 767590bbd5
40 changed files with 1145 additions and 399 deletions


@@ -1,10 +1,13 @@
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Create a couple of branches off the main branch, at a historical point in time.
#
def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin):
def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
# Branch at the point where only 100 rows were inserted
zenith_cli.run(["branch", "test_branch_behind", "empty"])


@@ -1,12 +1,14 @@
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test starting Postgres with custom options
#
def test_config(zenith_cli, pageserver, postgres, pg_bin):
def test_config(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
# Create a branch for us
zenith_cli.run(["branch", "test_config", "empty"])


@@ -1,4 +1,5 @@
from contextlib import closing
from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory, ZenithCli
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -6,7 +7,12 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test CREATE DATABASE when there have been relmapper changes
#
def test_createdb(zenith_cli, pageserver, postgres, pg_bin):
def test_createdb(
zenith_cli: ZenithCli,
pageserver: ZenithPageserver,
postgres: PostgresFactory,
pg_bin,
):
zenith_cli.run(["branch", "test_createdb", "empty"])
pg = postgres.create_start('test_createdb')


@@ -1,12 +1,14 @@
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test CREATE USER to check shared catalog restore
#
def test_createuser(zenith_cli, pageserver, postgres, pg_bin):
def test_createuser(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
zenith_cli.run(["branch", "test_createuser", "empty"])
pg = postgres.create_start('test_createuser')


@@ -1,4 +1,5 @@
from contextlib import closing
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
import psycopg2.extras
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -9,7 +10,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# This test is pretty tightly coupled with the current implementation of page version storage
# and garbage collection in object_repository.rs.
#
def test_gc(zenith_cli, pageserver, postgres, pg_bin):
def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
zenith_cli.run(["branch", "test_gc", "empty"])
pg = postgres.create_start('test_gc')
@@ -30,7 +31,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
# before running the actual tests below, otherwise the counts won't match
# what we expect.
print("Running GC before test")
pscur.execute(f"do_gc {timeline} 0")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
# remember the number of relations
@@ -42,7 +43,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
n_relations += 1;
print("Inserting one row and running GC")
cur.execute("INSERT INTO foo VALUES (1)")
pscur.execute(f"do_gc {timeline} 0")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
assert row['n_relations'] == n_relations
@@ -55,7 +56,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
cur.execute("INSERT INTO foo VALUES (2)")
cur.execute("INSERT INTO foo VALUES (3)")
pscur.execute(f"do_gc {timeline} 0")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
assert row['n_relations'] == n_relations
@@ -68,7 +69,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
print("Inserting one more row")
cur.execute("INSERT INTO foo VALUES (3)")
pscur.execute(f"do_gc {timeline} 0")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
assert row['n_relations'] == n_relations
@@ -77,7 +78,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
assert row['deleted'] == 1
# Run GC again, with no changes in the database. Should not remove anything.
pscur.execute(f"do_gc {timeline} 0")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
assert row['n_relations'] == n_relations
@@ -90,7 +91,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin):
#
cur.execute("DROP TABLE foo")
pscur.execute(f"do_gc {timeline} 0")
pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
row = pscur.fetchone()
print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row))
# Each relation fork is counted separately, hence 3.


@@ -1,3 +1,5 @@
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -7,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
# it only checks next_multixact_id field in restored pg_control,
# since we don't have functions to check multixact internals.
#
def test_multixact(pageserver, postgres, pg_bin, zenith_cli, base_dir):
def test_multixact(pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin, zenith_cli, base_dir):
# Create a branch for us
zenith_cli.run(["branch", "test_multixact", "empty"])
pg = postgres.create_start('test_multixact')


@@ -1,4 +1,8 @@
import json
import uuid
import pytest
import psycopg2
from fixtures.zenith_fixtures import ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -9,14 +13,14 @@ def test_status(pageserver):
]
def test_branch_list(pageserver, zenith_cli):
def test_branch_list(pageserver: ZenithPageserver, zenith_cli):
# Create a branch for us
zenith_cli.run(["branch", "test_branch_list_main", "empty"])
conn = pageserver.connect()
cur = conn.cursor()
cur.execute('branch_list')
cur.execute(f'branch_list {pageserver.initial_tenant}')
branches = json.loads(cur.fetchone()[0])
# Filter out branches created by other tests
branches = [x for x in branches if x['name'].startswith('test_branch_list')]
@@ -32,7 +36,7 @@ def test_branch_list(pageserver, zenith_cli):
zenith_cli.run(['branch', 'test_branch_list_experimental', 'test_branch_list_main'])
zenith_cli.run(['pg', 'create', 'test_branch_list_experimental'])
cur.execute('branch_list')
cur.execute(f'branch_list {pageserver.initial_tenant}')
new_branches = json.loads(cur.fetchone()[0])
# Filter out branches created by other tests
new_branches = [x for x in new_branches if x['name'].startswith('test_branch_list')]
@@ -46,3 +50,27 @@ def test_branch_list(pageserver, zenith_cli):
assert new_branches[1] == branches[0]
conn.close()
def test_tenant_list(pageserver: ZenithPageserver, zenith_cli):
res = zenith_cli.run(["tenant", "list"])
res.check_returncode()
tenants = res.stdout.splitlines()
assert tenants == [pageserver.initial_tenant]
conn = pageserver.connect()
cur = conn.cursor()
# check same tenant cannot be created twice
with pytest.raises(psycopg2.DatabaseError, match=f'repo for {pageserver.initial_tenant} already exists'):
cur.execute(f'tenant_create {pageserver.initial_tenant}')
# create one more tenant
tenant1 = uuid.uuid4().hex
cur.execute(f'tenant_create {tenant1}')
cur.execute('tenant_list')
# compare tenants list
new_tenants = sorted(json.loads(cur.fetchone()[0]))
assert sorted([pageserver.initial_tenant, tenant1]) == new_tenants


@@ -1,8 +1,9 @@
from fixtures.zenith_fixtures import PostgresFactory
pytest_plugins = ("fixtures.zenith_fixtures")
def test_pgbench(pageserver, postgres, pg_bin, zenith_cli):
def test_pgbench(postgres: PostgresFactory, pg_bin, zenith_cli):
# Create a branch for us
zenith_cli.run(["branch", "test_pgbench", "empty"])


@@ -1,4 +1,5 @@
from contextlib import closing
from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory
pytest_plugins = ("fixtures.zenith_fixtures")
@@ -6,7 +7,7 @@ pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test restarting and recreating a postgres instance
#
def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
def test_restart_compute(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
zenith_cli.run(["branch", "test_restart_compute", "empty"])
pg = postgres.create_start('test_restart_compute')


@@ -0,0 +1,48 @@
from contextlib import closing
import pytest
from fixtures.zenith_fixtures import (
TenantFactory,
ZenithCli,
PostgresFactory,
)
@pytest.mark.parametrize('with_wal_acceptors', [False, True])
def test_tenants_normal_work(
zenith_cli: ZenithCli,
tenant_factory: TenantFactory,
postgres: PostgresFactory,
wa_factory,
with_wal_acceptors: bool,
):
"""Tests tenants with and without wal acceptors"""
tenant_1 = tenant_factory.create()
tenant_2 = tenant_factory.create()
zenith_cli.run(["branch", f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", "main", f"--tenantid={tenant_1}"])
zenith_cli.run(["branch", f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}", "main", f"--tenantid={tenant_2}"])
if with_wal_acceptors:
wa_factory.start_n_new(3)
pg_tenant1 = postgres.create_start(
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
tenant_1,
wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None,
)
pg_tenant2 = postgres.create_start(
f"test_tenants_normal_work_with_wal_acceptors{with_wal_acceptors}",
tenant_2,
wal_acceptors=wa_factory.get_connstrs() if with_wal_acceptors else None,
)
for pg in [pg_tenant1, pg_tenant2]:
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
cur.execute("SELECT sum(key) FROM t")
assert cur.fetchone() == (5000050000,)


@@ -1,10 +1,13 @@
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test branching, when a transaction is in prepared state
#
def test_twophase(zenith_cli, pageserver, postgres, pg_bin):
def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin):
zenith_cli.run(["branch", "test_twophase", "empty"])
pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5'])
@@ -28,8 +31,10 @@ def test_twophase(zenith_cli, pageserver, postgres, pg_bin):
# Create a branch with the transaction in prepared state
zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"])
pg2 = postgres.create_start('test_twophase_prepared',
config_lines=['max_prepared_transactions=5'])
pg2 = postgres.create_start(
'test_twophase_prepared',
config_lines=['max_prepared_transactions=5'],
)
conn2 = pg2.connect()
cur2 = conn2.cursor()


@@ -4,13 +4,14 @@ import time
from contextlib import closing
from multiprocessing import Process, Value
from fixtures.zenith_fixtures import ZenithPageserver, PostgresFactory
pytest_plugins = ("fixtures.zenith_fixtures")
# basic test, write something in setup with wal acceptors, ensure that commits
# succeed and data is written
def test_normal_work(zenith_cli, pageserver, postgres, wa_factory):
def test_normal_work(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory):
zenith_cli.run(["branch", "test_wal_acceptors_normal_work", "empty"])
wa_factory.start_n_new(3)
pg = postgres.create_start('test_wal_acceptors_normal_work',
@@ -28,7 +29,7 @@ def test_normal_work(zenith_cli, pageserver, postgres, wa_factory):
# Run page server and multiple acceptors, and multiple compute nodes running
# against different timelines.
def test_many_timelines(zenith_cli, pageserver, postgres, wa_factory):
def test_many_timelines(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory):
n_timelines = 2
wa_factory.start_n_new(3)
@@ -60,7 +61,7 @@ def test_many_timelines(zenith_cli, pageserver, postgres, wa_factory):
# Check that dead minority doesn't prevent the commits: execute insert n_inserts
# times, with fault_probability chance of getting a wal acceptor down or up
# along the way. 2 of 3 are always alive, so the work keeps going.
def test_restarts(zenith_cli, pageserver, postgres, wa_factory):
def test_restarts(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory):
fault_probability = 0.01
n_inserts = 1000
n_acceptors = 3
@@ -101,7 +102,7 @@ def delayed_wal_acceptor_start(wa):
# When majority of acceptors is offline, commits are expected to be frozen
def test_unavailability(zenith_cli, pageserver, postgres, wa_factory):
def test_unavailability(zenith_cli, postgres: PostgresFactory, wa_factory):
wa_factory.start_n_new(2)
zenith_cli.run(["branch", "test_wal_acceptors_unavailability", "empty"])
@@ -171,7 +172,7 @@ def stop_value():
# do inserts while concurrently getting up/down subsets of acceptors
def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_value):
def test_race_conditions(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, wa_factory, stop_value):
wa_factory.start_n_new(3)


@@ -1,43 +1,50 @@
import json
import uuid
from fixtures.zenith_fixtures import ZenithCli, ZenithPageserver
pytest_plugins = ("fixtures.zenith_fixtures")
def helper_compare_branch_list(page_server_cur, zenith_cli):
def helper_compare_branch_list(page_server_cur, zenith_cli, initial_tenant: str):
"""
Compare branches list returned by CLI and directly via API.
Filters out branches created by other tests.
"""
page_server_cur.execute('branch_list')
page_server_cur.execute(f'branch_list {initial_tenant}')
branches_api = sorted(map(lambda b: b['name'], json.loads(page_server_cur.fetchone()[0])))
branches_api = [b for b in branches_api if b.startswith('test_cli_') or b in ('empty', 'main')]
res = zenith_cli.run(["branch"])
assert res.stderr == ''
res.check_returncode()
branches_cli = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
branches_cli = [b for b in branches_cli if b.startswith('test_cli_') or b in ('empty', 'main')]
assert branches_api == branches_cli
res = zenith_cli.run(["branch", f"--tenantid={initial_tenant}"])
res.check_returncode()
branches_cli_with_tenant_arg = sorted(map(lambda b: b.split(':')[-1].strip(), res.stdout.strip().split("\n")))
branches_cli_with_tenant_arg = [b for b in branches_cli_with_tenant_arg if b.startswith('test_cli_') or b in ('empty', 'main')]
assert branches_api == branches_cli == branches_cli_with_tenant_arg
def test_cli_branch_list(pageserver, zenith_cli):
def test_cli_branch_list(pageserver: ZenithPageserver, zenith_cli):
page_server_conn = pageserver.connect()
page_server_cur = page_server_conn.cursor()
# Initial sanity check
helper_compare_branch_list(page_server_cur, zenith_cli)
helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant)
# Create a branch for us
res = zenith_cli.run(["branch", "test_cli_branch_list_main", "main"])
assert res.stderr == ''
helper_compare_branch_list(page_server_cur, zenith_cli)
helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant)
# Create a nested branch
res = zenith_cli.run(["branch", "test_cli_branch_list_nested", "test_cli_branch_list_main"])
assert res.stderr == ''
helper_compare_branch_list(page_server_cur, zenith_cli)
helper_compare_branch_list(page_server_cur, zenith_cli, pageserver.initial_tenant)
# Check that all new branches are visible via CLI
res = zenith_cli.run(["branch"])
@@ -46,3 +53,45 @@ def test_cli_branch_list(pageserver, zenith_cli):
assert 'test_cli_branch_list_main' in branches_cli
assert 'test_cli_branch_list_nested' in branches_cli
def helper_compare_tenant_list(page_server_cur, zenith_cli: ZenithCli):
page_server_cur.execute(f'tenant_list')
tenants_api = sorted(json.loads(page_server_cur.fetchone()[0]))
res = zenith_cli.run(["tenant", "list"])
assert res.stderr == ''
tenants_cli = sorted(res.stdout.splitlines())
assert tenants_api == tenants_cli
def test_cli_tenant_list(pageserver: ZenithPageserver, zenith_cli: ZenithCli):
page_server_conn = pageserver.connect()
page_server_cur = page_server_conn.cursor()
# Initial sanity check
helper_compare_tenant_list(page_server_cur, zenith_cli)
# Create new tenant
tenant1 = uuid.uuid4().hex
res = zenith_cli.run(["tenant", "create", tenant1])
res.check_returncode()
# check tenant1 appeared
helper_compare_tenant_list(page_server_cur, zenith_cli)
# Create new tenant
tenant2 = uuid.uuid4().hex
res = zenith_cli.run(["tenant", "create", tenant2])
res.check_returncode()
# check tenant2 appeared
helper_compare_tenant_list(page_server_cur, zenith_cli)
res = zenith_cli.run(["tenant", "list"])
res.check_returncode()
tenants = sorted(res.stdout.splitlines())
assert pageserver.initial_tenant in tenants
assert tenant1 in tenants
assert tenant2 in tenants