fix: add tailing separator to prefix (#4078)

* fix: add tailing separator to prefix

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* project select result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-05-30 23:29:26 +08:00
committed by GitHub
parent f024054ed3
commit 85a231850d
7 changed files with 115 additions and 28 deletions

View File

@@ -143,8 +143,6 @@ fn clamp_impl<T: LogicalPrimitiveType, const CLAMP_MIN: bool, const CLAMP_MAX: b
min: T::Native,
max: T::Native,
) -> Result<VectorRef> {
common_telemetry::info!("[DEBUG] min {min:?}, max {max:?}");
let iter = ArrayIter::new(input);
let result = iter.map(|x| {
x.map(|x| {

View File

@@ -72,12 +72,8 @@ impl DatanodeTableKey {
}
}
fn prefix(datanode_id: DatanodeId) -> String {
format!("{}/{datanode_id}", DATANODE_TABLE_KEY_PREFIX)
}
pub fn range_start_key(datanode_id: DatanodeId) -> String {
format!("{}/", Self::prefix(datanode_id))
pub fn prefix(datanode_id: DatanodeId) -> String {
format!("{}/{datanode_id}/", DATANODE_TABLE_KEY_PREFIX)
}
}
@@ -114,7 +110,7 @@ impl<'a> MetaKey<'a, DatanodeTableKey> for DatanodeTableKey {
impl Display for DatanodeTableKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", Self::prefix(self.datanode_id), self.table_id)
write!(f, "{}{}", Self::prefix(self.datanode_id), self.table_id)
}
}
@@ -164,7 +160,7 @@ impl DatanodeTableManager {
&self,
datanode_id: DatanodeId,
) -> BoxStream<'static, Result<DatanodeTableValue>> {
let start_key = DatanodeTableKey::range_start_key(datanode_id);
let start_key = DatanodeTableKey::prefix(datanode_id);
let req = RangeRequest::new().with_prefix(start_key.as_bytes());
let stream = PaginationStream::new(

View File

@@ -69,8 +69,7 @@ impl FlownodeFlowKey {
/// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`.
pub fn range_start_key(flownode_id: FlownodeId) -> Vec<u8> {
let inner =
BytesAdapter::from(FlownodeFlowKeyInner::range_start_key(flownode_id).into_bytes());
let inner = BytesAdapter::from(FlownodeFlowKeyInner::prefix(flownode_id).into_bytes());
FlowScoped::new(inner).to_bytes()
}
@@ -108,13 +107,8 @@ impl FlownodeFlowKeyInner {
}
}
fn prefix(flownode_id: FlownodeId) -> String {
format!("{}/{flownode_id}", FLOWNODE_FLOW_KEY_PREFIX)
}
/// The prefix used to retrieve all [FlownodeFlowKey]s with the specified `flownode_id`.
fn range_start_key(flownode_id: FlownodeId) -> String {
format!("{}/", Self::prefix(flownode_id))
pub fn prefix(flownode_id: FlownodeId) -> String {
format!("{}/{flownode_id}/", FLOWNODE_FLOW_KEY_PREFIX)
}
}

View File

@@ -80,7 +80,7 @@ impl TableFlowKey {
/// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`.
pub fn range_start_key(table_id: TableId) -> Vec<u8> {
let inner = BytesAdapter::from(TableFlowKeyInner::range_start_key(table_id).into_bytes());
let inner = BytesAdapter::from(TableFlowKeyInner::prefix(table_id).into_bytes());
FlowScoped::new(inner).to_bytes()
}
@@ -123,12 +123,7 @@ impl TableFlowKeyInner {
}
fn prefix(table_id: TableId) -> String {
format!("{}/{table_id}", TABLE_FLOW_KEY_PREFIX)
}
/// The prefix used to retrieve all [TableFlowKey]s with the specified `table_id`.
fn range_start_key(table_id: TableId) -> String {
format!("{}/", Self::prefix(table_id))
format!("{}/{table_id}/", TABLE_FLOW_KEY_PREFIX)
}
}

View File

@@ -48,7 +48,7 @@ impl<'a> TableNameKey<'a> {
}
pub fn prefix_to_table(catalog: &str, schema: &str) -> String {
format!("{}/{}/{}", TABLE_NAME_KEY_PREFIX, catalog, schema)
format!("{}/{}/{}/", TABLE_NAME_KEY_PREFIX, catalog, schema)
}
}
@@ -56,7 +56,7 @@ impl Display for TableNameKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}/{}",
"{}{}",
Self::prefix_to_table(self.catalog, self.schema),
self.table
)
@@ -268,7 +268,11 @@ impl TableNameManager {
#[cfg(test)]
mod tests {
use futures::StreamExt;
use super::*;
use crate::kv_backend::KvBackend;
use crate::rpc::store::PutRequest;
#[test]
fn test_strip_table_name() {
@@ -324,4 +328,39 @@ mod tests {
assert_eq!(value.try_as_raw_value().unwrap(), literal);
assert_eq!(TableNameValue::try_from_raw_value(literal).unwrap(), value);
}
#[tokio::test]
async fn test_prefix_scan_tables() {
let memory_kv = Arc::new(MemoryKvBackend::<crate::error::Error>::new());
memory_kv
.put(PutRequest {
key: TableNameKey {
catalog: "greptime",
schema: "👉",
table: "t",
}
.to_bytes(),
value: vec![],
prev_kv: false,
})
.await
.unwrap();
memory_kv
.put(PutRequest {
key: TableNameKey {
catalog: "greptime",
schema: "👉👈",
table: "t",
}
.to_bytes(),
value: vec![],
prev_kv: false,
})
.await
.unwrap();
let manager = TableNameManager::new(memory_kv);
let items = manager.tables("greptime", "👉").collect::<Vec<_>>().await;
assert_eq!(items.len(), 1);
}
}

View File

@@ -0,0 +1,46 @@
create schema abc;
Affected Rows: 1
use abc;
Affected Rows: 0
create table t (ts timestamp time index);
Affected Rows: 0
create schema abcde;
Affected Rows: 1
use abcde;
Affected Rows: 0
create table t (ts timestamp time index);
Affected Rows: 0
select table_catalog, table_schema, table_name from information_schema.tables where table_schema != 'information_schema';
+---------------+--------------+------------+
| table_catalog | table_schema | table_name |
+---------------+--------------+------------+
| greptime | abc | t |
| greptime | abcde | t |
| greptime | public | numbers |
+---------------+--------------+------------+
use public;
Affected Rows: 0
drop schema abc;
Affected Rows: 0
drop schema abcde;
Affected Rows: 0

View File

@@ -0,0 +1,19 @@
create schema abc;
use abc;
create table t (ts timestamp time index);
create schema abcde;
use abcde;
create table t (ts timestamp time index);
select table_catalog, table_schema, table_name from information_schema.tables where table_schema != 'information_schema';
use public;
drop schema abc;
drop schema abcde;