Send 409 HTTP responses on timeline and tenant creation for existing entity

This commit is contained in:
Kirill Bulatov
2022-03-07 23:12:36 +02:00
committed by Kirill Bulatov
parent c51d545fd9
commit 093ad8ab59
9 changed files with 126 additions and 103 deletions

View File

@@ -145,10 +145,9 @@ impl PageServerNode {
args.extend(["--create-tenant", tenant_id])
}
let initial_timeline_id_str = initial_timeline_id.map(|id| id.to_string());
if let Some(timeline_id) = initial_timeline_id_str.as_deref() {
args.extend(["--initial-timeline-id", timeline_id])
}
let initial_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
let initial_timeline_id_string = initial_timeline_id.to_string();
args.extend(["--initial-timeline-id", &initial_timeline_id_string]);
let init_output = fill_rust_env_vars(cmd.args(args))
.output()
@@ -158,11 +157,7 @@ impl PageServerNode {
bail!("pageserver init failed");
}
if let Some(initial_timeline_id) = initial_timeline_id {
Ok(initial_timeline_id)
} else {
extract_initial_timeline_id(init_output.stdout)
}
Ok(initial_timeline_id)
}
pub fn repo_path(&self) -> PathBuf {
@@ -337,7 +332,10 @@ impl PageServerNode {
.json()?)
}
pub fn tenant_create(&self, new_tenant_id: Option<ZTenantId>) -> anyhow::Result<ZTenantId> {
pub fn tenant_create(
&self,
new_tenant_id: Option<ZTenantId>,
) -> anyhow::Result<Option<ZTenantId>> {
let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest {
@@ -345,13 +343,18 @@ impl PageServerNode {
})
.send()?
.error_from_body()?
.json::<String>()?;
tenant_id_string.parse().with_context(|| {
format!(
"Failed to parse tennat creation response as tenant id: {}",
tenant_id_string
)
})
.json::<Option<String>>()?;
tenant_id_string
.map(|id| {
id.parse().with_context(|| {
format!(
"Failed to parse tennat creation response as tenant id: {}",
id
)
})
})
.transpose()
}
pub fn timeline_list(&self, tenant_id: &ZTenantId) -> anyhow::Result<Vec<TimelineInfo>> {
@@ -376,7 +379,7 @@ impl PageServerNode {
new_timeline_id: Option<ZTimelineId>,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<ZTimelineId>,
) -> anyhow::Result<TimelineInfo> {
) -> anyhow::Result<Option<TimelineInfo>> {
let timeline_info_response = self
.http_request(
Method::POST,
@@ -389,36 +392,10 @@ impl PageServerNode {
})
.send()?
.error_from_body()?
.json::<TimelineInfoResponse>()?;
.json::<Option<TimelineInfoResponse>>()?;
TimelineInfo::try_from(timeline_info_response)
timeline_info_response
.map(TimelineInfo::try_from)
.transpose()
}
}
fn extract_initial_timeline_id(init_stdout: Vec<u8>) -> anyhow::Result<ZTimelineId> {
let output_string =
String::from_utf8(init_stdout).context("Init stdout is not a valid unicode")?;
let string_with_timeline_id = match output_string.split_once("created initial timeline ") {
Some((_, string_with_timeline_id)) => string_with_timeline_id,
None => bail!(
"Found no line with timeline id in the init output: '{}'",
output_string
),
};
let timeline_id_str = match string_with_timeline_id.split_once(' ') {
Some((timeline_id_str, _)) => timeline_id_str,
None => bail!(
"Found no timeline id in the init output: '{}'",
output_string
),
};
timeline_id_str.parse().with_context(|| {
format!(
"Failed to parse timeline id from string, extracted from the init output: '{}'",
timeline_id_str
)
})
}

View File

@@ -131,7 +131,9 @@ paths:
type: string
format: hex
post:
description: Create timeline
description: |
Create a timeline. Returns new timeline id on success.\
If no new timeline id is specified in parameters, it would be generated. It's an error to recreate the same timeline.
requestBody:
content:
application/json:
@@ -171,6 +173,12 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"409":
description: Timeline already exists, creation skipped
content:
application/json:
schema:
$ref: "#/components/schemas/AlreadyExistsError"
"500":
description: Generic operation error
content:
@@ -208,7 +216,9 @@ paths:
schema:
$ref: "#/components/schemas/Error"
post:
description: Create tenant
description: |
Create a tenant. Returns new tenant id on success.\
If no new tenant id is specified in parameters, it would be generated. It's an error to recreate the same tenant.
requestBody:
content:
application/json:
@@ -220,7 +230,7 @@ paths:
format: hex
responses:
"201":
description: Already exists or created
description: New tenant created successfully
content:
application/json:
schema:
@@ -244,6 +254,12 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"409":
description: Tenant already exists, creation skipped
content:
application/json:
schema:
$ref: "#/components/schemas/AlreadyExistsError"
"500":
description: Generic operation error
content:
@@ -311,6 +327,13 @@ components:
properties:
msg:
type: string
AlreadyExistsError:
type: object
required:
- msg
properties:
msg:
type: string
ForbiddenError:
type: object
required:

View File

@@ -74,7 +74,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
check_permission(&request, Some(tenant_id))?;
let response_data = tokio::task::spawn_blocking(move || {
let new_timeline_info = tokio::task::spawn_blocking(move || {
let _enter = info_span!("/timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, lsn=?request_data.ancestor_start_lsn).entered();
timelines::create_timeline(
get_config(&request),
@@ -85,8 +85,12 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
)
})
.await
.map_err(ApiError::from_err)?.map(TimelineInfoResponse::from)?;
Ok(json_response(StatusCode::CREATED, response_data)?)
.map_err(ApiError::from_err)??;
Ok(match new_timeline_info {
Some(info) => json_response(StatusCode::CREATED, TimelineInfoResponse::from(info))?,
None => json_response(StatusCode::CONFLICT, ())?,
})
}
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -220,17 +224,18 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let new_tenant_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = ?request_data.new_tenant_id).entered();
tenant_mgr::create_repository_for_tenant(
tenant_mgr::create_tenant_repository(
get_config(&request),
request_data.new_tenant_id.map(ZTenantId::from),
)
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(
StatusCode::CREATED,
HexZTenantId::from(new_tenant_id),
)?)
Ok(match new_tenant_id {
Some(id) => json_response(StatusCode::CREATED, HexZTenantId::from(id))?,
None => json_response(StatusCode::CONFLICT, ())?,
})
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {

View File

@@ -177,10 +177,10 @@ pub fn shutdown_all_tenants() {
}
}
pub fn create_repository_for_tenant(
pub fn create_tenant_repository(
conf: &'static PageServerConf,
new_tenant_id: Option<ZTenantId>,
) -> Result<ZTenantId> {
) -> Result<Option<ZTenantId>> {
let new_tenant_id = new_tenant_id.unwrap_or_else(ZTenantId::generate);
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, new_tenant_id));
match timelines::create_repo(conf, new_tenant_id, wal_redo_manager)? {
@@ -191,11 +191,13 @@ pub fn create_repository_for_tenant(
state: TenantState::Idle,
repo,
});
Ok(Some(new_tenant_id))
}
None => {
debug!("repository already exists for tenant {}", new_tenant_id);
Ok(None)
}
None => debug!("repository already exists for tenant {}", new_tenant_id),
}
Ok(new_tenant_id)
}
pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {

View File

@@ -331,17 +331,26 @@ pub(crate) fn create_timeline(
new_timeline_id: Option<ZTimelineId>,
ancestor_timeline_id: Option<ZTimelineId>,
ancestor_start_lsn: Option<Lsn>,
) -> Result<TimelineInfo> {
) -> Result<Option<TimelineInfo>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate);
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
if conf.timeline_path(&new_timeline_id, &tenant_id).exists() {
bail!("timeline {} already exists", new_timeline_id);
match repo.get_timeline(new_timeline_id)? {
RepositoryTimeline::Local { id, .. } => {
debug!("timeline {} already exists", id);
return Ok(None);
}
RepositoryTimeline::Remote { id, .. } => bail!(
"timeline {} already exists in pageserver's remote storage",
id
),
}
}
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let mut start_lsn = ancestor_start_lsn.unwrap_or(Lsn(0));
match ancestor_timeline_id {
let new_timeline_info = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = repo
.get_timeline(ancestor_timeline_id)
@@ -383,20 +392,17 @@ pub(crate) fn create_timeline(
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, start_lsn)?;
// load the timeline into memory
let loaded_timeline = repo.get_timeline(new_timeline_id)?;
Ok(TimelineInfo::from_repo_timeline(
tenant_id,
loaded_timeline,
false,
))
TimelineInfo::from_repo_timeline(tenant_id, loaded_timeline, false)
}
None => {
let new_timeline = bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?;
Ok(TimelineInfo::from_dyn_timeline(
TimelineInfo::from_dyn_timeline(
tenant_id,
new_timeline_id,
new_timeline.as_ref(),
false,
))
)
}
}
};
Ok(Some(new_timeline_info))
}

