mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 15:22:56 +00:00
chore: deny unused results (#1825)
* chore: deny unused results * rebase
This commit is contained in:
@@ -13,4 +13,5 @@ rustflags = [
|
||||
"-Wclippy::print_stderr",
|
||||
"-Wclippy::implicit_clone",
|
||||
"-Aclippy::items_after_test_module",
|
||||
"-Wunused_results",
|
||||
]
|
||||
|
||||
@@ -114,7 +114,7 @@ async fn write_data(
|
||||
};
|
||||
|
||||
let now = Instant::now();
|
||||
db.insert(requests).await.unwrap();
|
||||
let _ = db.insert(requests).await.unwrap();
|
||||
let elapsed = now.elapsed();
|
||||
total_rpc_elapsed_ms += elapsed.as_millis();
|
||||
progress_bar.inc(row_count as _);
|
||||
@@ -377,19 +377,16 @@ fn create_table_expr() -> CreateTableExpr {
|
||||
}
|
||||
|
||||
fn query_set() -> HashMap<String, String> {
|
||||
let mut ret = HashMap::new();
|
||||
|
||||
ret.insert(
|
||||
"count_all".to_string(),
|
||||
format!("SELECT COUNT(*) FROM {TABLE_NAME};"),
|
||||
);
|
||||
|
||||
ret.insert(
|
||||
"fare_amt_by_passenger".to_string(),
|
||||
format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {TABLE_NAME} GROUP BY passenger_count")
|
||||
);
|
||||
|
||||
ret
|
||||
HashMap::from([
|
||||
(
|
||||
"count_all".to_string(),
|
||||
format!("SELECT COUNT(*) FROM {TABLE_NAME};"),
|
||||
),
|
||||
(
|
||||
"fare_amt_by_passenger".to_string(),
|
||||
format!("SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM {TABLE_NAME} GROUP BY passenger_count"),
|
||||
)
|
||||
])
|
||||
}
|
||||
|
||||
async fn do_write(args: &Args, db: &Database) {
|
||||
@@ -414,7 +411,8 @@ async fn do_write(args: &Args, db: &Database) {
|
||||
let db = db.clone();
|
||||
let mpb = multi_progress_bar.clone();
|
||||
let pb_style = progress_bar_style.clone();
|
||||
write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
|
||||
let _ = write_jobs
|
||||
.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
|
||||
}
|
||||
}
|
||||
while write_jobs.join_next().await.is_some() {
|
||||
@@ -423,7 +421,8 @@ async fn do_write(args: &Args, db: &Database) {
|
||||
let db = db.clone();
|
||||
let mpb = multi_progress_bar.clone();
|
||||
let pb_style = progress_bar_style.clone();
|
||||
write_jobs.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
|
||||
let _ = write_jobs
|
||||
.spawn(async move { write_data(batch_size, &db, path, mpb, pb_style).await });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -392,6 +392,6 @@ mod tests {
|
||||
#[test]
|
||||
fn test_table_global_value_compatibility() {
|
||||
let s = r#"{"node_id":1,"regions_id_map":{"1":[0]},"table_info":{"ident":{"table_id":1098,"version":1},"name":"container_cpu_limit","desc":"Created on insertion","catalog_name":"greptime","schema_name":"dd","meta":{"schema":{"column_schemas":[{"name":"container_id","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"container_name","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"docker_image","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"host","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"image_name","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"image_tag","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"interval","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"runtime","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"short_image","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"type","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"dd_value","data_type":{"Float64":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}},{"name":"ts","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"metadata":{"greptime:time_index":"true"}},{"name":"git.repository_url","data_type":{"String":null},"is_nullable":true,"is_time_index":false,"default_constraint":null,"metadata":{}}],"timestamp_index":11,"version":1},"primary_key_indices":[0,1,2,3,4,5,6,7,8,9,12],"value_indices":[10,11],"engine":"mito","next_column_id":12,"region_numbers":[],"engine_options":{},"options":{},"created_on":"1970-01-01T00:00:00Z"},"table_type":"Base"}}"#;
|
||||
TableGlobalValue::parse(s).unwrap();
|
||||
assert!(TableGlobalValue::parse(s).is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -180,7 +180,7 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
|
||||
table_name,
|
||||
),
|
||||
})?;
|
||||
manager
|
||||
let _ = manager
|
||||
.register_table(RegisterTableRequest {
|
||||
catalog: catalog_name.clone(),
|
||||
schema: schema_name.clone(),
|
||||
|
||||
@@ -118,9 +118,10 @@ impl LocalCatalogManager {
|
||||
|
||||
async fn init_system_catalog(&self) -> Result<()> {
|
||||
// register SystemCatalogTable
|
||||
self.catalogs
|
||||
let _ = self
|
||||
.catalogs
|
||||
.register_catalog_sync(SYSTEM_CATALOG_NAME.to_string())?;
|
||||
self.catalogs.register_schema_sync(RegisterSchemaRequest {
|
||||
let _ = self.catalogs.register_schema_sync(RegisterSchemaRequest {
|
||||
catalog: SYSTEM_CATALOG_NAME.to_string(),
|
||||
schema: INFORMATION_SCHEMA_NAME.to_string(),
|
||||
})?;
|
||||
@@ -131,12 +132,13 @@ impl LocalCatalogManager {
|
||||
table_id: SYSTEM_CATALOG_TABLE_ID,
|
||||
table: self.system.information_schema.system.clone(),
|
||||
};
|
||||
self.catalogs.register_table(register_table_req).await?;
|
||||
let _ = self.catalogs.register_table(register_table_req).await?;
|
||||
|
||||
// register default catalog and default schema
|
||||
self.catalogs
|
||||
let _ = self
|
||||
.catalogs
|
||||
.register_catalog_sync(DEFAULT_CATALOG_NAME.to_string())?;
|
||||
self.catalogs.register_schema_sync(RegisterSchemaRequest {
|
||||
let _ = self.catalogs.register_schema_sync(RegisterSchemaRequest {
|
||||
catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
})?;
|
||||
@@ -151,7 +153,8 @@ impl LocalCatalogManager {
|
||||
table: numbers_table,
|
||||
};
|
||||
|
||||
self.catalogs
|
||||
let _ = self
|
||||
.catalogs
|
||||
.register_table(register_number_table_req)
|
||||
.await?;
|
||||
|
||||
@@ -226,7 +229,8 @@ impl LocalCatalogManager {
|
||||
for entry in entries {
|
||||
match entry {
|
||||
Entry::Catalog(c) => {
|
||||
self.catalogs
|
||||
let _ = self
|
||||
.catalogs
|
||||
.register_catalog_if_absent(c.catalog_name.clone());
|
||||
info!("Register catalog: {}", c.catalog_name);
|
||||
}
|
||||
@@ -235,7 +239,7 @@ impl LocalCatalogManager {
|
||||
catalog: s.catalog_name.clone(),
|
||||
schema: s.schema_name.clone(),
|
||||
};
|
||||
self.catalogs.register_schema_sync(req)?;
|
||||
let _ = self.catalogs.register_schema_sync(req)?;
|
||||
info!("Registered schema: {:?}", s);
|
||||
}
|
||||
Entry::Table(t) => {
|
||||
@@ -297,7 +301,7 @@ impl LocalCatalogManager {
|
||||
table_id: t.table_id,
|
||||
table: table_ref,
|
||||
};
|
||||
self.catalogs.register_table(register_request).await?;
|
||||
let _ = self.catalogs.register_table(register_request).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -389,8 +393,9 @@ impl CatalogManager for LocalCatalogManager {
|
||||
let engine = request.table.table_info().meta.engine.to_string();
|
||||
let table_name = request.table_name.clone();
|
||||
let table_id = request.table_id;
|
||||
self.catalogs.register_table(request).await?;
|
||||
self.system
|
||||
let _ = self.catalogs.register_table(request).await?;
|
||||
let _ = self
|
||||
.system
|
||||
.register_table(
|
||||
catalog_name.clone(),
|
||||
schema_name.clone(),
|
||||
@@ -438,7 +443,8 @@ impl CatalogManager for LocalCatalogManager {
|
||||
|
||||
let engine = old_table.table_info().meta.engine.to_string();
|
||||
// rename table in system catalog
|
||||
self.system
|
||||
let _ = self
|
||||
.system
|
||||
.register_table(
|
||||
catalog_name.clone(),
|
||||
schema_name.clone(),
|
||||
@@ -499,7 +505,8 @@ impl CatalogManager for LocalCatalogManager {
|
||||
schema: schema_name,
|
||||
}
|
||||
);
|
||||
self.system
|
||||
let _ = self
|
||||
.system
|
||||
.register_schema(request.catalog.clone(), schema_name.clone())
|
||||
.await?;
|
||||
self.catalogs.register_schema_sync(request)
|
||||
|
||||
@@ -49,9 +49,8 @@ impl Default for MemoryCatalogManager {
|
||||
catalogs: Default::default(),
|
||||
};
|
||||
|
||||
let mut catalog = HashMap::with_capacity(1);
|
||||
catalog.insert(DEFAULT_SCHEMA_NAME.to_string(), HashMap::new());
|
||||
manager
|
||||
let catalog = HashMap::from([(DEFAULT_SCHEMA_NAME.to_string(), HashMap::new())]);
|
||||
let _ = manager
|
||||
.catalogs
|
||||
.write()
|
||||
.unwrap()
|
||||
@@ -115,7 +114,7 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
}
|
||||
|
||||
let table = schema.remove(&request.table_name).unwrap();
|
||||
schema.insert(request.new_table_name, table);
|
||||
let _ = schema.insert(request.new_table_name, table);
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
@@ -144,9 +143,11 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
}
|
||||
|
||||
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
|
||||
self.register_schema_sync(request)?;
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
|
||||
Ok(true)
|
||||
let registered = self.register_schema_sync(request)?;
|
||||
if registered {
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_SCHEMA_COUNT, 1.0);
|
||||
}
|
||||
Ok(registered)
|
||||
}
|
||||
|
||||
async fn register_system_table(&self, _request: RegisterSystemTableRequest) -> Result<()> {
|
||||
@@ -234,9 +235,11 @@ impl CatalogManager for MemoryCatalogManager {
|
||||
}
|
||||
|
||||
async fn register_catalog(&self, name: String) -> Result<bool> {
|
||||
self.register_catalog_sync(name)?;
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
|
||||
Ok(true)
|
||||
let registered = self.register_catalog_sync(name)?;
|
||||
if registered {
|
||||
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
|
||||
}
|
||||
Ok(registered)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
@@ -252,7 +255,7 @@ impl MemoryCatalogManager {
|
||||
match entry {
|
||||
Entry::Occupied(_) => true,
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(HashMap::new());
|
||||
let _ = v.insert(HashMap::new());
|
||||
false
|
||||
}
|
||||
}
|
||||
@@ -273,7 +276,7 @@ impl MemoryCatalogManager {
|
||||
if catalog.contains_key(&request.schema) {
|
||||
return Ok(false);
|
||||
}
|
||||
catalog.insert(request.schema, HashMap::new());
|
||||
let _ = catalog.insert(request.schema, HashMap::new());
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -310,7 +313,7 @@ impl MemoryCatalogManager {
|
||||
table_id: table.table_info().ident.table_id,
|
||||
table,
|
||||
};
|
||||
manager.register_table_sync(request).unwrap();
|
||||
let _ = manager.register_table_sync(request).unwrap();
|
||||
manager
|
||||
}
|
||||
}
|
||||
@@ -341,7 +344,7 @@ mod tests {
|
||||
table: Arc::new(NumbersTable::default()),
|
||||
};
|
||||
|
||||
catalog_list.register_table(register_request).await.unwrap();
|
||||
assert!(catalog_list.register_table(register_request).await.is_ok());
|
||||
let table = catalog_list
|
||||
.table(
|
||||
DEFAULT_CATALOG_NAME,
|
||||
@@ -390,7 +393,7 @@ mod tests {
|
||||
new_table_name: new_table_name.to_string(),
|
||||
table_id,
|
||||
};
|
||||
catalog.rename_table(rename_request).await.unwrap();
|
||||
assert!(catalog.rename_table(rename_request).await.is_ok());
|
||||
|
||||
// test old table name not exist
|
||||
assert!(!catalog
|
||||
@@ -492,7 +495,7 @@ mod tests {
|
||||
table_id: 2333,
|
||||
table: Arc::new(NumbersTable::default()),
|
||||
};
|
||||
catalog.register_table(register_table_req).await.unwrap();
|
||||
assert!(catalog.register_table(register_table_req).await.is_ok());
|
||||
assert!(catalog
|
||||
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name)
|
||||
.await
|
||||
|
||||
@@ -240,7 +240,7 @@ impl KvBackend for MetaKvBackend {
|
||||
|
||||
async fn move_value(&self, from_key: &[u8], to_key: &[u8]) -> Result<()> {
|
||||
let req = MoveValueRequest::new(from_key, to_key);
|
||||
self.client.move_value(req).await.context(MetaSrvSnafu)?;
|
||||
let _ = self.client.move_value(req).await.context(MetaSrvSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -112,7 +112,7 @@ impl RemoteCatalogManager {
|
||||
joins.push(self.initiate_schemas(node_id, backend, engine_manager, catalog_name));
|
||||
}
|
||||
|
||||
futures::future::try_join_all(joins).await?;
|
||||
let _ = futures::future::try_join_all(joins).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -623,13 +623,14 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
self.check_catalog_schema_exist(&catalog_name, &schema_name)
|
||||
.await?;
|
||||
|
||||
self.register_table(
|
||||
catalog_name.clone(),
|
||||
schema_name.clone(),
|
||||
request.table_name,
|
||||
request.table.clone(),
|
||||
)
|
||||
.await?;
|
||||
let _ = self
|
||||
.register_table(
|
||||
catalog_name.clone(),
|
||||
schema_name.clone(),
|
||||
request.table_name,
|
||||
request.table.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let table_info = request.table.table_info();
|
||||
let table_ident = TableIdent {
|
||||
@@ -680,7 +681,8 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
table_id: table_info.ident.table_id,
|
||||
engine: table_info.meta.engine.clone(),
|
||||
};
|
||||
self.region_alive_keepers
|
||||
let _ = self
|
||||
.region_alive_keepers
|
||||
.deregister_table(&table_ident)
|
||||
.await;
|
||||
}
|
||||
@@ -846,7 +848,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
let catalog_key = String::from_utf8_lossy(&catalog.0);
|
||||
|
||||
if let Ok(key) = CatalogKey::parse(&catalog_key) {
|
||||
catalogs.insert(key.catalog_name);
|
||||
let _ = catalogs.insert(key.catalog_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -865,7 +867,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
let schema_key = String::from_utf8_lossy(&schema.0);
|
||||
|
||||
if let Ok(key) = SchemaKey::parse(&schema_key) {
|
||||
schemas.insert(key.schema_name);
|
||||
let _ = schemas.insert(key.schema_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -886,7 +888,7 @@ impl CatalogManager for RemoteCatalogManager {
|
||||
let table_key = String::from_utf8_lossy(&table.0);
|
||||
|
||||
if let Ok(key) = TableRegionalKey::parse(&table_key) {
|
||||
tables.insert(key.table_name);
|
||||
let _ = tables.insert(key.table_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,7 +45,6 @@ pub struct MockKvBackend {
|
||||
|
||||
impl Default for MockKvBackend {
|
||||
fn default() -> Self {
|
||||
let mut map = BTreeMap::default();
|
||||
let catalog_value = CatalogValue {}.as_bytes().unwrap();
|
||||
let schema_value = SchemaValue {}.as_bytes().unwrap();
|
||||
|
||||
@@ -60,11 +59,11 @@ impl Default for MockKvBackend {
|
||||
}
|
||||
.to_string();
|
||||
|
||||
// create default catalog and schema
|
||||
map.insert(default_catalog_key.into(), catalog_value);
|
||||
map.insert(default_schema_key.into(), schema_value);
|
||||
|
||||
let map = RwLock::new(map);
|
||||
let map = RwLock::new(BTreeMap::from([
|
||||
// create default catalog and schema
|
||||
(default_catalog_key.into(), catalog_value),
|
||||
(default_schema_key.into(), schema_value),
|
||||
]));
|
||||
Self { map }
|
||||
}
|
||||
}
|
||||
@@ -109,7 +108,7 @@ impl KvBackend for MockKvBackend {
|
||||
|
||||
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
let mut map = self.map.write().await;
|
||||
map.insert(key.to_vec(), val.to_vec());
|
||||
let _ = map.insert(key.to_vec(), val.to_vec());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -124,7 +123,7 @@ impl KvBackend for MockKvBackend {
|
||||
match existing {
|
||||
Entry::Vacant(e) => {
|
||||
if expect.is_empty() {
|
||||
e.insert(val.to_vec());
|
||||
let _ = e.insert(val.to_vec());
|
||||
Ok(Ok(()))
|
||||
} else {
|
||||
Ok(Err(None))
|
||||
@@ -132,7 +131,7 @@ impl KvBackend for MockKvBackend {
|
||||
}
|
||||
Entry::Occupied(mut existing) => {
|
||||
if existing.get() == expect {
|
||||
existing.insert(val.to_vec());
|
||||
let _ = existing.insert(val.to_vec());
|
||||
Ok(Ok(()))
|
||||
} else {
|
||||
Ok(Err(Some(existing.get().clone())))
|
||||
@@ -201,7 +200,7 @@ impl TableEngine for MockTableEngine {
|
||||
)) as Arc<_>;
|
||||
|
||||
let mut tables = self.tables.write().unwrap();
|
||||
tables.insert(table_id, table.clone() as TableRef);
|
||||
let _ = tables.insert(table_id, table.clone() as TableRef);
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ impl RegionAliveKeepers {
|
||||
}
|
||||
|
||||
let mut keepers = self.keepers.lock().await;
|
||||
keepers.insert(table_ident.clone(), keeper.clone());
|
||||
let _ = keepers.insert(table_ident.clone(), keeper.clone());
|
||||
|
||||
if self.started.load(Ordering::Relaxed) {
|
||||
keeper.start().await;
|
||||
@@ -237,7 +237,7 @@ impl RegionAliveKeeper {
|
||||
let countdown_task_handles = Arc::downgrade(&self.countdown_task_handles);
|
||||
let on_task_finished = async move {
|
||||
if let Some(x) = countdown_task_handles.upgrade() {
|
||||
x.lock().await.remove(®ion);
|
||||
let _ = x.lock().await.remove(®ion);
|
||||
} // Else the countdown task handles map could be dropped because the keeper is dropped.
|
||||
};
|
||||
let handle = Arc::new(CountdownTaskHandle::new(
|
||||
@@ -248,7 +248,7 @@ impl RegionAliveKeeper {
|
||||
));
|
||||
|
||||
let mut handles = self.countdown_task_handles.lock().await;
|
||||
handles.insert(region, handle.clone());
|
||||
let _ = handles.insert(region, handle.clone());
|
||||
|
||||
if self.started.load(Ordering::Relaxed) {
|
||||
handle.start(self.heartbeat_interval_millis).await;
|
||||
@@ -772,7 +772,7 @@ mod test {
|
||||
};
|
||||
|
||||
let table_engine = Arc::new(MockTableEngine::default());
|
||||
table_engine.create_table(ctx, request).await.unwrap();
|
||||
assert!(table_engine.create_table(ctx, request).await.is_ok());
|
||||
|
||||
let table_ident = TableIdent {
|
||||
catalog: catalog.to_string(),
|
||||
@@ -788,7 +788,7 @@ mod test {
|
||||
region: 1,
|
||||
rx,
|
||||
};
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
task.run().await;
|
||||
});
|
||||
|
||||
|
||||
@@ -228,21 +228,21 @@ pub(crate) fn build_table_deletion_request(
|
||||
}
|
||||
|
||||
fn build_primary_key_columns(entry_type: EntryType, key: &[u8]) -> HashMap<String, VectorRef> {
|
||||
let mut m = HashMap::with_capacity(3);
|
||||
m.insert(
|
||||
"entry_type".to_string(),
|
||||
Arc::new(UInt8Vector::from_slice([entry_type as u8])) as _,
|
||||
);
|
||||
m.insert(
|
||||
"key".to_string(),
|
||||
Arc::new(BinaryVector::from_slice(&[key])) as _,
|
||||
);
|
||||
// Timestamp in key part is intentionally left to 0
|
||||
m.insert(
|
||||
"timestamp".to_string(),
|
||||
Arc::new(TimestampMillisecondVector::from_slice([0])) as _,
|
||||
);
|
||||
m
|
||||
HashMap::from([
|
||||
(
|
||||
"entry_type".to_string(),
|
||||
Arc::new(UInt8Vector::from_slice([entry_type as u8])) as VectorRef,
|
||||
),
|
||||
(
|
||||
"key".to_string(),
|
||||
Arc::new(BinaryVector::from_slice(&[key])) as VectorRef,
|
||||
),
|
||||
(
|
||||
"timestamp".to_string(),
|
||||
// Timestamp in key part is intentionally left to 0
|
||||
Arc::new(TimestampMillisecondVector::from_slice([0])) as VectorRef,
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> InsertRequest {
|
||||
@@ -262,18 +262,18 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) ->
|
||||
let mut columns_values = HashMap::with_capacity(6);
|
||||
columns_values.extend(primary_key_columns.into_iter());
|
||||
|
||||
columns_values.insert(
|
||||
let _ = columns_values.insert(
|
||||
"value".to_string(),
|
||||
Arc::new(BinaryVector::from_slice(&[value])) as _,
|
||||
);
|
||||
|
||||
let now = util::current_time_millis();
|
||||
columns_values.insert(
|
||||
let _ = columns_values.insert(
|
||||
"gmt_created".to_string(),
|
||||
Arc::new(TimestampMillisecondVector::from_slice([now])) as _,
|
||||
);
|
||||
|
||||
columns_values.insert(
|
||||
let _ = columns_values.insert(
|
||||
"gmt_modified".to_string(),
|
||||
Arc::new(TimestampMillisecondVector::from_slice([now])) as _,
|
||||
);
|
||||
@@ -482,14 +482,13 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
pub fn test_decode_mismatch() {
|
||||
decode_system_catalog(
|
||||
assert!(decode_system_catalog(
|
||||
Some(EntryType::Table as u8),
|
||||
Some("some_catalog.some_schema.42".as_bytes()),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -504,7 +503,7 @@ mod tests {
|
||||
let dir = create_temp_dir("system-table-test");
|
||||
let store_dir = dir.path().to_string_lossy();
|
||||
let mut builder = object_store::services::Fs::default();
|
||||
builder.root(&store_dir);
|
||||
let _ = builder.root(&store_dir);
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
let noop_compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
|
||||
let table_engine = Arc::new(MitoEngine::new(
|
||||
|
||||
@@ -111,7 +111,7 @@ impl DfTableSourceProvider {
|
||||
|
||||
let provider = DfTableProviderAdapter::new(table);
|
||||
let source = provider_as_source(Arc::new(provider));
|
||||
self.resolved_tables.insert(resolved_name, source.clone());
|
||||
let _ = self.resolved_tables.insert(resolved_name, source.clone());
|
||||
Ok(source)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ mod tests {
|
||||
let mut res = HashSet::new();
|
||||
while let Some(r) = iter.next().await {
|
||||
let kv = r.unwrap();
|
||||
res.insert(String::from_utf8_lossy(&kv.0).to_string());
|
||||
let _ = res.insert(String::from_utf8_lossy(&kv.0).to_string());
|
||||
}
|
||||
assert_eq!(
|
||||
vec!["__c-greptime".to_string()],
|
||||
@@ -305,11 +305,11 @@ mod tests {
|
||||
let schema_name = "nonexistent_schema".to_string();
|
||||
|
||||
// register catalog to catalog manager
|
||||
components
|
||||
assert!(components
|
||||
.catalog_manager
|
||||
.register_catalog(catalog_name.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
assert_eq!(
|
||||
HashSet::<String>::from_iter(
|
||||
vec![DEFAULT_CATALOG_NAME.to_string(), catalog_name.clone()].into_iter()
|
||||
|
||||
@@ -165,7 +165,7 @@ impl Client {
|
||||
pub async fn health_check(&self) -> Result<()> {
|
||||
let (_, channel) = self.find_channel()?;
|
||||
let mut client = HealthCheckClient::new(channel);
|
||||
client.health_check(HealthCheckRequest {}).await?;
|
||||
let _ = client.health_check(HealthCheckRequest {}).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +173,7 @@ impl Database {
|
||||
let mut client = self.client.make_database_client()?.inner;
|
||||
let (sender, receiver) = mpsc::channel::<GreptimeRequest>(65536);
|
||||
let receiver = ReceiverStream::new(receiver);
|
||||
client.handle_requests(receiver).await?;
|
||||
let _ = client.handle_requests(receiver).await?;
|
||||
Ok(sender)
|
||||
}
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ mod tests {
|
||||
let random = Random;
|
||||
for _ in 0..100 {
|
||||
let peer = random.get_peer(&peers).unwrap();
|
||||
all.contains(peer);
|
||||
assert!(all.contains(peer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -108,7 +108,7 @@ impl Repl {
|
||||
Ok(ref line) => {
|
||||
let request = line.trim();
|
||||
|
||||
self.rl.add_history_entry(request.to_string());
|
||||
let _ = self.rl.add_history_entry(request.to_string());
|
||||
|
||||
request.try_into()
|
||||
}
|
||||
@@ -137,7 +137,7 @@ impl Repl {
|
||||
}
|
||||
}
|
||||
ReplCommand::Sql { sql } => {
|
||||
self.execute_sql(sql).await;
|
||||
let _ = self.execute_sql(sql).await;
|
||||
}
|
||||
ReplCommand::Exit => {
|
||||
return Ok(());
|
||||
|
||||
@@ -326,12 +326,12 @@ mod tests {
|
||||
.is_err());
|
||||
|
||||
// Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value
|
||||
(StartCommand {
|
||||
assert!((StartCommand {
|
||||
node_id: Some(42),
|
||||
..Default::default()
|
||||
})
|
||||
.load_options(TopLevelOptions::default())
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -27,10 +27,10 @@ mod tests {
|
||||
|
||||
impl Repl {
|
||||
fn send_line(&mut self, line: &str) {
|
||||
self.repl.send_line(line).unwrap();
|
||||
assert!(self.repl.send_line(line).is_ok());
|
||||
|
||||
// read a line to consume the prompt
|
||||
self.read_line();
|
||||
let _ = self.read_line();
|
||||
}
|
||||
|
||||
fn read_line(&mut self) -> String {
|
||||
@@ -76,7 +76,7 @@ mod tests {
|
||||
std::thread::sleep(Duration::from_secs(3));
|
||||
|
||||
let mut repl_cmd = Command::new("./greptime");
|
||||
repl_cmd.current_dir(bin_path).args([
|
||||
let _ = repl_cmd.current_dir(bin_path).args([
|
||||
"--log-level=off",
|
||||
"cli",
|
||||
"attach",
|
||||
@@ -105,7 +105,7 @@ mod tests {
|
||||
test_select(repl);
|
||||
|
||||
datanode.kill().unwrap();
|
||||
datanode.wait().unwrap();
|
||||
assert!(datanode.wait().is_ok());
|
||||
}
|
||||
|
||||
fn test_create_database(repl: &mut Repl) {
|
||||
|
||||
@@ -41,7 +41,7 @@ impl Plugins {
|
||||
}
|
||||
|
||||
pub fn insert<T: 'static + Send + Sync>(&self, value: T) {
|
||||
self.lock().insert(value);
|
||||
let _ = self.lock().insert(value);
|
||||
}
|
||||
|
||||
pub fn get<T: 'static + Send + Sync + Clone>(&self) -> Option<T> {
|
||||
|
||||
@@ -213,7 +213,7 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
|
||||
}
|
||||
|
||||
// Flushes all pending writes
|
||||
writer.try_flush(true).await?;
|
||||
let _ = writer.try_flush(true).await?;
|
||||
writer.close_inner_writer().await?;
|
||||
|
||||
Ok(rows)
|
||||
|
||||
@@ -291,20 +291,20 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_try_from() {
|
||||
let mut map = HashMap::new();
|
||||
let map = HashMap::new();
|
||||
let format: CsvFormat = CsvFormat::try_from(&map).unwrap();
|
||||
|
||||
assert_eq!(format, CsvFormat::default());
|
||||
|
||||
map.insert(
|
||||
FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
|
||||
"2000".to_string(),
|
||||
);
|
||||
|
||||
map.insert(FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string());
|
||||
map.insert(FORMAT_DELIMITER.to_string(), b'\t'.to_string());
|
||||
map.insert(FORMAT_HAS_HEADER.to_string(), "false".to_string());
|
||||
|
||||
let map = HashMap::from([
|
||||
(
|
||||
FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
|
||||
"2000".to_string(),
|
||||
),
|
||||
(FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string()),
|
||||
(FORMAT_DELIMITER.to_string(), b'\t'.to_string()),
|
||||
(FORMAT_HAS_HEADER.to_string(), "false".to_string()),
|
||||
]);
|
||||
let format = CsvFormat::try_from(&map).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
|
||||
@@ -214,18 +214,18 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_try_from() {
|
||||
let mut map = HashMap::new();
|
||||
let map = HashMap::new();
|
||||
let format = JsonFormat::try_from(&map).unwrap();
|
||||
|
||||
assert_eq!(format, JsonFormat::default());
|
||||
|
||||
map.insert(
|
||||
FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
|
||||
"2000".to_string(),
|
||||
);
|
||||
|
||||
map.insert(FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string());
|
||||
|
||||
let map = HashMap::from([
|
||||
(
|
||||
FORMAT_SCHEMA_INFER_MAX_RECORD.to_string(),
|
||||
"2000".to_string(),
|
||||
),
|
||||
(FORMAT_COMPRESSION_TYPE.to_string(), "zstd".to_string()),
|
||||
]);
|
||||
let format = JsonFormat::try_from(&map).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
|
||||
@@ -20,7 +20,7 @@ use crate::error::{BuildBackendSnafu, Result};
|
||||
|
||||
pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
|
||||
let mut builder = Fs::default();
|
||||
builder.root(root);
|
||||
let _ = builder.root(root);
|
||||
let object_store = ObjectStore::new(builder)
|
||||
.context(BuildBackendSnafu)?
|
||||
.finish();
|
||||
|
||||
@@ -34,28 +34,26 @@ pub fn build_s3_backend(
|
||||
) -> Result<ObjectStore> {
|
||||
let mut builder = S3::default();
|
||||
|
||||
builder.root(path);
|
||||
|
||||
builder.bucket(host);
|
||||
let _ = builder.root(path).bucket(host);
|
||||
|
||||
if let Some(endpoint) = connection.get(ENDPOINT_URL) {
|
||||
builder.endpoint(endpoint);
|
||||
let _ = builder.endpoint(endpoint);
|
||||
}
|
||||
|
||||
if let Some(region) = connection.get(REGION) {
|
||||
builder.region(region);
|
||||
let _ = builder.region(region);
|
||||
}
|
||||
|
||||
if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
|
||||
builder.access_key_id(key_id);
|
||||
let _ = builder.access_key_id(key_id);
|
||||
}
|
||||
|
||||
if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
|
||||
builder.secret_access_key(key);
|
||||
let _ = builder.secret_access_key(key);
|
||||
}
|
||||
|
||||
if let Some(session_token) = connection.get(SESSION_TOKEN) {
|
||||
builder.security_token(session_token);
|
||||
let _ = builder.security_token(session_token);
|
||||
}
|
||||
|
||||
if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
|
||||
@@ -69,7 +67,7 @@ pub fn build_s3_backend(
|
||||
.build()
|
||||
})?;
|
||||
if enable {
|
||||
builder.enable_virtual_host_style();
|
||||
let _ = builder.enable_virtual_host_style();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ pub fn format_schema(schema: Schema) -> Vec<String> {
|
||||
|
||||
pub fn test_store(root: &str) -> ObjectStore {
|
||||
let mut builder = Fs::default();
|
||||
builder.root(root);
|
||||
let _ = builder.root(root);
|
||||
|
||||
ObjectStore::new(builder).unwrap().finish()
|
||||
}
|
||||
@@ -64,7 +64,7 @@ pub fn test_tmp_store(root: &str) -> (ObjectStore, TempDir) {
|
||||
let dir = create_temp_dir(root);
|
||||
|
||||
let mut builder = Fs::default();
|
||||
builder.root("/");
|
||||
let _ = builder.root("/");
|
||||
|
||||
(ObjectStore::new(builder).unwrap().finish(), dir)
|
||||
}
|
||||
@@ -113,14 +113,14 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
|
||||
|
||||
let output_path = format!("{}/{}", dir.path().display(), "output");
|
||||
|
||||
stream_to_json(
|
||||
assert!(stream_to_json(
|
||||
Box::pin(stream),
|
||||
tmp_store.clone(),
|
||||
&output_path,
|
||||
threshold(size),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
|
||||
let written = tmp_store.read(&output_path).await.unwrap();
|
||||
let origin = store.read(origin_path).await.unwrap();
|
||||
@@ -155,14 +155,14 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
|
||||
|
||||
let output_path = format!("{}/{}", dir.path().display(), "output");
|
||||
|
||||
stream_to_csv(
|
||||
assert!(stream_to_csv(
|
||||
Box::pin(stream),
|
||||
tmp_store.clone(),
|
||||
&output_path,
|
||||
threshold(size),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
|
||||
let written = tmp_store.read(&output_path).await.unwrap();
|
||||
let origin = store.read(origin_path).await.unwrap();
|
||||
|
||||
@@ -22,7 +22,7 @@ struct Foo {}
|
||||
#[test]
|
||||
#[allow(clippy::extra_unused_type_parameters)]
|
||||
fn test_derive() {
|
||||
Foo::default();
|
||||
let _ = Foo::default();
|
||||
assert_fields!(Foo: input_types);
|
||||
assert_impl_all!(Foo: std::fmt::Debug, Default, AggrFuncTypeStore);
|
||||
}
|
||||
|
||||
@@ -32,14 +32,16 @@ pub struct FunctionRegistry {
|
||||
|
||||
impl FunctionRegistry {
|
||||
pub fn register(&self, func: FunctionRef) {
|
||||
self.functions
|
||||
let _ = self
|
||||
.functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(func.name().to_string(), func);
|
||||
}
|
||||
|
||||
pub fn register_aggregate_function(&self, func: AggregateFunctionMetaRef) {
|
||||
self.aggregate_functions
|
||||
let _ = self
|
||||
.aggregate_functions
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(func.name(), func);
|
||||
|
||||
@@ -77,7 +77,7 @@ pub fn find_new_columns(schema: &SchemaRef, columns: &[Column]) -> Result<Option
|
||||
is_key: *semantic_type == TAG_SEMANTIC_TYPE,
|
||||
location: None,
|
||||
});
|
||||
new_columns.insert(column_name.to_string());
|
||||
let _ = new_columns.insert(column_name.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,7 +239,7 @@ pub fn build_create_expr_from_insertion(
|
||||
|
||||
let column_def = build_column_def(column_name, *datatype, is_nullable);
|
||||
column_defs.push(column_def);
|
||||
new_columns.insert(column_name.to_string());
|
||||
let _ = new_columns.insert(column_name.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ async fn do_bench_channel_manager() {
|
||||
}
|
||||
|
||||
fn bench_channel_manager(c: &mut Criterion) {
|
||||
c.bench_function("bench channel manager", |b| {
|
||||
let _ = c.bench_function("bench channel manager", |b| {
|
||||
b.iter(do_bench_channel_manager);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ impl ChannelManager {
|
||||
}
|
||||
|
||||
let pool = self.pool.clone();
|
||||
common_runtime::spawn_bg(async {
|
||||
let _handle = common_runtime::spawn_bg(async {
|
||||
recycle_channel_in_loop(pool, RECYCLE_CHANNEL_INTERVAL_SECS).await;
|
||||
});
|
||||
info!("Channel recycle is started, running in the background!");
|
||||
@@ -398,7 +398,7 @@ impl Channel {
|
||||
|
||||
#[inline]
|
||||
pub fn increase_access(&self) {
|
||||
self.access.fetch_add(1, Ordering::Relaxed);
|
||||
let _ = self.access.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -427,7 +427,7 @@ impl Pool {
|
||||
}
|
||||
|
||||
fn put(&self, addr: &str, channel: Channel) {
|
||||
self.channels.insert(addr.to_string(), channel);
|
||||
let _ = self.channels.insert(addr.to_string(), channel);
|
||||
}
|
||||
|
||||
fn retain_channel<F>(&self, f: F)
|
||||
@@ -442,7 +442,7 @@ async fn recycle_channel_in_loop(pool: Arc<Pool>, interval_secs: u64) {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let _ = interval.tick().await;
|
||||
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -217,7 +217,7 @@ impl LinesWriter {
|
||||
datatype: datatype as i32,
|
||||
null_mask: Vec::default(),
|
||||
});
|
||||
column_names.insert(column_name.to_string(), new_idx);
|
||||
let _ = column_names.insert(column_name.to_string(), new_idx);
|
||||
new_idx
|
||||
}
|
||||
};
|
||||
|
||||
@@ -62,7 +62,8 @@ pub async fn dump_profile() -> error::Result<Vec<u8>> {
|
||||
.await
|
||||
.context(OpenTempFileSnafu { path: &path })?;
|
||||
let mut buf = vec![];
|
||||
f.read_to_end(&mut buf)
|
||||
let _ = f
|
||||
.read_to_end(&mut buf)
|
||||
.await
|
||||
.context(OpenTempFileSnafu { path })?;
|
||||
Ok(buf)
|
||||
|
||||
@@ -202,13 +202,13 @@ impl TableRoute {
|
||||
.iter()
|
||||
.filter_map(|x| x.leader_peer.as_ref())
|
||||
.for_each(|p| {
|
||||
peers.insert(p.clone());
|
||||
let _ = peers.insert(p.clone());
|
||||
});
|
||||
self.region_routes
|
||||
.iter()
|
||||
.flat_map(|x| x.follower_peers.iter())
|
||||
.for_each(|p| {
|
||||
peers.insert(p.clone());
|
||||
let _ = peers.insert(p.clone());
|
||||
});
|
||||
let mut peers = peers.into_iter().map(Into::into).collect::<Vec<PbPeer>>();
|
||||
peers.sort_by_key(|x| x.id);
|
||||
|
||||
@@ -292,7 +292,7 @@ impl ManagerContext {
|
||||
fn remove_messages(&self, procedure_ids: &[ProcedureId]) {
|
||||
let mut messages = self.messages.lock().unwrap();
|
||||
for procedure_id in procedure_ids {
|
||||
messages.remove(procedure_id);
|
||||
let _ = messages.remove(procedure_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -319,7 +319,7 @@ impl ManagerContext {
|
||||
while let Some((id, finish_time)) = finished_procedures.front() {
|
||||
if finish_time.elapsed() > ttl {
|
||||
ids_to_remove.push(*id);
|
||||
finished_procedures.pop_front();
|
||||
let _ = finished_procedures.pop_front();
|
||||
} else {
|
||||
// The rest procedures are finished later, so we can break
|
||||
// the loop.
|
||||
@@ -335,7 +335,7 @@ impl ManagerContext {
|
||||
|
||||
let mut procedures = self.procedures.write().unwrap();
|
||||
for id in ids {
|
||||
procedures.remove(&id);
|
||||
let _ = procedures.remove(&id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -419,7 +419,7 @@ impl LocalManager {
|
||||
DuplicateProcedureSnafu { procedure_id },
|
||||
);
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
// Run the root procedure.
|
||||
runner.run().await;
|
||||
});
|
||||
@@ -434,7 +434,7 @@ impl ProcedureManager for LocalManager {
|
||||
let mut loaders = self.manager_ctx.loaders.lock().unwrap();
|
||||
ensure!(!loaders.contains_key(name), LoaderConflictSnafu { name });
|
||||
|
||||
loaders.insert(name.to_string(), loader);
|
||||
let _ = loaders.insert(name.to_string(), loader);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -559,7 +559,7 @@ mod test_util {
|
||||
pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let mut builder = Builder::default();
|
||||
builder.root(store_dir);
|
||||
let _ = builder.root(store_dir);
|
||||
ObjectStore::new(builder).unwrap().finish()
|
||||
}
|
||||
}
|
||||
@@ -770,13 +770,13 @@ mod tests {
|
||||
|
||||
let mut procedure = ProcedureToLoad::new("submit");
|
||||
procedure.lock_key = LockKey::single("test.submit");
|
||||
manager
|
||||
assert!(manager
|
||||
.submit(ProcedureWithId {
|
||||
id: procedure_id,
|
||||
procedure: Box::new(procedure),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
assert!(manager
|
||||
.procedure_state(procedure_id)
|
||||
.await
|
||||
@@ -877,13 +877,13 @@ mod tests {
|
||||
let mut procedure = ProcedureToLoad::new("submit");
|
||||
procedure.lock_key = LockKey::single("test.submit");
|
||||
let procedure_id = ProcedureId::random();
|
||||
manager
|
||||
assert!(manager
|
||||
.submit(ProcedureWithId {
|
||||
id: procedure_id,
|
||||
procedure: Box::new(procedure),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
|
||||
watcher.changed().await.unwrap();
|
||||
manager.start().unwrap();
|
||||
@@ -899,13 +899,13 @@ mod tests {
|
||||
let mut procedure = ProcedureToLoad::new("submit");
|
||||
procedure.lock_key = LockKey::single("test.submit");
|
||||
let procedure_id = ProcedureId::random();
|
||||
manager
|
||||
assert!(manager
|
||||
.submit(ProcedureWithId {
|
||||
id: procedure_id,
|
||||
procedure: Box::new(procedure),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
let mut watcher = manager.procedure_watcher(procedure_id).unwrap();
|
||||
watcher.changed().await.unwrap();
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
@@ -88,7 +88,7 @@ impl LockMap {
|
||||
// expect that a procedure should not wait for two lock simultaneously.
|
||||
lock.waiters.push_back(meta.clone());
|
||||
} else {
|
||||
locks.insert(key.to_string(), Lock::from_owner(meta));
|
||||
let _ = locks.insert(key.to_string(), Lock::from_owner(meta));
|
||||
|
||||
return;
|
||||
}
|
||||
@@ -111,7 +111,7 @@ impl LockMap {
|
||||
|
||||
if !lock.switch_owner() {
|
||||
// No body waits for this lock, we can remove the lock entry.
|
||||
locks.remove(key);
|
||||
let _ = locks.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -332,7 +332,7 @@ impl Runner {
|
||||
// Add the id of the subprocedure to the metadata.
|
||||
self.meta.push_child(procedure_id);
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
// Run the root procedure.
|
||||
runner.run().await
|
||||
});
|
||||
|
||||
@@ -198,7 +198,7 @@ impl ProcedureStore {
|
||||
entry.1 = value;
|
||||
}
|
||||
} else {
|
||||
procedure_key_values.insert(curr_key.procedure_id, (curr_key, value));
|
||||
let _ = procedure_key_values.insert(curr_key.procedure_id, (curr_key, value));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,7 +211,7 @@ impl ProcedureStore {
|
||||
// procedures are loaded.
|
||||
continue;
|
||||
};
|
||||
messages.insert(procedure_id, message);
|
||||
let _ = messages.insert(procedure_id, message);
|
||||
} else {
|
||||
finished_ids.push(procedure_id);
|
||||
}
|
||||
@@ -331,7 +331,7 @@ mod tests {
|
||||
fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let mut builder = Builder::default();
|
||||
builder.root(store_dir);
|
||||
let _ = builder.root(store_dir);
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
|
||||
ProcedureStore::from_object_store(object_store)
|
||||
|
||||
@@ -173,7 +173,7 @@ mod tests {
|
||||
let dir = create_temp_dir("state_store");
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let mut builder = Builder::default();
|
||||
builder.root(store_dir);
|
||||
let _ = builder.root(store_dir);
|
||||
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
let state_store = ObjectStateStore::new(object_store);
|
||||
@@ -244,7 +244,7 @@ mod tests {
|
||||
let dir = create_temp_dir("state_store_list");
|
||||
let store_dir = dir.path().to_str().unwrap();
|
||||
let mut builder = Builder::default();
|
||||
builder.root(store_dir);
|
||||
let _ = builder.root(store_dir);
|
||||
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
let state_store = ObjectStateStore::new(object_store);
|
||||
|
||||
@@ -164,7 +164,7 @@ impl RecordBatch {
|
||||
vector.clone()
|
||||
};
|
||||
|
||||
vectors.insert(column_name.clone(), vector);
|
||||
let _ = vectors.insert(column_name.clone(), vector);
|
||||
}
|
||||
|
||||
Ok(vectors)
|
||||
|
||||
@@ -172,8 +172,7 @@ mod tests {
|
||||
}
|
||||
|
||||
async fn call(&mut self) -> Result<()> {
|
||||
self.n.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let _ = self.n.fetch_add(1, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ pub struct Dropper {
|
||||
impl Drop for Dropper {
|
||||
fn drop(&mut self) {
|
||||
// Send a signal to say i am dropping.
|
||||
self.close.take().map(|v| v.send(()));
|
||||
let _ = self.close.take().map(|v| v.send(()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,7 +104,7 @@ impl Builder {
|
||||
///
|
||||
/// This can be any number above 0. The default value is the number of cores available to the system.
|
||||
pub fn worker_threads(&mut self, val: usize) -> &mut Self {
|
||||
self.builder.worker_threads(val);
|
||||
let _ = self.builder.worker_threads(val);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -114,7 +114,7 @@ impl Builder {
|
||||
/// they are not always active and will exit if left idle for too long, You can change this timeout duration
|
||||
/// with thread_keep_alive. The default value is 512.
|
||||
pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self {
|
||||
self.builder.max_blocking_threads(val);
|
||||
let _ = self.builder.max_blocking_threads(val);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -122,7 +122,7 @@ impl Builder {
|
||||
///
|
||||
/// By default, the timeout for a thread is set to 10 seconds.
|
||||
pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self {
|
||||
self.builder.thread_keep_alive(duration);
|
||||
let _ = self.builder.thread_keep_alive(duration);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -227,7 +227,7 @@ mod tests {
|
||||
// wait threads created
|
||||
thread::sleep(Duration::from_millis(50));
|
||||
|
||||
runtime.spawn(async {
|
||||
let _handle = runtime.spawn(async {
|
||||
thread::sleep(Duration::from_millis(50));
|
||||
});
|
||||
|
||||
@@ -247,7 +247,7 @@ mod tests {
|
||||
let out = runtime.block_on(async {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let _ = thread::spawn(move || {
|
||||
thread::sleep(Duration::from_millis(50));
|
||||
tx.send("ZOMG").unwrap();
|
||||
});
|
||||
|
||||
@@ -46,7 +46,7 @@ pub fn set_panic_hook() {
|
||||
}));
|
||||
|
||||
#[cfg(feature = "deadlock_detection")]
|
||||
std::thread::spawn(move || loop {
|
||||
let _ = std::thread::spawn(move || loop {
|
||||
std::thread::sleep(Duration::from_secs(5));
|
||||
let deadlocks = parking_lot::deadlock::check_deadlock();
|
||||
if deadlocks.is_empty() {
|
||||
|
||||
@@ -879,14 +879,14 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_split_overflow() {
|
||||
Timestamp::new(i64::MAX, TimeUnit::Second).split();
|
||||
Timestamp::new(i64::MIN, TimeUnit::Second).split();
|
||||
Timestamp::new(i64::MAX, TimeUnit::Millisecond).split();
|
||||
Timestamp::new(i64::MIN, TimeUnit::Millisecond).split();
|
||||
Timestamp::new(i64::MAX, TimeUnit::Microsecond).split();
|
||||
Timestamp::new(i64::MIN, TimeUnit::Microsecond).split();
|
||||
Timestamp::new(i64::MAX, TimeUnit::Nanosecond).split();
|
||||
Timestamp::new(i64::MIN, TimeUnit::Nanosecond).split();
|
||||
let _ = Timestamp::new(i64::MAX, TimeUnit::Second).split();
|
||||
let _ = Timestamp::new(i64::MIN, TimeUnit::Second).split();
|
||||
let _ = Timestamp::new(i64::MAX, TimeUnit::Millisecond).split();
|
||||
let _ = Timestamp::new(i64::MIN, TimeUnit::Millisecond).split();
|
||||
let _ = Timestamp::new(i64::MAX, TimeUnit::Microsecond).split();
|
||||
let _ = Timestamp::new(i64::MIN, TimeUnit::Microsecond).split();
|
||||
let _ = Timestamp::new(i64::MAX, TimeUnit::Nanosecond).split();
|
||||
let _ = Timestamp::new(i64::MIN, TimeUnit::Nanosecond).split();
|
||||
let (sec, nsec) = Timestamp::new(i64::MIN, TimeUnit::Nanosecond).split();
|
||||
let time = NaiveDateTime::from_timestamp_opt(sec, nsec).unwrap();
|
||||
assert_eq!(sec, time.timestamp());
|
||||
|
||||
@@ -89,7 +89,7 @@ impl HeartbeatTask {
|
||||
let client_id = meta_client.id();
|
||||
|
||||
let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
while let Some(res) = match rx.message().await {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
@@ -160,7 +160,7 @@ impl HeartbeatTask {
|
||||
.await?;
|
||||
|
||||
let epoch = self.region_alive_keepers.epoch();
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
tokio::pin!(sleep);
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ impl HeartbeatResponseHandler for CloseRegionHandler {
|
||||
|
||||
let mailbox = ctx.mailbox.clone();
|
||||
let self_ref = Arc::new(self.clone());
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
let result = self_ref.close_region_inner(region_ident).await;
|
||||
|
||||
if let Err(e) = mailbox
|
||||
@@ -191,7 +191,8 @@ impl CloseRegionHandler {
|
||||
// Deregister table if The table released.
|
||||
self.deregister_table(table_ref).await?;
|
||||
|
||||
self.region_alive_keepers
|
||||
let _ = self
|
||||
.region_alive_keepers
|
||||
.deregister_table(table_ident)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -58,7 +58,7 @@ impl HeartbeatResponseHandler for OpenRegionHandler {
|
||||
let self_ref = Arc::new(self.clone());
|
||||
|
||||
let region_alive_keepers = self.region_alive_keepers.clone();
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
let table_ident = ®ion_ident.table_ident;
|
||||
let request = OpenTableRequest {
|
||||
catalog_name: table_ident.catalog.clone(),
|
||||
|
||||
@@ -177,21 +177,21 @@ impl Instance {
|
||||
object_store.clone(),
|
||||
));
|
||||
|
||||
let mut engine_procedures = HashMap::with_capacity(2);
|
||||
engine_procedures.insert(
|
||||
mito_engine.name().to_string(),
|
||||
mito_engine.clone() as TableEngineProcedureRef,
|
||||
);
|
||||
|
||||
let immutable_file_engine = Arc::new(ImmutableFileTableEngine::new(
|
||||
file_table_engine::config::EngineConfig::default(),
|
||||
object_store.clone(),
|
||||
));
|
||||
engine_procedures.insert(
|
||||
immutable_file_engine.name().to_string(),
|
||||
immutable_file_engine.clone() as TableEngineProcedureRef,
|
||||
);
|
||||
|
||||
let engine_procedures = HashMap::from([
|
||||
(
|
||||
mito_engine.name().to_string(),
|
||||
mito_engine.clone() as TableEngineProcedureRef,
|
||||
),
|
||||
(
|
||||
immutable_file_engine.name().to_string(),
|
||||
immutable_file_engine.clone() as TableEngineProcedureRef,
|
||||
),
|
||||
]);
|
||||
let engine_manager = Arc::new(
|
||||
MemoryTableEngineManager::with(vec![
|
||||
mito_engine.clone(),
|
||||
@@ -207,7 +207,7 @@ impl Instance {
|
||||
let catalog = Arc::new(catalog::local::MemoryCatalogManager::default());
|
||||
let table = NumbersTable::new(MIN_USER_TABLE_ID);
|
||||
|
||||
catalog
|
||||
let _ = catalog
|
||||
.register_table(RegisterTableRequest {
|
||||
table_id: MIN_USER_TABLE_ID,
|
||||
table_name: table.table_info().name.to_string(),
|
||||
@@ -376,7 +376,7 @@ impl Instance {
|
||||
.map_err(BoxedError::new)
|
||||
.context(ShutdownInstanceSnafu);
|
||||
info!("Flushed all tables result: {}", flush_result.is_ok());
|
||||
flush_result?;
|
||||
let _ = flush_result?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -281,11 +281,11 @@ async fn new_dummy_catalog_list(
|
||||
)
|
||||
.await?;
|
||||
let catalog_provider = MemoryCatalogProvider::new();
|
||||
catalog_provider
|
||||
assert!(catalog_provider
|
||||
.register_schema(schema_name, Arc::new(schema_provider) as Arc<_>)
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
let catalog_list = MemoryCatalogList::new();
|
||||
catalog_list.register_catalog(
|
||||
let _ = catalog_list.register_catalog(
|
||||
catalog_name.to_string(),
|
||||
Arc::new(catalog_provider) as Arc<_>,
|
||||
);
|
||||
@@ -438,9 +438,12 @@ mod test {
|
||||
async fn test_handle_insert() {
|
||||
let instance = MockInstance::new("test_handle_insert").await;
|
||||
let instance = instance.inner();
|
||||
test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(test_util::create_test_table(
|
||||
instance,
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
)
|
||||
.await
|
||||
.is_ok());
|
||||
|
||||
let insert = InsertRequest {
|
||||
table_name: "demo".to_string(),
|
||||
@@ -508,9 +511,12 @@ mod test {
|
||||
async fn test_handle_delete() {
|
||||
let instance = MockInstance::new("test_handle_delete").await;
|
||||
let instance = instance.inner();
|
||||
test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(test_util::create_test_table(
|
||||
instance,
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
)
|
||||
.await
|
||||
.is_ok());
|
||||
|
||||
let query = GrpcRequest::Query(QueryRequest {
|
||||
query: Some(Query::Sql(
|
||||
@@ -574,9 +580,12 @@ mod test {
|
||||
async fn test_handle_query() {
|
||||
let instance = MockInstance::new("test_handle_query").await;
|
||||
let instance = instance.inner();
|
||||
test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(test_util::create_test_table(
|
||||
instance,
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
)
|
||||
.await
|
||||
.is_ok());
|
||||
|
||||
let query = GrpcRequest::Query(QueryRequest {
|
||||
query: Some(Query::Sql(
|
||||
|
||||
@@ -72,7 +72,7 @@ impl Services {
|
||||
})?;
|
||||
let grpc = self.grpc_server.start(grpc_addr);
|
||||
let http = self.http_server.start(http_addr);
|
||||
future::try_join_all(vec![grpc, http])
|
||||
let _ = future::try_join_all(vec![grpc, http])
|
||||
.await
|
||||
.context(StartServerSnafu)?;
|
||||
|
||||
|
||||
@@ -63,7 +63,8 @@ impl SqlHandler {
|
||||
catalog,
|
||||
schema: schema.clone(),
|
||||
};
|
||||
self.catalog_manager
|
||||
let _ = self
|
||||
.catalog_manager
|
||||
.register_schema(reg_req)
|
||||
.await
|
||||
.context(RegisterSchemaSnafu)?;
|
||||
|
||||
@@ -38,7 +38,7 @@ impl SqlHandler {
|
||||
.context(error::PrepareImmutableTableSnafu)?;
|
||||
|
||||
let meta = ImmutableFileTableOptions { files };
|
||||
options.insert(
|
||||
let _ = options.insert(
|
||||
IMMUTABLE_TABLE_META_KEY.to_string(),
|
||||
serde_json::to_string(&meta).context(error::EncodeJsonSnafu)?,
|
||||
);
|
||||
|
||||
@@ -38,7 +38,7 @@ impl SqlHandler {
|
||||
.table_names(&req.catalog_name, &req.schema_name)
|
||||
.await
|
||||
.context(CatalogSnafu)?;
|
||||
futures::future::join_all(all_table_names.iter().map(|table| {
|
||||
let _ = futures::future::join_all(all_table_names.iter().map(|table| {
|
||||
self.flush_table_inner(
|
||||
&self.catalog_manager,
|
||||
&req.catalog_name,
|
||||
|
||||
@@ -30,7 +30,7 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
|
||||
);
|
||||
|
||||
let mut builder = AzureBuilder::default();
|
||||
builder
|
||||
let _ = builder
|
||||
.root(&root)
|
||||
.container(&azblob_config.container)
|
||||
.endpoint(&azblob_config.endpoint)
|
||||
@@ -38,7 +38,7 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
|
||||
.account_key(azblob_config.account_key.expose_secret());
|
||||
|
||||
if let Some(token) = &azblob_config.sas_token {
|
||||
builder.sas_token(token);
|
||||
let _ = builder.sas_token(token);
|
||||
}
|
||||
|
||||
Ok(ObjectStore::new(builder)
|
||||
|
||||
@@ -33,7 +33,7 @@ pub(crate) async fn new_fs_object_store(file_config: &FileConfig) -> Result<Obje
|
||||
store::clean_temp_dir(&atomic_write_dir)?;
|
||||
|
||||
let mut builder = FsBuilder::default();
|
||||
builder.root(&data_home).atomic_write_dir(&atomic_write_dir);
|
||||
let _ = builder.root(&data_home).atomic_write_dir(&atomic_write_dir);
|
||||
|
||||
let object_store = ObjectStore::new(builder)
|
||||
.context(error::InitBackendSnafu)?
|
||||
|
||||
@@ -29,7 +29,7 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<Objec
|
||||
);
|
||||
|
||||
let mut builder = OSSBuilder::default();
|
||||
builder
|
||||
let _ = builder
|
||||
.root(&root)
|
||||
.bucket(&oss_config.bucket)
|
||||
.endpoint(&oss_config.endpoint)
|
||||
|
||||
@@ -30,17 +30,17 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
|
||||
);
|
||||
|
||||
let mut builder = S3Builder::default();
|
||||
builder
|
||||
let _ = builder
|
||||
.root(&root)
|
||||
.bucket(&s3_config.bucket)
|
||||
.access_key_id(s3_config.access_key_id.expose_secret())
|
||||
.secret_access_key(s3_config.secret_access_key.expose_secret());
|
||||
|
||||
if s3_config.endpoint.is_some() {
|
||||
builder.endpoint(s3_config.endpoint.as_ref().unwrap());
|
||||
let _ = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
|
||||
}
|
||||
if s3_config.region.is_some() {
|
||||
builder.region(s3_config.region.as_ref().unwrap());
|
||||
let _ = builder.region(s3_config.region.as_ref().unwrap());
|
||||
}
|
||||
|
||||
Ok(ObjectStore::new(builder)
|
||||
|
||||
@@ -71,7 +71,7 @@ async fn test_close_region_handler() {
|
||||
),
|
||||
)]));
|
||||
|
||||
prepare_table(instance.inner()).await;
|
||||
let _ = prepare_table(instance.inner()).await;
|
||||
|
||||
// Closes demo table
|
||||
handle_instruction(
|
||||
|
||||
@@ -136,6 +136,6 @@ pub(crate) async fn create_test_table(
|
||||
table_id: table.table_info().ident.table_id,
|
||||
table: table.clone(),
|
||||
};
|
||||
instance.catalog_manager.register_table(req).await.unwrap();
|
||||
assert!(instance.catalog_manager.register_table(req).await.is_ok());
|
||||
Ok(table)
|
||||
}
|
||||
|
||||
@@ -202,7 +202,7 @@ impl SchemaBuilder {
|
||||
///
|
||||
/// Old metadata with same key would be overwritten.
|
||||
pub fn add_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), value.into());
|
||||
let _ = self.metadata.insert(key.into(), value.into());
|
||||
self
|
||||
}
|
||||
|
||||
@@ -211,7 +211,8 @@ impl SchemaBuilder {
|
||||
validate_timestamp_index(&self.column_schemas, timestamp_index)?;
|
||||
}
|
||||
|
||||
self.metadata
|
||||
let _ = self
|
||||
.metadata
|
||||
.insert(VERSION_KEY.to_string(), self.version.to_string());
|
||||
|
||||
let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
|
||||
@@ -242,7 +243,7 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
|
||||
}
|
||||
let field = Field::try_from(column_schema)?;
|
||||
fields.push(field);
|
||||
name_to_index.insert(column_schema.name.clone(), index);
|
||||
let _ = name_to_index.insert(column_schema.name.clone(), index);
|
||||
}
|
||||
|
||||
Ok(FieldsAndIndices {
|
||||
@@ -287,7 +288,7 @@ impl TryFrom<Arc<ArrowSchema>> for Schema {
|
||||
let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
|
||||
for field in &arrow_schema.fields {
|
||||
let column_schema = ColumnSchema::try_from(field.as_ref())?;
|
||||
name_to_index.insert(field.name().to_string(), column_schemas.len());
|
||||
let _ = name_to_index.insert(field.name().to_string(), column_schemas.len());
|
||||
column_schemas.push(column_schema);
|
||||
}
|
||||
|
||||
|
||||
@@ -87,10 +87,11 @@ impl ColumnSchema {
|
||||
pub fn with_time_index(mut self, is_time_index: bool) -> Self {
|
||||
self.is_time_index = is_time_index;
|
||||
if is_time_index {
|
||||
self.metadata
|
||||
let _ = self
|
||||
.metadata
|
||||
.insert(TIME_INDEX_KEY.to_string(), "true".to_string());
|
||||
} else {
|
||||
self.metadata.remove(TIME_INDEX_KEY);
|
||||
let _ = self.metadata.remove(TIME_INDEX_KEY);
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -266,8 +267,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_column_schema_with_metadata() {
|
||||
let mut metadata = Metadata::new();
|
||||
metadata.insert("k1".to_string(), "v1".to_string());
|
||||
let metadata = Metadata::from([("k1".to_string(), "v1".to_string())]);
|
||||
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
|
||||
.with_metadata(metadata)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
|
||||
@@ -288,20 +288,21 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_column_schema_with_duplicate_metadata() {
|
||||
let mut metadata = Metadata::new();
|
||||
metadata.insert(DEFAULT_CONSTRAINT_KEY.to_string(), "v1".to_string());
|
||||
let metadata = Metadata::from([(DEFAULT_CONSTRAINT_KEY.to_string(), "v1".to_string())]);
|
||||
let column_schema = ColumnSchema::new("test", ConcreteDataType::int32_datatype(), true)
|
||||
.with_metadata(metadata)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
|
||||
.unwrap();
|
||||
Field::try_from(&column_schema).unwrap_err();
|
||||
assert!(Field::try_from(&column_schema).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_column_schema_invalid_default_constraint() {
|
||||
ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
ColumnSchema::new("test", ConcreteDataType::int32_datatype(), false)
|
||||
.with_default_constraint(Some(ColumnDefaultConstraint::null_value()))
|
||||
.is_err()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -196,7 +196,7 @@ mod tests {
|
||||
fn test_validate_null_constraint() {
|
||||
let constraint = ColumnDefaultConstraint::null_value();
|
||||
let data_type = ConcreteDataType::int32_datatype();
|
||||
constraint.validate(&data_type, false).unwrap_err();
|
||||
assert!(constraint.validate(&data_type, false).is_err());
|
||||
constraint.validate(&data_type, true).unwrap();
|
||||
}
|
||||
|
||||
@@ -207,9 +207,9 @@ mod tests {
|
||||
constraint.validate(&data_type, false).unwrap();
|
||||
constraint.validate(&data_type, true).unwrap();
|
||||
|
||||
constraint
|
||||
assert!(constraint
|
||||
.validate(&ConcreteDataType::uint32_datatype(), true)
|
||||
.unwrap_err();
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -218,23 +218,23 @@ mod tests {
|
||||
constraint
|
||||
.validate(&ConcreteDataType::timestamp_millisecond_datatype(), false)
|
||||
.unwrap();
|
||||
constraint
|
||||
assert!(constraint
|
||||
.validate(&ConcreteDataType::boolean_datatype(), false)
|
||||
.unwrap_err();
|
||||
.is_err());
|
||||
|
||||
let constraint = ColumnDefaultConstraint::Function("hello()".to_string());
|
||||
constraint
|
||||
assert!(constraint
|
||||
.validate(&ConcreteDataType::timestamp_millisecond_datatype(), false)
|
||||
.unwrap_err();
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_default_vector_by_null() {
|
||||
let constraint = ColumnDefaultConstraint::null_value();
|
||||
let data_type = ConcreteDataType::int32_datatype();
|
||||
constraint
|
||||
assert!(constraint
|
||||
.create_default_vector(&data_type, false, 10)
|
||||
.unwrap_err();
|
||||
.is_err());
|
||||
|
||||
let constraint = ColumnDefaultConstraint::null_value();
|
||||
let v = constraint
|
||||
@@ -286,9 +286,9 @@ mod tests {
|
||||
|
||||
let constraint = ColumnDefaultConstraint::Function("no".to_string());
|
||||
let data_type = ConcreteDataType::timestamp_millisecond_datatype();
|
||||
constraint
|
||||
assert!(constraint
|
||||
.create_default_vector(&data_type, false, 4)
|
||||
.unwrap_err();
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1006,10 +1006,10 @@ mod tests {
|
||||
);
|
||||
|
||||
let result: Result<Value> = ScalarValue::Decimal128(Some(1), 0, 0).try_into();
|
||||
result
|
||||
assert!(result
|
||||
.unwrap_err()
|
||||
.to_string()
|
||||
.contains("Unsupported arrow data type, type: Decimal(0, 0)");
|
||||
.contains("Unsupported arrow data type, type: Decimal128(0, 0)"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -176,7 +176,7 @@ impl MutableVector for NullVectorBuilder {
|
||||
}
|
||||
|
||||
fn extend_slice_of(&mut self, vector: &dyn Vector, offset: usize, length: usize) -> Result<()> {
|
||||
vector
|
||||
let _ = vector
|
||||
.as_any()
|
||||
.downcast_ref::<NullVector>()
|
||||
.with_context(|| error::CastTypeSnafu {
|
||||
|
||||
@@ -148,7 +148,7 @@ mod tests {
|
||||
fn test_take_out_of_index() {
|
||||
let v = Int32Vector::from_slice([1, 2, 3, 4, 5]);
|
||||
let indies = UInt32Vector::from_slice([1, 5, 6]);
|
||||
v.take(&indies).unwrap();
|
||||
let _ = v.take(&indies);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -275,7 +275,7 @@ impl EngineInner {
|
||||
table_id
|
||||
);
|
||||
|
||||
self.tables.write().unwrap().insert(table_id, table.clone());
|
||||
let _ = self.tables.write().unwrap().insert(table_id, table.clone());
|
||||
|
||||
Ok(table)
|
||||
}
|
||||
@@ -339,7 +339,7 @@ impl EngineInner {
|
||||
.context(table_error::TableOperationSnafu)?,
|
||||
);
|
||||
|
||||
self.tables.write().unwrap().insert(table_id, table.clone());
|
||||
let _ = self.tables.write().unwrap().insert(table_id, table.clone());
|
||||
Some(table as _)
|
||||
};
|
||||
|
||||
@@ -375,7 +375,7 @@ impl EngineInner {
|
||||
.context(DropTableSnafu {
|
||||
table_name: &table_full_name,
|
||||
})?;
|
||||
self.tables.write().unwrap().remove(&req.table_id);
|
||||
let _ = self.tables.write().unwrap().remove(&req.table_id);
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
@@ -388,7 +388,7 @@ impl EngineInner {
|
||||
|
||||
let tables = self.tables.read().unwrap().clone();
|
||||
|
||||
futures::future::try_join_all(tables.values().map(|t| t.close(&[])))
|
||||
let _ = futures::future::try_join_all(tables.values().map(|t| t.close(&[])))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
@@ -427,7 +427,7 @@ impl EngineInner {
|
||||
.context(table_error::TableOperationSnafu)?;
|
||||
}
|
||||
|
||||
self.tables.write().unwrap().remove(&table_id);
|
||||
let _ = self.tables.write().unwrap().remove(&table_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -118,7 +118,8 @@ impl CreateImmutableFileTable {
|
||||
return Ok(Status::Done);
|
||||
}
|
||||
|
||||
self.engine
|
||||
let _ = self
|
||||
.engine
|
||||
.create_table(&engine_ctx, self.data.request.clone())
|
||||
.await
|
||||
.map_err(Error::from_error_ext)?;
|
||||
|
||||
@@ -42,7 +42,8 @@ impl Procedure for DropImmutableFileTable {
|
||||
let engine_ctx = EngineContext::default();
|
||||
// Currently, `drop_table()` of ImmutableFileTableEngine is idempotent so we just
|
||||
// invoke it.
|
||||
self.engine
|
||||
let _ = self
|
||||
.engine
|
||||
.drop_table(&engine_ctx, self.data.request.clone())
|
||||
.await
|
||||
.map_err(Error::from_error_ext)?;
|
||||
|
||||
@@ -36,7 +36,7 @@ pub fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
|
||||
let dir = create_temp_dir(prefix);
|
||||
let store_dir = dir.path().to_string_lossy();
|
||||
let mut builder = Fs::default();
|
||||
builder.root(&store_dir);
|
||||
let _ = builder.root(&store_dir);
|
||||
(dir, ObjectStore::new(builder).unwrap().finish())
|
||||
}
|
||||
|
||||
@@ -97,15 +97,15 @@ pub struct TestEngineComponents {
|
||||
|
||||
pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
|
||||
let mut table_options = TableOptions::default();
|
||||
table_options.extra_options.insert(
|
||||
let _ = table_options.extra_options.insert(
|
||||
requests::IMMUTABLE_TABLE_LOCATION_KEY.to_string(),
|
||||
"mock_path".to_string(),
|
||||
);
|
||||
table_options.extra_options.insert(
|
||||
let _ = table_options.extra_options.insert(
|
||||
requests::IMMUTABLE_TABLE_META_KEY.to_string(),
|
||||
serde_json::to_string(&ImmutableFileTableOptions::default()).unwrap(),
|
||||
);
|
||||
table_options.extra_options.insert(
|
||||
let _ = table_options.extra_options.insert(
|
||||
requests::IMMUTABLE_TABLE_FORMAT_KEY.to_string(),
|
||||
"csv".to_string(),
|
||||
);
|
||||
|
||||
@@ -257,7 +257,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
let Kv(k, _) = r?;
|
||||
let catalog_key = String::from_utf8_lossy(&k);
|
||||
if let Ok(key) = CatalogKey::parse(catalog_key.as_ref()) {
|
||||
res.insert(key.catalog_name);
|
||||
let _ = res.insert(key.catalog_name);
|
||||
} else {
|
||||
warn!("invalid catalog key: {:?}", catalog_key);
|
||||
}
|
||||
@@ -273,7 +273,7 @@ impl CatalogManager for FrontendCatalogManager {
|
||||
let Kv(k, _) = r?;
|
||||
let key =
|
||||
SchemaKey::parse(String::from_utf8_lossy(&k)).context(InvalidCatalogValueSnafu)?;
|
||||
res.insert(key.schema_name);
|
||||
let _ = res.insert(key.schema_name);
|
||||
}
|
||||
Ok(res.into_iter().collect())
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ pub(crate) async fn create_external_expr(
|
||||
.context(error::PrepareImmutableTableSnafu)?;
|
||||
|
||||
let meta = ImmutableFileTableOptions { files };
|
||||
options.insert(
|
||||
let _ = options.insert(
|
||||
IMMUTABLE_TABLE_META_KEY.to_string(),
|
||||
serde_json::to_string(&meta).context(error::EncodeJsonSnafu)?,
|
||||
);
|
||||
|
||||
@@ -78,7 +78,7 @@ impl HeartbeatTask {
|
||||
let capture_self = self.clone();
|
||||
let retry_interval = self.retry_interval;
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
loop {
|
||||
match resp_stream.message().await {
|
||||
Ok(Some(resp)) => {
|
||||
@@ -109,7 +109,7 @@ impl HeartbeatTask {
|
||||
) {
|
||||
let report_interval = self.report_interval;
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
tokio::pin!(sleep);
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
|
||||
..
|
||||
} = table_ident;
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
self_ref.invalidate_table(&catalog, &schema, &table).await;
|
||||
|
||||
if let Err(e) = mailbox
|
||||
|
||||
@@ -39,7 +39,7 @@ pub struct MockKvCacheInvalidator {
|
||||
#[async_trait::async_trait]
|
||||
impl KvCacheInvalidator for MockKvCacheInvalidator {
|
||||
async fn invalidate_key(&self, key: &[u8]) {
|
||||
self.inner.lock().unwrap().remove(key);
|
||||
let _ = self.inner.lock().unwrap().remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ pub struct MockTableRouteCacheInvalidator {
|
||||
#[async_trait::async_trait]
|
||||
impl TableRouteCacheInvalidator for MockTableRouteCacheInvalidator {
|
||||
async fn invalidate_table_route(&self, table: &TableName) {
|
||||
self.inner.lock().unwrap().remove(&table.to_string());
|
||||
let _ = self.inner.lock().unwrap().remove(&table.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -324,7 +324,8 @@ impl Instance {
|
||||
"Table {}.{}.{} does not exist, try create table",
|
||||
catalog_name, schema_name, table_name,
|
||||
);
|
||||
self.create_table_by_columns(ctx, table_name, columns, MITO_ENGINE)
|
||||
let _ = self
|
||||
.create_table_by_columns(ctx, table_name, columns, MITO_ENGINE)
|
||||
.await?;
|
||||
info!(
|
||||
"Successfully created table on insertion: {}.{}.{}",
|
||||
@@ -343,7 +344,8 @@ impl Instance {
|
||||
"Find new columns {:?} on insertion, try to alter table: {}.{}.{}",
|
||||
add_columns, catalog_name, schema_name, table_name
|
||||
);
|
||||
self.add_new_columns_to_table(ctx, table_name, add_columns)
|
||||
let _ = self
|
||||
.add_new_columns_to_table(ctx, table_name, add_columns)
|
||||
.await?;
|
||||
info!(
|
||||
"Successfully altered table on insertion: {}.{}.{}",
|
||||
@@ -810,10 +812,10 @@ mod tests {
|
||||
}
|
||||
|
||||
fn do_fmt(template: &str, catalog: &str, schema: &str) -> String {
|
||||
let mut vars = HashMap::new();
|
||||
vars.insert("catalog".to_string(), catalog);
|
||||
vars.insert("schema".to_string(), schema);
|
||||
|
||||
let vars = HashMap::from([
|
||||
("catalog".to_string(), catalog),
|
||||
("schema".to_string(), schema),
|
||||
]);
|
||||
template.format(&vars).unwrap()
|
||||
}
|
||||
|
||||
|
||||
@@ -213,7 +213,7 @@ impl DistInstance {
|
||||
);
|
||||
|
||||
let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE_IN_DATANODE);
|
||||
client
|
||||
let _ = client
|
||||
.create(create_expr_for_region)
|
||||
.await
|
||||
.context(RequestDatanodeSnafu)?;
|
||||
@@ -277,7 +277,7 @@ impl DistInstance {
|
||||
|
||||
let client = self.datanode_clients.get_client(&datanode).await;
|
||||
let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
|
||||
client
|
||||
let _ = client
|
||||
.drop_table(expr.clone())
|
||||
.await
|
||||
.context(RequestDatanodeSnafu)?;
|
||||
@@ -349,7 +349,7 @@ impl DistInstance {
|
||||
|
||||
let client = self.datanode_clients.get_client(&datanode).await;
|
||||
let client = Database::new(&expr.catalog_name, &expr.schema_name, client);
|
||||
client
|
||||
let _ = client
|
||||
.flush_table(expr.clone())
|
||||
.await
|
||||
.context(RequestDatanodeSnafu)?;
|
||||
@@ -378,7 +378,7 @@ impl DistInstance {
|
||||
}
|
||||
Statement::CreateExternalTable(stmt) => {
|
||||
let create_expr = &mut expr_factory::create_external_expr(stmt, query_ctx).await?;
|
||||
self.create_table(create_expr, None).await?;
|
||||
let _ = self.create_table(create_expr, None).await?;
|
||||
Ok(Output::AffectedRows(0))
|
||||
}
|
||||
Statement::Alter(alter_table) => {
|
||||
@@ -545,7 +545,7 @@ impl DistInstance {
|
||||
|
||||
let mut context = AlterContext::with_capacity(1);
|
||||
|
||||
context.insert(expr);
|
||||
let _ = context.insert(expr);
|
||||
|
||||
table.alter(context, &request).await.context(TableSnafu)?;
|
||||
|
||||
@@ -730,7 +730,7 @@ fn create_table_info(create_table: &CreateTableExpr) -> Result<RawTableInfo> {
|
||||
let schema = schema.with_time_index(column.name == create_table.time_index);
|
||||
|
||||
column_schemas.push(schema);
|
||||
column_name_to_index_map.insert(column.name.clone(), idx);
|
||||
let _ = column_name_to_index_map.insert(column.name.clone(), idx);
|
||||
}
|
||||
|
||||
let timestamp_index = column_name_to_index_map
|
||||
|
||||
@@ -240,11 +240,12 @@ mod tests {
|
||||
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true),
|
||||
]));
|
||||
|
||||
let mut builder = TableMetaBuilder::default();
|
||||
builder.schema(schema);
|
||||
builder.primary_key_indices(vec![]);
|
||||
builder.next_column_id(1);
|
||||
let table_meta = builder.build().unwrap();
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(schema)
|
||||
.primary_key_indices(vec![])
|
||||
.next_column_id(1)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let table_info = TableInfoBuilder::new(table_name, table_meta)
|
||||
.build()
|
||||
|
||||
@@ -29,7 +29,8 @@ impl InfluxdbLineProtocolHandler for Instance {
|
||||
ctx: QueryContextRef,
|
||||
) -> servers::error::Result<()> {
|
||||
let requests = request.try_into()?;
|
||||
self.handle_inserts(requests, ctx)
|
||||
let _ = self
|
||||
.handle_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(servers::error::ExecuteGrpcQuerySnafu)?;
|
||||
|
||||
@@ -29,7 +29,8 @@ impl OpentsdbProtocolHandler for Instance {
|
||||
let requests = InsertRequests {
|
||||
inserts: vec![data_point.as_grpc_insert()],
|
||||
};
|
||||
self.handle_inserts(requests, ctx)
|
||||
let _ = self
|
||||
.handle_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.with_context(|_| server_error::ExecuteQuerySnafu {
|
||||
|
||||
@@ -147,7 +147,8 @@ impl Instance {
|
||||
impl PrometheusProtocolHandler for Instance {
|
||||
async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> {
|
||||
let (requests, samples) = prometheus::to_grpc_insert_requests(request)?;
|
||||
self.handle_inserts(requests, ctx)
|
||||
let _ = self
|
||||
.handle_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
|
||||
@@ -153,36 +153,36 @@ impl Services {
|
||||
let http_addr = parse_addr(&http_options.addr)?;
|
||||
|
||||
let mut http_server_builder = HttpServerBuilder::new(http_options.clone());
|
||||
http_server_builder
|
||||
let _ = http_server_builder
|
||||
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone()))
|
||||
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()));
|
||||
|
||||
if let Some(user_provider) = user_provider.clone() {
|
||||
http_server_builder.with_user_provider(user_provider);
|
||||
let _ = http_server_builder.with_user_provider(user_provider);
|
||||
}
|
||||
|
||||
if set_opentsdb_handler {
|
||||
http_server_builder.with_opentsdb_handler(instance.clone());
|
||||
let _ = http_server_builder.with_opentsdb_handler(instance.clone());
|
||||
}
|
||||
if matches!(
|
||||
opts.influxdb_options,
|
||||
Some(InfluxdbOptions { enable: true })
|
||||
) {
|
||||
http_server_builder.with_influxdb_handler(instance.clone());
|
||||
let _ = http_server_builder.with_influxdb_handler(instance.clone());
|
||||
}
|
||||
|
||||
if matches!(
|
||||
opts.prometheus_options,
|
||||
Some(PrometheusOptions { enable: true })
|
||||
) {
|
||||
http_server_builder.with_prom_handler(instance.clone());
|
||||
let _ = http_server_builder.with_prom_handler(instance.clone());
|
||||
}
|
||||
http_server_builder.with_metrics_handler(MetricsHandler);
|
||||
http_server_builder.with_script_handler(instance.clone());
|
||||
|
||||
http_server_builder.with_configurator(plugins.get::<ConfiguratorRef>());
|
||||
|
||||
let http_server = http_server_builder.build();
|
||||
let http_server = http_server_builder
|
||||
.with_metrics_handler(MetricsHandler)
|
||||
.with_script_handler(instance.clone())
|
||||
.with_configurator(plugins.get::<ConfiguratorRef>())
|
||||
.build();
|
||||
result.push((Box::new(http_server), http_addr));
|
||||
}
|
||||
|
||||
|
||||
@@ -980,7 +980,8 @@ pub(crate) mod test {
|
||||
|
||||
impl Collect for MockCollector {
|
||||
fn on_write(&self, record: WriteRecord) {
|
||||
self.write_sum
|
||||
let _ = self
|
||||
.write_sum
|
||||
.fetch_add(record.byte_count, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
|
||||
@@ -85,12 +85,13 @@ mod tests {
|
||||
ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false),
|
||||
]);
|
||||
|
||||
let mut builder = TableMetaBuilder::default();
|
||||
builder.schema(Arc::new(schema));
|
||||
builder.primary_key_indices(vec![]);
|
||||
builder.next_column_id(2);
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(Arc::new(schema))
|
||||
.primary_key_indices(vec![])
|
||||
.next_column_id(2)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let table_meta = builder.build().unwrap();
|
||||
let table_name = TableName {
|
||||
catalog_name: "greptime".to_string(),
|
||||
schema_name: "public".to_string(),
|
||||
|
||||
@@ -139,11 +139,12 @@ mod tests {
|
||||
ColumnSchema::new("v", ConcreteDataType::string_datatype(), true),
|
||||
]));
|
||||
|
||||
let mut builder = TableMetaBuilder::default();
|
||||
builder.schema(schema);
|
||||
builder.primary_key_indices(vec![1]);
|
||||
builder.next_column_id(3);
|
||||
let table_meta = builder.build().unwrap();
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(schema)
|
||||
.primary_key_indices(vec![1])
|
||||
.next_column_id(3)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let column = vector_to_grpc_column(
|
||||
&table_meta,
|
||||
@@ -198,12 +199,13 @@ mod tests {
|
||||
ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
|
||||
]);
|
||||
|
||||
let mut builder = TableMetaBuilder::default();
|
||||
builder.schema(Arc::new(schema));
|
||||
builder.primary_key_indices(vec![]);
|
||||
builder.next_column_id(3);
|
||||
let table_meta = TableMetaBuilder::default()
|
||||
.schema(Arc::new(schema))
|
||||
.primary_key_indices(vec![])
|
||||
.next_column_id(3)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let table_meta = builder.build().unwrap();
|
||||
let insert_request = mock_insert_request();
|
||||
let request = to_grpc_insert_request(&table_meta, 12, insert_request).unwrap();
|
||||
|
||||
@@ -211,19 +213,19 @@ mod tests {
|
||||
}
|
||||
|
||||
fn mock_insert_request() -> InsertRequest {
|
||||
let mut columns_values = HashMap::with_capacity(4);
|
||||
|
||||
let mut builder = StringVectorBuilder::with_capacity(3);
|
||||
builder.push(Some("host1"));
|
||||
builder.push(None);
|
||||
builder.push(Some("host3"));
|
||||
columns_values.insert("host".to_string(), builder.to_vector());
|
||||
let host = builder.to_vector();
|
||||
|
||||
let mut builder = Int16VectorBuilder::with_capacity(3);
|
||||
builder.push(Some(1_i16));
|
||||
builder.push(Some(2_i16));
|
||||
builder.push(Some(3_i16));
|
||||
columns_values.insert("id".to_string(), builder.to_vector());
|
||||
let id = builder.to_vector();
|
||||
|
||||
let columns_values = HashMap::from([("host".to_string(), host), ("id".to_string(), id)]);
|
||||
|
||||
InsertRequest {
|
||||
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
|
||||
|
||||
@@ -130,11 +130,11 @@ mod tests {
|
||||
async fn test_noop_logstore() {
|
||||
let store = NoopLogStore::default();
|
||||
let e = store.entry("".as_bytes(), 1, NamespaceImpl::default());
|
||||
store.append(e.clone()).await.unwrap();
|
||||
store
|
||||
assert!(store.append(e.clone()).await.is_ok());
|
||||
assert!(store
|
||||
.append_batch(&NamespaceImpl::default(), vec![e])
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
store
|
||||
.create_namespace(&NamespaceImpl::default())
|
||||
.await
|
||||
|
||||
@@ -157,7 +157,8 @@ impl LogStore for RaftEngineLogStore {
|
||||
);
|
||||
}
|
||||
|
||||
self.engine
|
||||
let _ = self
|
||||
.engine
|
||||
.write(&mut batch, self.config.sync_write)
|
||||
.context(RaftEngineSnafu)?;
|
||||
Ok(AppendResponse { entry_id })
|
||||
@@ -203,7 +204,8 @@ impl LogStore for RaftEngineLogStore {
|
||||
);
|
||||
}
|
||||
|
||||
self.engine
|
||||
let _ = self
|
||||
.engine
|
||||
.write(&mut batch, self.config.sync_write)
|
||||
.context(RaftEngineSnafu)?;
|
||||
Ok(entry_ids)
|
||||
@@ -231,7 +233,7 @@ impl LogStore for RaftEngineLogStore {
|
||||
let max_batch_size = self.config.read_batch_size;
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
|
||||
let ns = ns.clone();
|
||||
common_runtime::spawn_read(async move {
|
||||
let _handle = common_runtime::spawn_read(async move {
|
||||
while start_index <= last_index {
|
||||
let mut vec = Vec::with_capacity(max_batch_size);
|
||||
match engine
|
||||
@@ -284,7 +286,8 @@ impl LogStore for RaftEngineLogStore {
|
||||
batch
|
||||
.put_message::<Namespace>(SYSTEM_NAMESPACE, key, ns)
|
||||
.context(RaftEngineSnafu)?;
|
||||
self.engine
|
||||
let _ = self
|
||||
.engine
|
||||
.write(&mut batch, true)
|
||||
.context(RaftEngineSnafu)?;
|
||||
Ok(())
|
||||
@@ -299,7 +302,8 @@ impl LogStore for RaftEngineLogStore {
|
||||
let key = format!("{}{}", NAMESPACE_PREFIX, ns.id).as_bytes().to_vec();
|
||||
let mut batch = LogBatch::with_capacity(1);
|
||||
batch.delete(SYSTEM_NAMESPACE, key);
|
||||
self.engine
|
||||
let _ = self
|
||||
.engine
|
||||
.write(&mut batch, true)
|
||||
.context(RaftEngineSnafu)?;
|
||||
Ok(())
|
||||
@@ -471,10 +475,10 @@ mod tests {
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
logstore
|
||||
assert!(logstore
|
||||
.append(Entry::create(1, 1, "1".as_bytes().to_vec()))
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
let entries = logstore
|
||||
.read(&Namespace::with_id(1), 1)
|
||||
.await
|
||||
@@ -533,7 +537,7 @@ mod tests {
|
||||
let namespace = Namespace::with_id(42);
|
||||
for id in 0..4096 {
|
||||
let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
|
||||
logstore.append(entry).await.unwrap();
|
||||
assert!(logstore.append(entry).await.is_ok());
|
||||
}
|
||||
|
||||
let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
|
||||
@@ -565,7 +569,7 @@ mod tests {
|
||||
let namespace = Namespace::with_id(42);
|
||||
for id in 0..1024 {
|
||||
let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec());
|
||||
logstore.append(entry).await.unwrap();
|
||||
assert!(logstore.append(entry).await.is_ok());
|
||||
}
|
||||
|
||||
logstore.obsolete(namespace.clone(), 100).await.unwrap();
|
||||
|
||||
@@ -58,7 +58,7 @@ async fn run() {
|
||||
let (sender, mut receiver) = meta_client.heartbeat().await.unwrap();
|
||||
|
||||
// send heartbeats
|
||||
tokio::spawn(async move {
|
||||
let _handle = tokio::spawn(async move {
|
||||
for _ in 0..5 {
|
||||
let req = HeartbeatRequest {
|
||||
peer: Some(Peer {
|
||||
@@ -72,7 +72,7 @@ async fn run() {
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _handle = tokio::spawn(async move {
|
||||
while let Some(res) = receiver.message().await.unwrap() {
|
||||
event!(Level::TRACE, "heartbeat response: {:#?}", res);
|
||||
}
|
||||
|
||||
@@ -330,7 +330,7 @@ impl MetaClient {
|
||||
}
|
||||
|
||||
pub async fn unlock(&self, req: UnlockRequest) -> Result<()> {
|
||||
self.lock_client()?.unlock(req.into()).await?;
|
||||
let _ = self.lock_client()?.unlock(req.into()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -577,7 +577,7 @@ mod tests {
|
||||
let tc = new_client("test_heartbeat").await;
|
||||
let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
|
||||
// send heartbeats
|
||||
tokio::spawn(async move {
|
||||
let _handle = tokio::spawn(async move {
|
||||
for _ in 0..5 {
|
||||
let req = HeartbeatRequest {
|
||||
peer: Some(Peer {
|
||||
@@ -590,7 +590,7 @@ mod tests {
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let _handle = tokio::spawn(async move {
|
||||
while let Some(res) = receiver.message().await.unwrap() {
|
||||
assert_eq!(1000, res.header.unwrap().cluster_id);
|
||||
}
|
||||
|
||||
@@ -292,7 +292,7 @@ mod test {
|
||||
async fn test_heartbeat_stream() {
|
||||
let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
|
||||
let sender = HeartbeatSender::new((8, 8), Role::Datanode, sender);
|
||||
tokio::spawn(async move {
|
||||
let _handle = tokio::spawn(async move {
|
||||
for _ in 0..10 {
|
||||
sender.send(HeartbeatRequest::default()).await.unwrap();
|
||||
}
|
||||
|
||||
@@ -132,7 +132,7 @@ pub async fn bootstrap_meta_srv_with_router(
|
||||
|
||||
router
|
||||
.serve_with_incoming_shutdown(listener, async {
|
||||
signal.recv().await;
|
||||
let _ = signal.recv().await;
|
||||
})
|
||||
.await
|
||||
.context(error::StartGrpcSnafu)?;
|
||||
|
||||
@@ -201,7 +201,7 @@ impl MetaPeerClient {
|
||||
fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<StatKey, StatValue>> {
|
||||
let mut map = HashMap::with_capacity(kvs.len());
|
||||
for kv in kvs {
|
||||
map.insert(kv.key.try_into()?, kv.value.try_into()?);
|
||||
let _ = map.insert(kv.key.try_into()?, kv.value.try_into()?);
|
||||
}
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ impl EtcdElection {
|
||||
|
||||
let leader_ident = leader_value.clone();
|
||||
let (tx, mut rx) = broadcast::channel(100);
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(msg) => match msg {
|
||||
@@ -142,7 +142,7 @@ impl Election for EtcdElection {
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(KEEP_ALIVE_PERIOD_SECS));
|
||||
loop {
|
||||
keep_alive_interval.tick().await;
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
|
||||
|
||||
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
|
||||
|
||||
@@ -286,7 +286,7 @@ impl HeartbeatMailbox {
|
||||
let mailbox = Arc::new(Self::new(pushers, sequence));
|
||||
|
||||
let timeout_checker = mailbox.clone();
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
timeout_checker.check_timeout_bg(10).await;
|
||||
});
|
||||
|
||||
@@ -307,7 +307,7 @@ impl HeartbeatMailbox {
|
||||
let mut interval = tokio::time::interval(Duration::from_millis(interval_millis));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let _ = interval.tick().await;
|
||||
|
||||
if self.timeouts.is_empty() {
|
||||
self.timeout_notify.notified().await;
|
||||
@@ -363,10 +363,10 @@ impl Mailbox for HeartbeatMailbox {
|
||||
debug!("Sending mailbox message {msg:?} to {pusher_id}");
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.senders.insert(message_id, tx);
|
||||
let _ = self.senders.insert(message_id, tx);
|
||||
let deadline =
|
||||
Duration::from_millis(common_time::util::current_time_millis() as u64) + timeout;
|
||||
self.timeouts.insert(message_id, deadline);
|
||||
let _ = self.timeouts.insert(message_id, deadline);
|
||||
self.timeout_notify.notify_one();
|
||||
|
||||
self.pushers.push(&pusher_id, msg).await?;
|
||||
@@ -381,7 +381,7 @@ impl Mailbox for HeartbeatMailbox {
|
||||
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()> {
|
||||
debug!("Received mailbox message {maybe_msg:?}");
|
||||
|
||||
self.timeouts.remove(&id);
|
||||
let _ = self.timeouts.remove(&id);
|
||||
|
||||
if let Some((_, tx)) = self.senders.remove(&id) {
|
||||
tx.send(maybe_msg)
|
||||
|
||||
@@ -296,7 +296,7 @@ mod tests {
|
||||
datanode_id: 2,
|
||||
region_number: 1,
|
||||
};
|
||||
container.get_failure_detector(ident.clone());
|
||||
let _ = container.get_failure_detector(ident.clone());
|
||||
|
||||
let region_failover_manager = create_region_failover_manager();
|
||||
let mut runner = FailureDetectRunner::new(None, region_failover_manager);
|
||||
|
||||
@@ -30,7 +30,7 @@ pub struct KeepLeaseHandler {
|
||||
impl KeepLeaseHandler {
|
||||
pub fn new(kv_store: KvStoreRef) -> Self {
|
||||
let (tx, mut rx) = mpsc::channel(1024);
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
while let Some(kv) = rx.recv().await {
|
||||
let mut kvs = vec![kv];
|
||||
|
||||
|
||||
@@ -122,7 +122,7 @@ impl HeartbeatHandler for PersistStatsHandler {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
ctx.in_memory.put(put).await?;
|
||||
let _ = ctx.in_memory.put(put).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -174,7 +174,7 @@ mod test {
|
||||
table_id: 1,
|
||||
engine: "mito".to_string(),
|
||||
};
|
||||
region_failover_manager
|
||||
let _ = region_failover_manager
|
||||
.running_procedures()
|
||||
.write()
|
||||
.unwrap()
|
||||
|
||||
@@ -60,7 +60,7 @@ where
|
||||
if !predicate(&lease_key, &lease_value) {
|
||||
continue;
|
||||
}
|
||||
lease_kvs.insert(lease_key, lease_value);
|
||||
let _ = lease_kvs.insert(lease_key, lease_value);
|
||||
}
|
||||
|
||||
Ok(lease_kvs)
|
||||
|
||||
@@ -90,7 +90,7 @@ impl Drop for DistLockGuard<'_> {
|
||||
if let Some(key) = self.key.take() {
|
||||
let lock = self.lock.clone();
|
||||
let name = self.name.clone();
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
if let Err(e) = lock.unlock(key).await {
|
||||
error!(e; "Failed to unlock '{}'", String::from_utf8_lossy(&name));
|
||||
}
|
||||
|
||||
@@ -38,14 +38,14 @@ impl DistLock for MemLock {
|
||||
|
||||
let guard = mutex.lock_owned().await;
|
||||
|
||||
self.guards.insert(key.clone(), guard);
|
||||
let _ = self.guards.insert(key.clone(), guard);
|
||||
Ok(key)
|
||||
}
|
||||
|
||||
async fn unlock(&self, key: Vec<u8>) -> Result<()> {
|
||||
// drop the guard, so that the mutex can be unlocked,
|
||||
// effectively make the `mutex.lock_owned` in `lock` method to proceed
|
||||
self.guards.remove(&key);
|
||||
let _ = self.guards.remove(&key);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -85,10 +85,10 @@ mod tests {
|
||||
// every key counter will be added by 1 for 10 times
|
||||
for i in 0..100 {
|
||||
let key = &keys[i % keys.len()];
|
||||
lock_clone
|
||||
assert!(lock_clone
|
||||
.lock(key.clone(), Opts { expire_secs: None })
|
||||
.await
|
||||
.unwrap();
|
||||
.is_ok());
|
||||
|
||||
// Intentionally create a critical section:
|
||||
// if our MemLock is flawed, the resulting counter is wrong.
|
||||
@@ -105,7 +105,7 @@ mod tests {
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
futures::future::join_all(tasks).await;
|
||||
let _ = futures::future::join_all(tasks).await;
|
||||
|
||||
assert!(counters.values().all(|x| x.load(Ordering::Relaxed) == 1000));
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user