View File

@@ -28,7 +28,7 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
# create timeline
timeline_id = uuid4()
client.timeline_create(tenant_id=tenant_id, timeline_id=timeline_id)
client.timeline_create(tenant_id=tenant_id, new_timeline_id=timeline_id)
timelines = client.timeline_list(tenant_id)
assert len(timelines) > 0

View File

@@ -57,6 +57,10 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
branch_names = [
"test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)
]
# pageserver, safekeeper operate timelines via their ids (can be represented in hex as 'ad50847381e248feaac9876cc71ae418')
# that's not really human readable, so the branch names are introduced in Zenith CLI.
# Zenith CLI stores its branch <-> timeline mapping in its internals,
# but we need this to collect metrics from other servers, related to the timeline.
branch_names_to_timeline_ids = {}
# start postgres on each timeline
@@ -75,7 +79,7 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
tenant_id=tenant_id, timeline_id=branch_names_to_timeline_ids[branch_name])
for branch_name in branch_names
]
# All changes visible to pageserver (latest_valid_lsn) should be
# All changes visible to pageserver (last_record_lsn) should be
# confirmed by safekeepers first. As we cannot atomically get
# state of both pageserver and safekeepers, we should start with
# pageserver. Looking at outdated data from pageserver is ok.

View File

@@ -725,20 +725,23 @@ class ZenithPageserverHttpClient(requests.Session):
def timeline_create(
self,
tenant_id: uuid.UUID,
timeline_id: Optional[uuid.UUID] = None,
new_timeline_id: Optional[uuid.UUID] = None,
ancestor_timeline_id: Optional[uuid.UUID] = None,
ancestor_start_lsn: Optional[str] = None,
) -> Dict[Any, Any]:
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline",
json={
'new_timeline_id':
timeline_id.hex if timeline_id else None,
new_timeline_id.hex if new_timeline_id else None,
'ancestor_start_lsn':
ancestor_start_lsn,
'ancestor_timeline_id':
ancestor_timeline_id.hex if ancestor_timeline_id else None,
})
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f'could not create timeline: already exists for id {new_timeline_id}')
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
@@ -750,14 +753,16 @@ class ZenithPageserverHttpClient(requests.Session):
assert isinstance(res_json, list)
return res_json
def tenant_create(self, tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
def tenant_create(self, new_tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
'new_tenant_id': tenant_id.hex if tenant_id else None,
'new_tenant_id': new_tenant_id.hex if new_tenant_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f'could not create tenant: already exists for id {new_tenant_id}')
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return uuid.UUID(new_tenant_id)
@@ -806,6 +811,13 @@ class S3Storage:
RemoteStorage = Union[LocalFsStorage, S3Storage]
CREATE_TIMELINE_ID_EXTRACTOR = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'",
re.MULTILINE)
CREATE_TIMELINE_ID_EXTRACTOR = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'",
re.MULTILINE)
TIMELINE_DATA_EXTRACTOR = re.compile(r"\s(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]",
re.MULTILINE)
class ZenithCli:
"""
@@ -846,18 +858,13 @@ class ZenithCli:
res = self.raw_cli(cmd)
res.check_returncode()
create_timeline_id_extractor = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'",
re.MULTILINE)
matches = create_timeline_id_extractor.search(res.stdout)
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group('timeline_id')
if created_timeline_id is None:
raise Exception('could not find timeline id after `zenith timeline create` invocation')
else:
return uuid.UUID(created_timeline_id)
return uuid.UUID(created_timeline_id)
def create_branch(self,
new_branch_name: str = DEFAULT_BRANCH_NAME,
@@ -880,9 +887,7 @@ class ZenithCli:
res = self.raw_cli(cmd)
res.check_returncode()
create_timeline_id_extractor = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'",
re.MULTILINE)
matches = create_timeline_id_extractor.search(res.stdout)
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
@@ -900,13 +905,11 @@ class ZenithCli:
# (L) main [b49f7954224a0ad25cc0013ea107b54b]
# (L) ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
timeline_data_extractor = re.compile(
r"\s(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE)
res = self.raw_cli(
['timeline', 'list', '--tenant-id', (tenant_id or self.env.initial_tenant).hex])
timelines_cli = sorted(
map(lambda branch_and_id: (branch_and_id[0], branch_and_id[1]),
timeline_data_extractor.findall(res.stdout)))
TIMELINE_DATA_EXTRACTOR.findall(res.stdout)))
return timelines_cli
def init(self,

View File

@@ -522,7 +522,11 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Re
}
Some(("create", create_match)) => {
let initial_tenant_id = parse_tenant_id(create_match)?;
let new_tenant_id = pageserver.tenant_create(initial_tenant_id)?;
let new_tenant_id = pageserver
.tenant_create(initial_tenant_id)?
.ok_or_else(|| {
anyhow!("Tenant with id {:?} was already created", initial_tenant_id)
})?;
println!(
"tenant {} successfully created on the pageserver",
new_tenant_id
@@ -548,7 +552,9 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_branch_name = create_match
.value_of("branch-name")
.ok_or(anyhow!("No branch name provided"))?;
let timeline = pageserver.timeline_create(tenant_id, None, None, None)?;
let timeline = pageserver
.timeline_create(tenant_id, None, None, None)?
.ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?;
let new_timeline_id = timeline.timeline_id();
let last_record_lsn = match timeline {
@@ -593,12 +599,9 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
.map(Lsn::from_str)
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let timeline = pageserver.timeline_create(
tenant_id,
None,
start_lsn,
Some(ancestor_timeline_id),
)?;
let timeline = pageserver
.timeline_create(tenant_id, None, start_lsn, Some(ancestor_timeline_id))?
.ok_or_else(|| anyhow!("Failed to create new timeline for tenant {}", tenant_id))?;
let new_timeline_id = timeline.timeline_id();
let last_record_lsn = match timeline {