Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2025-12-22 22:20:02 +00:00)
chore: update rust to nightly 2025-10-01 (#7069)
* chore: update rust to nightly 2025-10-01
* chore: nix update

Signed-off-by: luofucong <luofc@foxmail.com>
Co-authored-by: Ning Sun <sunning@greptime.com>
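Most of the tree-wide changes below are mechanical cleanups unlocked by the toolchain bump: #![feature(let_chains)] crate attributes are dropped (let chains are stable on the newer compiler), manual impl Default blocks are replaced by deriving Default with a #[default] variant, "x % n == 0" checks become x.is_multiple_of(n), elided lifetimes in return types are spelled out as '_, and .to_string() on values that are already String becomes .clone(). A minimal, self-contained sketch of those patterns (illustrative only; the module layout and main() are mine, not code from the repo):

    mod before {
        pub enum FlowType {
            Batching,
            Streaming,
        }

        // The hand-written impl this commit deletes in favor of a derive.
        impl Default for FlowType {
            fn default() -> Self {
                Self::Batching
            }
        }
    }

    mod after {
        // Deriving Default on an enum with a #[default] variant
        // (stable since Rust 1.62) replaces the manual impl above.
        #[derive(Debug, Default)]
        pub enum FlowType {
            #[default]
            Batching,
            Streaming,
        }
    }

    fn main() {
        // u64::is_multiple_of replaces "id % 2 == 0" (stabilized in Rust 1.87).
        let peer_id: u64 = 4;
        assert!(peer_id.is_multiple_of(2));

        // .clone() replaces .to_string() when the source is already a String:
        // same result, but it names the operation directly.
        let table_name = String::from("monitor");
        let copied = table_name.clone(); // previously: table_name.to_string()
        assert_eq!(copied, table_name);

        println!("{:?}", after::FlowType::default()); // Batching
    }

The .clone() form is what lints in the clippy implicit_clone family nudge toward; it is equivalent but makes the copy explicit rather than routing it through the ToString machinery.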
Changed: flake.lock — 18 lines (generated)
@@ -8,11 +8,11 @@
         "rust-analyzer-src": "rust-analyzer-src"
       },
       "locked": {
-        "lastModified": 1745735608,
-        "narHash": "sha256-L0jzm815XBFfF2wCFmR+M1CF+beIEFj6SxlqVKF59Ec=",
+        "lastModified": 1760078406,
+        "narHash": "sha256-JeJK0ZA845PtkCHkfo4KjeI1mYrsr2s3cxBYKhF4BoE=",
         "owner": "nix-community",
         "repo": "fenix",
-        "rev": "c39a78eba6ed2a022cc3218db90d485077101496",
+        "rev": "351277c60d104944122ee389cdf581c5ce2c6732",
         "type": "github"
       },
       "original": {

@@ -41,11 +41,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1748162331,
-        "narHash": "sha256-rqc2RKYTxP3tbjA+PB3VMRQNnjesrT0pEofXQTrMsS8=",
+        "lastModified": 1759994382,
+        "narHash": "sha256-wSK+3UkalDZRVHGCRikZ//CyZUJWDJkBDTQX1+G77Ow=",
         "owner": "NixOS",
         "repo": "nixpkgs",
-        "rev": "7c43f080a7f28b2774f3b3f43234ca11661bf334",
+        "rev": "5da4a26309e796daa7ffca72df93dbe53b8164c7",
         "type": "github"
       },
       "original": {

@@ -65,11 +65,11 @@
     "rust-analyzer-src": {
       "flake": false,
       "locked": {
-        "lastModified": 1745694049,
-        "narHash": "sha256-fxvRYH/tS7hGQeg9zCVh5RBcSWT+JGJet7RA8Ss+rC0=",
+        "lastModified": 1760014945,
+        "narHash": "sha256-ySdl7F9+oeWNHVrg3QL/brazqmJvYFEdpGnF3pyoDH8=",
         "owner": "rust-lang",
         "repo": "rust-analyzer",
-        "rev": "d8887c0758bbd2d5f752d5bd405d4491e90e7ed6",
+        "rev": "90d2e1ce4dfe7dc49250a8b88a0f08ffdb9cb23f",
         "type": "github"
       },
       "original": {
@@ -19,7 +19,7 @@
       lib = nixpkgs.lib;
       rustToolchain = fenix.packages.${system}.fromToolchainName {
         name = (lib.importTOML ./rust-toolchain.toml).toolchain.channel;
-        sha256 = "sha256-tJJr8oqX3YD+ohhPK7jlt/7kvKBnBqJVjYtoFr520d4=";
+        sha256 = "sha256-GCGEXGZeJySLND0KU5TdtTrqFV76TF3UdvAHSUegSsk=";
       };
     in
     {

@@ -1,2 +1,2 @@
 [toolchain]
-channel = "nightly-2025-05-19"
+channel = "nightly-2025-10-01"
@@ -191,7 +191,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
                     datatype: f.datatype(),
                     datatype_ext: f.datatype_extension.clone(),
                 };
-                StructField::new(f.name.to_string(), field_type.into(), true)
+                StructField::new(f.name.clone(), field_type.into(), true)
             })
             .collect::<Vec<_>>();
         ConcreteDataType::struct_datatype(StructType::from(fields))

@@ -744,7 +744,7 @@ pub fn pb_value_to_value_ref<'a>(
                 field.datatype(),
                 field.datatype_extension.clone(),
             ));
-            let field_name = field.name.to_string();
+            let field_name = field.name.clone();
             StructField::new(field_name, field_type, true)
         })
         .collect::<Vec<_>>();
@@ -12,8 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![feature(let_chains)]
-
 pub mod error;
 pub mod helper;
 
@@ -35,7 +35,7 @@ pub fn userinfo_by_name(username: Option<String>) -> UserInfoRef {
     DefaultUserInfo::with_name(username.unwrap_or_else(|| DEFAULT_USERNAME.to_string()))
 }
 
-pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
+pub fn user_provider_from_option(opt: &str) -> Result<UserProviderRef> {
     let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
         value: opt.to_string(),
         msg: "UserProviderOption must be in format `<option>:<value>`",

@@ -57,7 +57,7 @@ pub fn user_provider_from_option(opt: &String) -> Result<UserProviderRef> {
     }
 }
 
-pub fn static_user_provider_from_option(opt: &String) -> Result<StaticUserProvider> {
+pub fn static_user_provider_from_option(opt: &str) -> Result<StaticUserProvider> {
     let (name, content) = opt.split_once(':').with_context(|| InvalidConfigSnafu {
         value: opt.to_string(),
         msg: "UserProviderOption must be in format `<option>:<value>`",
@@ -14,7 +14,6 @@
 
 #![feature(assert_matches)]
 #![feature(try_blocks)]
-#![feature(let_chains)]
 
 use std::any::Any;
 use std::fmt::{Debug, Formatter};
@@ -392,15 +392,15 @@ impl MemoryCatalogManager {
         if !manager.schema_exist_sync(catalog, schema).unwrap() {
             manager
                 .register_schema_sync(RegisterSchemaRequest {
-                    catalog: catalog.to_string(),
-                    schema: schema.to_string(),
+                    catalog: catalog.clone(),
+                    schema: schema.clone(),
                 })
                 .unwrap();
         }
 
         let request = RegisterTableRequest {
-            catalog: catalog.to_string(),
-            schema: schema.to_string(),
+            catalog: catalog.clone(),
+            schema: schema.clone(),
             table_name: table.table_info().name.clone(),
             table_id: table.table_info().ident.table_id,
             table,
@@ -362,7 +362,7 @@ impl InformationSchemaProvider {
         }
         #[cfg(feature = "enterprise")]
         for name in self.extra_table_factories.keys() {
-            tables.insert(name.to_string(), self.build_table(name).expect(name));
+            tables.insert(name.clone(), self.build_table(name).expect(name));
         }
         // Add memory tables
         for name in MEMORY_TABLES.iter() {
@@ -254,9 +254,9 @@ impl InformationSchemaFlowsBuilder {
             .await
             .map_err(BoxedError::new)
             .context(InternalSnafu)?
-            .context(FlowInfoNotFoundSnafu {
-                catalog_name: catalog_name.to_string(),
-                flow_name: flow_name.to_string(),
+            .with_context(|| FlowInfoNotFoundSnafu {
+                catalog_name: catalog_name.clone(),
+                flow_name: flow_name.clone(),
             })?;
         self.add_flow(&predicates, flow_id.flow_id(), flow_info, &flow_stat)
             .await?;

@@ -273,11 +273,11 @@ impl InformationSchemaFlowsBuilder {
         flow_stat: &Option<FlowStat>,
     ) -> Result<()> {
         let row = [
-            (FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
+            (FLOW_NAME, &Value::from(flow_info.flow_name().clone())),
             (FLOW_ID, &Value::from(flow_id)),
             (
                 TABLE_CATALOG,
-                &Value::from(flow_info.catalog_name().to_string()),
+                &Value::from(flow_info.catalog_name().clone()),
             ),
         ];
         if !predicates.eval(&row) {
@@ -135,7 +135,7 @@ async fn make_process_list(
 
     for process in queries {
         let display_id = DisplayProcessId {
-            server_addr: process.frontend.to_string(),
+            server_addr: process.frontend.clone(),
             id: process.id,
         }
         .to_string();
@@ -199,10 +199,7 @@ impl InformationSchemaRegionPeersBuilder {
         if table_info.table_type == TableType::Temporary {
             Ok(None)
         } else {
-            Ok(Some((
-                table_info.ident.table_id,
-                table_info.name.to_string(),
-            )))
+            Ok(Some((table_info.ident.table_id, table_info.name.clone())))
         }
     });
 
@@ -166,7 +166,7 @@ impl CatalogInfo for CatalogManagerWrapper {
             .await
             .map_err(|e| DataFusionError::External(Box::new(e)))
         } else {
-            Ok(vec![self.catalog_name.to_string()])
+            Ok(vec![self.catalog_name.clone()])
         }
     }
 
@@ -201,7 +201,7 @@ impl DfTableSourceProvider {
 
         Ok(Arc::new(ViewTable::new(
             logical_plan,
-            Some(view_info.definition.to_string()),
+            Some(view_info.definition.clone()),
         )))
     }
 }
@@ -41,7 +41,7 @@ impl DelKeyCommand {
     pub async fn build(&self) -> Result<Box<dyn Tool>, BoxedError> {
         let kv_backend = self.store.build().await?;
         Ok(Box::new(DelKeyTool {
-            key: self.key.to_string(),
+            key: self.key.clone(),
             prefix: self.prefix,
             key_deleter: KeyDeleter::new(kv_backend),
         }))
@@ -138,13 +138,7 @@ impl RepairTool {
 
         let table_names = table_names
             .iter()
-            .map(|table_name| {
-                (
-                    catalog.to_string(),
-                    schema_name.to_string(),
-                    table_name.to_string(),
-                )
-            })
+            .map(|table_name| (catalog.clone(), schema_name.clone(), table_name.clone()))
             .collect::<Vec<_>>();
         return Ok(IteratorInput::new_table_names(table_names));
     } else if !self.table_ids.is_empty() {
@@ -32,9 +32,9 @@ pub fn generate_alter_table_expr_for_all_columns(
     let schema = &table_info.meta.schema;
 
     let mut alter_table_expr = AlterTableExpr {
-        catalog_name: table_info.catalog_name.to_string(),
-        schema_name: table_info.schema_name.to_string(),
-        table_name: table_info.name.to_string(),
+        catalog_name: table_info.catalog_name.clone(),
+        schema_name: table_info.schema_name.clone(),
+        table_name: table_info.name.clone(),
         ..Default::default()
     };
 

@@ -44,9 +44,9 @@ pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result<CreateTab
     let table_options = HashMap::from(&table_info.meta.options);
 
     Ok(CreateTableExpr {
-        catalog_name: table_info.catalog_name.to_string(),
-        schema_name: table_info.schema_name.to_string(),
-        table_name: table_info.name.to_string(),
+        catalog_name: table_info.catalog_name.clone(),
+        schema_name: table_info.schema_name.clone(),
+        table_name: table_info.name.clone(),
         desc: String::default(),
         column_defs,
         time_index,

@@ -54,7 +54,7 @@ pub fn generate_create_table_expr(table_info: &RawTableInfo) -> Result<CreateTab
         create_if_not_exists: true,
         table_options,
         table_id: None,
-        engine: table_info.meta.engine.to_string(),
+        engine: table_info.meta.engine.clone(),
     })
 }
 
@@ -18,7 +18,7 @@ use common_error::define_from_tonic_status;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
-use snafu::{Location, Snafu, location};
+use snafu::{Location, Snafu};
 use tonic::Code;
 use tonic::metadata::errors::InvalidMetadataValue;
 
@@ -252,10 +252,10 @@ impl StartCommand {
 
         if let Some(addr) = &self.internal_rpc_bind_addr {
             if let Some(internal_grpc) = &mut opts.internal_grpc {
-                internal_grpc.bind_addr = addr.to_string();
+                internal_grpc.bind_addr = addr.clone();
             } else {
                 let grpc_options = GrpcOptions {
-                    bind_addr: addr.to_string(),
+                    bind_addr: addr.clone(),
                     ..Default::default()
                 };
 

@@ -265,10 +265,10 @@ impl StartCommand {
 
         if let Some(addr) = &self.internal_rpc_server_addr {
             if let Some(internal_grpc) = &mut opts.internal_grpc {
-                internal_grpc.server_addr = addr.to_string();
+                internal_grpc.server_addr = addr.clone();
             } else {
                 let grpc_options = GrpcOptions {
-                    server_addr: addr.to_string(),
+                    server_addr: addr.clone(),
                     ..Default::default()
                 };
                 opts.internal_grpc = Some(grpc_options);
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![feature(assert_matches, let_chains)]
+#![feature(assert_matches)]
 
 use async_trait::async_trait;
 use common_error::ext::ErrorExt;
@@ -75,11 +75,11 @@ impl Plugins {
         self.read().is_empty()
     }
 
-    fn read(&self) -> RwLockReadGuard<SendSyncAnyMap> {
+    fn read(&self) -> RwLockReadGuard<'_, SendSyncAnyMap> {
         self.inner.read().unwrap()
     }
 
-    fn write(&self) -> RwLockWriteGuard<SendSyncAnyMap> {
+    fn write(&self) -> RwLockWriteGuard<'_, SendSyncAnyMap> {
         self.inner.write().unwrap()
     }
 }
@@ -150,7 +150,7 @@ impl<
         if let Some(ref mut writer) = self.writer {
             Ok(writer)
         } else {
-            let writer = (self.writer_factory)(self.path.to_string()).await?;
+            let writer = (self.writer_factory)(self.path.clone()).await?;
             Ok(self.writer.insert(writer))
         }
     }
@@ -87,7 +87,7 @@ pub(crate) fn scan_config(
 ) -> FileScanConfig {
     // object_store only recognize the Unix style path, so make it happy.
     let filename = &filename.replace('\\', "/");
-    let file_group = FileGroup::new(vec![PartitionedFile::new(filename.to_string(), 4096)]);
+    let file_group = FileGroup::new(vec![PartitionedFile::new(filename.clone(), 4096)]);
 
     FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source)
         .with_file_group(file_group)
@@ -71,7 +71,7 @@ impl FunctionRegistry {
         for alias in func.aliases() {
             let func: ScalarFunctionFactory = func.clone().into();
             let alias = ScalarFunctionFactory {
-                name: alias.to_string(),
+                name: alias.clone(),
                 ..func
             };
             self.register(alias);
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![feature(let_chains)]
 #![feature(try_blocks)]
 #![feature(assert_matches)]
 
@@ -309,7 +309,7 @@ fn is_ipv6_in_range(ip: &Ipv6Addr, cidr_base: &Ipv6Addr, prefix_len: u8) -> Opti
     }
 
     // If there's a partial byte to check
-    if prefix_len % 8 != 0 && full_bytes < 16 {
+    if !prefix_len.is_multiple_of(8) && full_bytes < 16 {
         let bits_to_check = prefix_len % 8;
         let mask = 0xFF_u8 << (8 - bits_to_check);
 
@@ -36,7 +36,7 @@ pub fn as_veclit(arg: &ScalarValue) -> Result<Option<Cow<'_, [f32]>>> {
 
 /// Convert a u8 slice to a vector literal.
 pub fn binlit_as_veclit(bytes: &[u8]) -> Result<Cow<'_, [f32]>> {
-    if bytes.len() % std::mem::size_of::<f32>() != 0 {
+    if !bytes.len().is_multiple_of(size_of::<f32>()) {
         return InvalidFuncArgsSnafu {
             err_msg: format!("Invalid binary length of vector: {}", bytes.len()),
         }
@@ -479,7 +479,7 @@ impl Pool {
         })
     }
 
-    fn entry(&self, addr: String) -> Entry<String, Channel> {
+    fn entry(&self, addr: String) -> Entry<'_, String, Channel> {
         self.channels.entry(addr)
     }
 
@@ -98,7 +98,7 @@ fn impl_schema_method(fields: &[ParsedField<'_>]) -> Result<TokenStream2> {
             }
             Some(TypeExt::StructType(ext)) => {
                 let fields = ext.fields.iter().map(|field| {
-                    let field_name = syn::Ident::new(&field.name.to_string(), ident.span());
+                    let field_name = syn::Ident::new(&field.name.clone(), ident.span());
                     let field_type = syn::Ident::new(&field.datatype.to_string(), ident.span());
                     quote! {
                         StructField { name: #field_name, type_: #field_type }
@@ -29,22 +29,17 @@ use crate::error;
 use crate::error::Result;
 use crate::heartbeat::utils::get_datanode_workloads;
 
-pub(crate) const DATANODE_LEASE_PREFIX: &str = "__meta_datanode_lease";
-const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region";
-
 const DATANODE_STAT_PREFIX: &str = "__meta_datanode_stat";
 
 pub const REGION_STATISTIC_KEY: &str = "__region_statistic";
 
 lazy_static! {
     pub(crate) static ref DATANODE_LEASE_KEY_PATTERN: Regex =
-        Regex::new(&format!("^{DATANODE_LEASE_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
+        Regex::new("^__meta_datanode_lease-([0-9]+)-([0-9]+)$").unwrap();
     static ref DATANODE_STAT_KEY_PATTERN: Regex =
         Regex::new(&format!("^{DATANODE_STAT_PREFIX}-([0-9]+)-([0-9]+)$")).unwrap();
-    static ref INACTIVE_REGION_KEY_PATTERN: Regex = Regex::new(&format!(
-        "^{INACTIVE_REGION_PREFIX}-([0-9]+)-([0-9]+)-([0-9]+)$"
-    ))
-    .unwrap();
+    static ref INACTIVE_REGION_KEY_PATTERN: Regex =
+        Regex::new("^__meta_inactive_region-([0-9]+)-([0-9]+)-([0-9]+)$").unwrap();
 }
 
 /// The key of the datanode stat in the storage.
@@ -297,7 +292,7 @@ impl From<&api::v1::meta::RegionStat> for RegionStat {
             rcus: value.rcus,
             wcus: value.wcus,
             approximate_bytes: value.approximate_bytes as u64,
-            engine: value.engine.to_string(),
+            engine: value.engine.clone(),
             role: RegionRole::from(value.role()),
             num_rows: region_stat.num_rows,
             memtable_size: region_stat.memtable_size,
@@ -85,7 +85,7 @@ impl<'a> AlterLogicalTableValidator<'a> {
         Ok(())
     }
 
-    fn table_names(&self) -> Vec<TableReference> {
+    fn table_names(&self) -> Vec<TableReference<'_>> {
         self.alters
             .iter()
             .map(|alter| {
@@ -75,7 +75,7 @@ fn build_executor_from_alter_expr(alter_data: &AlterTableData) -> AlterTableExec
     let table_id = alter_data.table_id;
     let alter_kind = alter_data.task.alter_table.kind.as_ref().unwrap();
     let new_table_name = if let Kind::RenameTable(RenameTable { new_table_name }) = alter_kind {
-        Some(new_table_name.to_string())
+        Some(new_table_name.clone())
     } else {
         None
     };

@@ -418,7 +418,7 @@ impl AlterTableData {
         }
     }
 
-    fn table_ref(&self) -> TableReference {
+    fn table_ref(&self) -> TableReference<'_> {
         self.task.table_ref()
     }
 
@@ -132,7 +132,7 @@ impl AlterTableExecutor {
         );
 
         table_metadata_manager
-            .rename_table(current_table_info_value, new_table_name.to_string())
+            .rename_table(current_table_info_value, new_table_name.clone())
             .await?;
     } else {
         debug!(

@@ -293,7 +293,7 @@ fn build_new_table_info(
             new_info.meta.next_column_id += columns.len() as u32;
         }
         AlterKind::RenameTable { new_table_name } => {
-            new_info.name = new_table_name.to_string();
+            new_info.name = new_table_name.clone();
         }
         AlterKind::DropColumns { .. }
         | AlterKind::ModifyColumnTypes { .. }
@@ -379,9 +379,10 @@ pub enum CreateFlowState {
 }
 
 /// The type of flow.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
 pub enum FlowType {
     /// The flow is a batching task.
+    #[default]
     Batching,
     /// The flow is a streaming task.
     Streaming,

@@ -393,12 +394,6 @@ impl FlowType {
     pub const FLOW_TYPE_KEY: &str = "flow_type";
 }
 
-impl Default for FlowType {
-    fn default() -> Self {
-        Self::Batching
-    }
-}
-
 impl fmt::Display for FlowType {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
@@ -52,7 +52,7 @@ impl CreateLogicalTablesProcedure {
             ensure!(
                 task.create_table.create_if_not_exists,
                 TableAlreadyExistsSnafu {
-                    table_name: task.create_table.table_name.to_string(),
+                    table_name: task.create_table.table_name.clone(),
                 }
             );
             continue;
@@ -119,7 +119,7 @@ pub(crate) fn build_template(create_table_expr: &CreateTableExpr) -> Result<Crea
 
     let template = CreateRequest {
         region_id: 0,
-        engine: create_table_expr.engine.to_string(),
+        engine: create_table_expr.engine.clone(),
         column_defs,
         primary_key,
         path: String::new(),
@@ -167,8 +167,8 @@ impl DropFlowProcedure {
             &[
                 CacheIdent::FlowId(flow_id),
                 CacheIdent::FlowName(FlowName {
-                    catalog_name: flow_info_value.catalog_name.to_string(),
-                    flow_name: flow_info_value.flow_name.to_string(),
+                    catalog_name: flow_info_value.catalog_name.clone(),
+                    flow_name: flow_info_value.flow_name.clone(),
                 }),
                 CacheIdent::DropFlow(DropFlow {
                     flow_id,
@@ -291,7 +291,7 @@ impl DropTableData {
         }
     }
 
-    fn table_ref(&self) -> TableReference {
+    fn table_ref(&self) -> TableReference<'_> {
         self.task.table_ref()
     }
 

@@ -219,7 +219,7 @@ pub(crate) struct DropViewData {
 }
 
 impl DropViewData {
-    fn table_ref(&self) -> TableReference {
+    fn table_ref(&self) -> TableReference<'_> {
         self.task.table_ref()
     }
 
@@ -247,7 +247,7 @@ pub fn assert_column_name(table_info: &RawTableInfo, expected_column_names: &[&s
             .schema
             .column_schemas
             .iter()
-            .map(|c| c.name.to_string())
+            .map(|c| c.name.clone())
             .collect::<Vec<_>>(),
         expected_column_names
     );
@@ -98,10 +98,10 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
                 .unwrap_or(0),
             version: 1,
         },
-        name: expr.table_name.to_string(),
-        desc: Some(expr.desc.to_string()),
-        catalog_name: expr.catalog_name.to_string(),
-        schema_name: expr.schema_name.to_string(),
+        name: expr.table_name.clone(),
+        desc: Some(expr.desc.clone()),
+        catalog_name: expr.catalog_name.clone(),
+        schema_name: expr.schema_name.clone(),
         meta: RawTableMeta {
             schema: RawSchema {
                 column_schemas: expr

@@ -126,7 +126,7 @@ pub fn build_raw_table_info_from_expr(expr: &CreateTableExpr) -> RawTableInfo {
                 })
                 .collect(),
             value_indices: vec![],
-            engine: expr.engine.to_string(),
+            engine: expr.engine.clone(),
             next_column_id: expr.column_defs.len() as u32,
             region_numbers: vec![],
             options: TableOptions::try_from_iter(&expr.table_options).unwrap(),
@@ -218,7 +218,7 @@ pub struct PartialSuccessDatanodeHandler {
 #[async_trait::async_trait]
 impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
     async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
-        let success = peer.id % 2 == 0;
+        let success = peer.id.is_multiple_of(2);
         if success {
             Ok(RegionResponse::new(0))
         } else if self.retryable {
@@ -218,7 +218,7 @@ impl TruncateTableData {
         }
     }
 
-    pub fn table_ref(&self) -> TableReference {
+    pub fn table_ref(&self) -> TableReference<'_> {
         self.task.table_ref()
     }
 
@@ -302,35 +302,25 @@ pub struct DropFlow {
 }
 
 /// Strategy for executing flush operations.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
 pub enum FlushStrategy {
     /// Synchronous operation that waits for completion and expects a reply
+    #[default]
     Sync,
     /// Asynchronous hint operation (fire-and-forget, no reply expected)
     Async,
 }
 
 /// Error handling strategy for batch flush operations.
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
 pub enum FlushErrorStrategy {
     /// Abort on first error (fail-fast)
+    #[default]
     FailFast,
     /// Attempt to flush all regions and collect all errors
     TryAll,
 }
 
-impl Default for FlushStrategy {
-    fn default() -> Self {
-        Self::Sync
-    }
-}
-
-impl Default for FlushErrorStrategy {
-    fn default() -> Self {
-        Self::FailFast
-    }
-}
-
 /// Unified flush instruction supporting both single and batch operations
 /// with configurable execution strategies and error handling.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -2172,7 +2172,7 @@ mod tests {
             table_id,
             RegionInfo {
                 engine: engine.to_string(),
-                region_storage_path: region_storage_path.to_string(),
+                region_storage_path: region_storage_path.clone(),
                 region_options: HashMap::new(),
                 region_wal_options: HashMap::new(),
             },

@@ -2191,7 +2191,7 @@ mod tests {
             table_id,
             RegionInfo {
                 engine: engine.to_string(),
-                region_storage_path: region_storage_path.to_string(),
+                region_storage_path: region_storage_path.clone(),
                 region_options: HashMap::new(),
                 region_wal_options: HashMap::new(),
             },

@@ -2216,7 +2216,7 @@ mod tests {
             table_id,
             RegionInfo {
                 engine: engine.to_string(),
-                region_storage_path: region_storage_path.to_string(),
+                region_storage_path: region_storage_path.clone(),
                 region_options: HashMap::new(),
                 region_wal_options: HashMap::new(),
             },

@@ -2247,7 +2247,7 @@ mod tests {
             table_id,
             RegionInfo {
                 engine: engine.to_string(),
-                region_storage_path: region_storage_path.to_string(),
+                region_storage_path: region_storage_path.clone(),
                 region_options: HashMap::new(),
                 region_wal_options: HashMap::new(),
             },
@@ -113,7 +113,7 @@ impl TableInfoValue {
         }
     }
 
-    pub fn table_ref(&self) -> TableReference {
+    pub fn table_ref(&self) -> TableReference<'_> {
         TableReference::full(
             &self.table_info.catalog_name,
             &self.table_info.schema_name,

@@ -123,9 +123,9 @@ impl TableInfoValue {
 
     pub fn table_name(&self) -> TableName {
         TableName {
-            catalog_name: self.table_info.catalog_name.to_string(),
-            schema_name: self.table_info.schema_name.to_string(),
-            table_name: self.table_info.name.to_string(),
+            catalog_name: self.table_info.catalog_name.clone(),
+            schema_name: self.table_info.schema_name.clone(),
+            table_name: self.table_info.name.clone(),
         }
     }
 
@@ -555,8 +555,8 @@ impl MySqlStore {
         sqlx::query(&sql_template_set.create_table_statement)
             .execute(&pool)
             .await
-            .context(MySqlExecutionSnafu {
-                sql: sql_template_set.create_table_statement.to_string(),
+            .with_context(|_| MySqlExecutionSnafu {
+                sql: sql_template_set.create_table_statement.clone(),
             })?;
         Ok(Arc::new(MySqlStore {
             max_txn_ops,

@@ -880,7 +880,7 @@ impl PgStore {
             .execute(&sql_template_set.create_table_statement, &[])
             .await
             .with_context(|_| PostgresExecutionSnafu {
-                sql: sql_template_set.create_table_statement.to_string(),
+                sql: sql_template_set.create_table_statement.clone(),
             })?;
         Ok(Arc::new(Self {
             max_txn_ops,

@@ -926,8 +926,8 @@ mod tests {
         client
             .execute(&sql_templates.create_table_statement, &[])
             .await
-            .context(PostgresExecutionSnafu {
-                sql: sql_templates.create_table_statement.to_string(),
+            .with_context(|_| PostgresExecutionSnafu {
+                sql: sql_templates.create_table_statement.clone(),
             })
             .unwrap();
         Some(PgStore {
@@ -13,8 +13,6 @@
 // limitations under the License.
 
 #![feature(assert_matches)]
-#![feature(btree_extract_if)]
-#![feature(let_chains)]
 #![feature(duration_millis_float)]
 
 pub mod cache;
@@ -71,7 +71,7 @@ impl State for ReconcileRegions {
             .column_metadatas
             .iter()
             .filter(|c| c.semantic_type == SemanticType::Tag)
-            .map(|c| c.column_schema.name.to_string())
+            .map(|c| c.column_schema.name.clone())
             .collect::<HashSet<_>>();
         let column_defs = self
             .column_metadatas
@@ -383,7 +383,7 @@ pub struct CreateViewTask {
 
 impl CreateViewTask {
     /// Returns the [`TableReference`] of view.
-    pub fn table_ref(&self) -> TableReference {
+    pub fn table_ref(&self) -> TableReference<'_> {
         TableReference {
             catalog: &self.create_view.catalog_name,
             schema: &self.create_view.schema_name,

@@ -496,7 +496,7 @@ pub struct DropViewTask {
 
 impl DropViewTask {
     /// Returns the [`TableReference`] of view.
-    pub fn table_ref(&self) -> TableReference {
+    pub fn table_ref(&self) -> TableReference<'_> {
         TableReference {
             catalog: &self.catalog,
             schema: &self.schema,

@@ -553,7 +553,7 @@ pub struct DropTableTask {
 }
 
 impl DropTableTask {
-    pub fn table_ref(&self) -> TableReference {
+    pub fn table_ref(&self) -> TableReference<'_> {
         TableReference {
             catalog: &self.catalog,
             schema: &self.schema,

@@ -563,9 +563,9 @@ impl DropTableTask {
 
     pub fn table_name(&self) -> TableName {
         TableName {
-            catalog_name: self.catalog.to_string(),
-            schema_name: self.schema.to_string(),
-            table_name: self.table.to_string(),
+            catalog_name: self.catalog.clone(),
+            schema_name: self.schema.clone(),
+            table_name: self.table.clone(),
         }
     }
 }
@@ -659,13 +659,13 @@ impl CreateTableTask {
         let table = &self.create_table;
 
         TableName {
-            catalog_name: table.catalog_name.to_string(),
-            schema_name: table.schema_name.to_string(),
-            table_name: table.table_name.to_string(),
+            catalog_name: table.catalog_name.clone(),
+            schema_name: table.schema_name.clone(),
+            table_name: table.table_name.clone(),
         }
     }
 
-    pub fn table_ref(&self) -> TableReference {
+    pub fn table_ref(&self) -> TableReference<'_> {
         let table = &self.create_table;
 
         TableReference {

@@ -750,7 +750,7 @@ impl AlterTableTask {
         Ok(())
     }
 
-    pub fn table_ref(&self) -> TableReference {
+    pub fn table_ref(&self) -> TableReference<'_> {
         TableReference {
             catalog: &self.alter_table.catalog_name,
             schema: &self.alter_table.schema_name,

@@ -762,9 +762,9 @@ impl AlterTableTask {
         let table = &self.alter_table;
 
         TableName {
-            catalog_name: table.catalog_name.to_string(),
-            schema_name: table.schema_name.to_string(),
-            table_name: table.table_name.to_string(),
+            catalog_name: table.catalog_name.clone(),
+            schema_name: table.schema_name.clone(),
+            table_name: table.table_name.clone(),
         }
     }
 }

@@ -834,7 +834,7 @@ pub struct TruncateTableTask {
 }
 
 impl TruncateTableTask {
-    pub fn table_ref(&self) -> TableReference {
+    pub fn table_ref(&self) -> TableReference<'_> {
         TableReference {
             catalog: &self.catalog,
             schema: &self.schema,

@@ -844,9 +844,9 @@ impl TruncateTableTask {
 
     pub fn table_name(&self) -> TableName {
         TableName {
-            catalog_name: self.catalog.to_string(),
-            schema_name: self.schema.to_string(),
-            table_name: self.table.to_string(),
+            catalog_name: self.catalog.clone(),
+            schema_name: self.schema.clone(),
+            table_name: self.table.clone(),
         }
     }
 }
@@ -1344,7 +1344,7 @@ impl From<QueryContextRef> for QueryContext {
     fn from(query_context: QueryContextRef) -> Self {
         QueryContext {
             current_catalog: query_context.current_catalog().to_string(),
-            current_schema: query_context.current_schema().to_string(),
+            current_schema: query_context.current_schema().clone(),
             timezone: query_context.timezone().to_string(),
             extensions: query_context.extensions(),
             channel: query_context.channel() as u8,

@@ -1401,7 +1401,7 @@ impl From<QueryContextRef> for FlowQueryContext {
     fn from(ctx: QueryContextRef) -> Self {
         Self {
             catalog: ctx.current_catalog().to_string(),
-            schema: ctx.current_schema().to_string(),
+            schema: ctx.current_schema().clone(),
             timezone: ctx.timezone().to_string(),
         }
     }
@@ -128,7 +128,7 @@ pub fn procedure_details_to_pb_response(metas: Vec<ProcedureInfo>) -> PbProcedur
         let (status, error) = procedure_state_to_pb_state(&meta.state);
         PbProcedureMeta {
             id: Some(pid_to_pb_pid(meta.id)),
-            type_name: meta.type_name.to_string(),
+            type_name: meta.type_name.clone(),
             status: status.into(),
             start_time_ms: meta.start_time_ms,
             end_time_ms: meta.end_time_ms,
@@ -121,7 +121,7 @@ impl StateStore for KvStateStore {
         self.kv_backend
             .put(
                 PutRequest::new()
-                    .with_key(key.to_string().into_bytes())
+                    .with_key(key.clone().into_bytes())
                     .with_value(value),
             )
             .await

@@ -145,7 +145,7 @@ impl StateStore for KvStateStore {
             let key = if idx > 0 {
                 KeySet::with_segment_suffix(&key, idx)
             } else {
-                key.to_string()
+                key.clone()
             };
             let kv_backend = self.kv_backend.clone();
             async move {
@@ -169,7 +169,7 @@ impl ActiveBucket {
             }
         } else {
             datanode_stats.insert(
-                stat.topic.to_string(),
+                stat.topic.clone(),
                 PartialTopicStat {
                     latest_entry_id: stat.latest_entry_id,
                     record_size: stat.record_size,

@@ -205,7 +205,7 @@ impl ActiveBucket {
         let record_num = stats.iter().map(|stat| stat.record_num).sum::<u64>();
 
         output.insert(
-            topic.to_string(),
+            topic.clone(),
             HistoryTopicStat {
                 latest_entry_id,
                 record_size,
@@ -104,8 +104,8 @@ impl KafkaTopicCreator {
         let end_offset = partition_client
             .get_offset(OffsetAt::Latest)
             .await
-            .context(KafkaGetOffsetSnafu {
-                topic: topic.to_string(),
+            .with_context(|_| KafkaGetOffsetSnafu {
+                topic: topic.clone(),
                 partition: DEFAULT_PARTITION,
             })?;
         if end_offset > 0 {
@@ -284,9 +284,11 @@ mod tests {
         let creator = test_topic_creator(get_kafka_endpoints()).await;
 
         let topic = format!("{}{}", prefix, "0");
+        let topics = std::slice::from_ref(&topic);
+
         // Clean up the topics before test
-        creator.delete_topics(&[topic.to_string()]).await.unwrap();
-        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        creator.delete_topics(topics).await.unwrap();
+        creator.create_topics(topics).await.unwrap();
 
         let partition_client = creator.partition_client(&topic).await.unwrap();
         let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();

@@ -309,10 +311,12 @@ mod tests {
         let creator = test_topic_creator(get_kafka_endpoints()).await;
 
         let topic = format!("{}{}", prefix, "0");
-        // Clean up the topics before test
-        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+        let topics = std::slice::from_ref(&topic);
 
-        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        // Clean up the topics before test
+        creator.delete_topics(topics).await.unwrap();
+
+        creator.create_topics(topics).await.unwrap();
         let partition_client = creator.partition_client(&topic).await.unwrap();
         append_records(&partition_client, 2).await.unwrap();
 

@@ -336,12 +340,14 @@ mod tests {
         let creator = test_topic_creator(get_kafka_endpoints()).await;
 
         let topic = format!("{}{}", prefix, "0");
-        // Clean up the topics before test
-        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+        let topics = std::slice::from_ref(&topic);
 
-        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        // Clean up the topics before test
+        creator.delete_topics(topics).await.unwrap();
+
+        creator.create_topics(topics).await.unwrap();
         // Should be ok
-        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        creator.create_topics(topics).await.unwrap();
 
         let partition_client = creator.partition_client(&topic).await.unwrap();
         let end_offset = partition_client.get_offset(OffsetAt::Latest).await.unwrap();

@@ -356,10 +362,12 @@ mod tests {
         let creator = test_topic_creator(get_kafka_endpoints()).await;
 
         let topic = format!("{}{}", prefix, "0");
-        // Clean up the topics before test
-        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+        let topics = std::slice::from_ref(&topic);
 
-        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        // Clean up the topics before test
+        creator.delete_topics(topics).await.unwrap();
+
+        creator.create_topics(topics).await.unwrap();
         creator.prepare_topic(&topic).await.unwrap();
 
         let partition_client = creator.partition_client(&topic).await.unwrap();

@@ -382,10 +390,12 @@ mod tests {
         let creator = test_topic_creator(get_kafka_endpoints()).await;
 
         let topic = format!("{}{}", prefix, "0");
-        // Clean up the topics before test
-        creator.delete_topics(&[topic.to_string()]).await.unwrap();
+        let topics = std::slice::from_ref(&topic);
 
-        creator.create_topics(&[topic.to_string()]).await.unwrap();
+        // Clean up the topics before test
+        creator.delete_topics(topics).await.unwrap();
+
+        creator.create_topics(topics).await.unwrap();
         let partition_client = creator.partition_client(&topic).await.unwrap();
         append_records(&partition_client, 10).await.unwrap();
 
@@ -47,7 +47,7 @@ impl KafkaTopicManager {
         let mut topics_to_create = Vec::with_capacity(all_topics.len());
         for topic in all_topics {
             if !existing_topic_set.contains(topic) {
                topics_to_create.push(topic.clone());
             }
         }
         Ok(topics_to_create)
@@ -195,18 +195,18 @@ pub async fn acquire_dynamic_key_lock(
 ) -> DynamicKeyLockGuard {
     match key {
         StringKey::Share(key) => {
-            let guard = lock.read(key.to_string()).await;
+            let guard = lock.read(key.clone()).await;
             DynamicKeyLockGuard {
                 guard: Some(OwnedKeyRwLockGuard::from(guard)),
-                key: key.to_string(),
+                key: key.clone(),
                 lock: lock.clone(),
             }
         }
         StringKey::Exclusive(key) => {
-            let guard = lock.write(key.to_string()).await;
+            let guard = lock.write(key.clone()).await;
             DynamicKeyLockGuard {
                 guard: Some(OwnedKeyRwLockGuard::from(guard)),
-                key: key.to_string(),
+                key: key.clone(),
                 lock: lock.clone(),
             }
         }

@@ -227,7 +227,7 @@ impl Drop for DynamicKeyLockGuard {
         if let Some(guard) = self.guard.take() {
             drop(guard);
         }
-        self.lock.clean_keys(&[self.key.to_string()]);
+        self.lock.clean_keys(std::slice::from_ref(&self.key));
     }
 }
 
@@ -1616,7 +1616,7 @@ mod tests {
             .unwrap();
 
         // If the procedure is poisoned, the poison key shouldn't be deleted.
-        assert_eq!(&procedure_id.to_string(), ROOT_ID);
+        assert_eq!(&procedure_id.clone(), ROOT_ID);
     }
 
     #[tokio::test]
@@ -37,6 +37,7 @@ macro_rules! proc_path {
     ($store: expr, $fmt:expr, $($args:tt)*) => { format!("{}{}", $store.proc_path(), format_args!($fmt, $($args)*)) };
 }
 
+#[cfg(test)]
 pub(crate) use proc_path;
 
 /// Serialized data of a procedure.

@@ -579,13 +580,7 @@ mod tests {
         let type_name = procedure.type_name().to_string();
         let data = procedure.dump().unwrap();
         store
-            .store_procedure(
-                procedure_id,
-                0,
-                type_name.to_string(),
-                data.to_string(),
-                None,
-            )
+            .store_procedure(procedure_id, 0, type_name.clone(), data.clone(), None)
            .await
            .unwrap();
         let message = ProcedureMessage {
@@ -68,7 +68,7 @@ impl KeySet {
|
|||||||
|
|
||||||
pub fn keys(&self) -> Vec<String> {
|
pub fn keys(&self) -> Vec<String> {
|
||||||
let mut keys = Vec::with_capacity(self.segments + 1);
|
let mut keys = Vec::with_capacity(self.segments + 1);
|
||||||
keys.push(self.key.to_string());
|
keys.push(self.key.clone());
|
||||||
for i in 1..=self.segments {
|
for i in 1..=self.segments {
|
||||||
keys.push(Self::with_segment_suffix(&self.key, i))
|
keys.push(Self::with_segment_suffix(&self.key, i))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ impl PoisonStore for InMemoryPoisonStore {
|
|||||||
let mut map = self.map.write().unwrap();
|
let mut map = self.map.write().unwrap();
|
||||||
match map.entry(key) {
|
match map.entry(key) {
|
||||||
Entry::Vacant(v) => {
|
Entry::Vacant(v) => {
|
||||||
v.insert(token.to_string());
|
v.insert(token.clone());
|
||||||
}
|
}
|
||||||
Entry::Occupied(o) => {
|
Entry::Occupied(o) => {
|
||||||
let value = o.get();
|
let value = o.get();
|
||||||
|
|||||||
@@ -145,7 +145,7 @@ impl From<&AddColumnLocation> for Location {
|
|||||||
},
|
},
|
||||||
AddColumnLocation::After { column_name } => Location {
|
AddColumnLocation::After { column_name } => Location {
|
||||||
location_type: LocationType::After.into(),
|
location_type: LocationType::After.into(),
|
||||||
after_column_name: column_name.to_string(),
|
after_column_name: column_name.clone(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -137,7 +137,7 @@ pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
|
|||||||
for column in schema.column_schemas() {
|
for column in schema.column_schemas() {
|
||||||
if matches!(column.data_type, ConcreteDataType::Json(_)) {
|
if matches!(column.data_type, ConcreteDataType::Json(_)) {
|
||||||
new_columns.push(ColumnSchema::new(
|
new_columns.push(ColumnSchema::new(
|
||||||
column.name.to_string(),
|
column.name.clone(),
|
||||||
ConcreteDataType::string_datatype(),
|
ConcreteDataType::string_datatype(),
|
||||||
column.is_nullable(),
|
column.is_nullable(),
|
||||||
));
|
));
|
||||||
|
|||||||
@@ -12,8 +12,6 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
#![feature(let_chains)]
|
|
||||||
|
|
||||||
mod df_substrait;
|
mod df_substrait;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod extension_serializer;
|
pub mod extension_serializer;
|
||||||
|
|||||||
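The deleted `#![feature(let_chains)]` attributes (here and in several crates below) follow from let-chains being stabilized in Rust 1.88 for the 2024 edition; on nightly-2025-10-01 the gate only draws a "stable feature" warning and can go. A let-chain that now compiles without the gate, assuming an edition-2024 crate:

// No `#![feature(let_chains)]` needed on edition 2024.
fn describe(input: Option<&str>) -> &str {
    if let Some(s) = input
        && !s.is_empty()
    {
        s
    } else {
        "empty"
    }
}

fn main() {
    assert_eq!(describe(Some("db")), "db");
    assert_eq!(describe(None), "empty");
}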
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![feature(let_chains)]
 #![feature(duration_constructors)]
 
 pub mod logging;
@@ -454,7 +454,7 @@ fn build_otlp_exporter(opts: &LoggingOptions) -> SpanExporter {
         .as_ref()
         .map(|e| {
             if e.starts_with("http") {
-                e.to_string()
+                e.clone()
             } else {
                 format!("http://{}", e)
             }
@@ -308,12 +308,12 @@ impl Timestamp {
     fn from_splits(sec: i64, nsec: u32) -> Option<Self> {
         if nsec == 0 {
             Some(Timestamp::new_second(sec))
-        } else if nsec % 1_000_000 == 0 {
+        } else if nsec.is_multiple_of(1_000_000) {
             let millis = nsec / 1_000_000;
             sec.checked_mul(1000)
                 .and_then(|v| v.checked_add(millis as i64))
                 .map(Timestamp::new_millisecond)
-        } else if nsec % 1000 == 0 {
+        } else if nsec.is_multiple_of(1_000) {
             let micros = nsec / 1000;
             sec.checked_mul(1_000_000)
                 .and_then(|v| v.checked_add(micros as i64))
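`u32::is_multiple_of`, stabilized in Rust 1.87, is what the `% 1_000_000 == 0` checks above (and the segment-rollover checks near the end of this diff) were rewritten to; clippy's `manual_is_multiple_of` lint suggests it, assuming that is what fired here. Besides reading better, it cannot panic on a zero divisor:

fn main() {
    let nsec: u32 = 5_000_000;
    assert!(nsec.is_multiple_of(1_000_000)); // same as nsec % 1_000_000 == 0
    assert!(!nsec.is_multiple_of(7));
    // `%` would panic on a zero divisor; `is_multiple_of` just answers,
    // with zero counted as a multiple of zero only.
    assert!(!nsec.is_multiple_of(0));
    assert!(0_u32.is_multiple_of(0));
}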
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 #![feature(assert_matches)]
-#![feature(duration_constructors_lite)]
 
 use std::net::SocketAddr;
 
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 #![feature(assert_matches)]
-#![feature(let_chains)]
 
 pub mod alive_keeper;
 pub mod config;
@@ -706,7 +706,7 @@ impl RegionServerParallelism {
         })
     }
 
-    pub async fn acquire(&self) -> Result<SemaphorePermit> {
+    pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
         timeout(self.timeout, self.semaphore.acquire())
             .await
             .context(ConcurrentQueryLimiterTimeoutSnafu)?
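`SemaphorePermit<'_>` here, and the many `ValueRef<'_>` signatures later in this diff, appear to address the newer toolchain's `mismatched_lifetime_syntaxes` warning (assuming that lint is what drove these changes): when a returned type borrows from `&self`, the lifetime should be visibly elided with `'_` rather than fully hidden. A minimal sketch with hypothetical types:

struct Pool {
    name: String,
}

struct Handle<'a> {
    name: &'a str,
}

impl Pool {
    // `Handle<'_>` makes the borrow of `self` visible in the signature;
    // a bare `Handle` return type hides the lifetime and draws the warning.
    fn handle(&self) -> Handle<'_> {
        Handle { name: &self.name }
    }
}

fn main() {
    let pool = Pool { name: "p0".into() };
    assert_eq!(pool.handle().name, "p0");
}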
@@ -919,8 +919,7 @@ impl RegionServerInner {
             if let Some(requests) = engine_grouped_requests.get_mut(&request.engine) {
                 requests.push((region_id, request));
             } else {
-                engine_grouped_requests
-                    .insert(request.engine.to_string(), vec![(region_id, request)]);
+                engine_grouped_requests.insert(request.engine.clone(), vec![(region_id, request)]);
             }
         }
 
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![feature(let_chains)]
 #![feature(assert_matches)]
 
 pub mod arrow_array;
@@ -329,7 +329,7 @@ impl TryFrom<Arc<ArrowSchema>> for Schema {
         let mut name_to_index = HashMap::with_capacity(arrow_schema.fields.len());
         for field in &arrow_schema.fields {
             let column_schema = ColumnSchema::try_from(field.as_ref())?;
-            let _ = name_to_index.insert(field.name().to_string(), column_schemas.len());
+            let _ = name_to_index.insert(field.name().clone(), column_schemas.len());
             column_schemas.push(column_schema);
         }
 
@@ -28,17 +28,14 @@ use crate::vectors::{BinaryVectorBuilder, MutableVector};
 
 pub const JSON_TYPE_NAME: &str = "Json";
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
+#[derive(
+    Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, Default,
+)]
 pub enum JsonFormat {
+    #[default]
     Jsonb,
 }
 
-impl Default for JsonFormat {
-    fn default() -> Self {
-        Self::Jsonb
-    }
-}
-
 /// JsonType is a data type for JSON data. It is stored as binary data of jsonb format.
 /// It utilizes current binary value and vector implementation.
 #[derive(
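The `JsonFormat` change replaces a hand-written `impl Default` with the derive, which clippy's `derivable_impls` lint suggests whenever the manual impl just picks a unit variant; `#[default]` on an enum variant has been stable since Rust 1.62. Reduced to its essentials:

#[derive(Debug, PartialEq, Default)]
enum JsonFormat {
    // `#[default]` marks the variant returned by the derived `Default::default()`.
    #[default]
    Jsonb,
}

fn main() {
    assert_eq!(JsonFormat::default(), JsonFormat::Jsonb);
}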
@@ -32,7 +32,7 @@ impl TryFrom<&Fields> for StructType {
             .iter()
             .map(|field| {
                 Ok(StructField::new(
-                    field.name().to_string(),
+                    field.name().clone(),
                     ConcreteDataType::try_from(field.data_type())?,
                     field.is_nullable(),
                 ))
@@ -227,7 +227,7 @@ impl Value {
     }
 
     /// Cast itself to [ValueRef].
-    pub fn as_value_ref(&self) -> ValueRef {
+    pub fn as_value_ref(&self) -> ValueRef<'_> {
         match self {
             Value::Null => ValueRef::Null,
             Value::Boolean(v) => ValueRef::Boolean(*v),
@@ -1351,11 +1351,11 @@ impl<'a> ValueRef<'a> {
     }
 
     /// Cast itself to [ListValueRef].
-    pub fn as_list(&self) -> Result<Option<ListValueRef>> {
+    pub fn as_list(&self) -> Result<Option<ListValueRef<'_>>> {
         impl_as_for_value_ref!(self, List)
     }
 
-    pub fn as_struct(&self) -> Result<Option<StructValueRef>> {
+    pub fn as_struct(&self) -> Result<Option<StructValueRef<'_>>> {
         impl_as_for_value_ref!(self, Struct)
     }
 
@@ -166,7 +166,7 @@ pub trait Vector: Send + Sync + Serializable + Debug + VectorOp {
     ///
     /// # Panics
     /// Panic if `index` is out of bound.
-    fn get_ref(&self, index: usize) -> ValueRef;
+    fn get_ref(&self, index: usize) -> ValueRef<'_>;
 }
 
 pub type VectorRef = Arc<dyn Vector>;
@@ -180,7 +180,7 @@ impl Vector for BinaryVector {
         vectors::impl_get_for_vector!(self.array, index)
     }
 
-    fn get_ref(&self, index: usize) -> ValueRef {
+    fn get_ref(&self, index: usize) -> ValueRef<'_> {
         vectors::impl_get_ref_for_vector!(self.array, index)
     }
 }
@@ -126,7 +126,7 @@ impl Vector for BooleanVector {
         vectors::impl_get_for_vector!(self.array, index)
     }
 
-    fn get_ref(&self, index: usize) -> ValueRef {
+    fn get_ref(&self, index: usize) -> ValueRef<'_> {
         vectors::impl_get_ref_for_vector!(self.array, index)
     }
 }
@@ -52,7 +52,7 @@ impl ConstantVector {
     }
 
     /// Returns the constant value.
-    pub fn get_constant_ref(&self) -> ValueRef {
+    pub fn get_constant_ref(&self) -> ValueRef<'_> {
         self.vector.get_ref(0)
     }
 
@@ -179,7 +179,7 @@ impl Vector for ConstantVector {
         self.vector.get(0)
     }
 
-    fn get_ref(&self, _index: usize) -> ValueRef {
+    fn get_ref(&self, _index: usize) -> ValueRef<'_> {
         self.vector.get_ref(0)
     }
 
@@ -207,7 +207,7 @@ impl Vector for Decimal128Vector {
         }
     }
 
-    fn get_ref(&self, index: usize) -> ValueRef {
+    fn get_ref(&self, index: usize) -> ValueRef<'_> {
         if let Some(decimal) = self.get_decimal128_value_from_array(index) {
             ValueRef::Decimal128(decimal)
         } else {
@@ -154,7 +154,7 @@ impl<K: ArrowDictionaryKeyType> Vector for DictionaryVector<K> {
         self.item_vector.get(key.as_usize())
     }
 
-    fn get_ref(&self, index: usize) -> ValueRef {
+    fn get_ref(&self, index: usize) -> ValueRef<'_> {
         if !self.array.is_valid(index) {
             return ValueRef::Null;
         }
@@ -121,7 +121,7 @@ impl Vector for ListVector {
         Value::List(ListValue::new(values, self.item_type.clone()))
     }
 
-    fn get_ref(&self, index: usize) -> ValueRef {
+    fn get_ref(&self, index: usize) -> ValueRef<'_> {
         ValueRef::List(ListValueRef::Indexed {
             vector: self,
             idx: index,
@@ -107,7 +107,7 @@ impl Vector for NullVector {
         Value::Null
     }
 
-    fn get_ref(&self, _index: usize) -> ValueRef {
+    fn get_ref(&self, _index: usize) -> ValueRef<'_> {
         // Skips bound check for null array.
         ValueRef::Null
     }
@@ -172,7 +172,7 @@ impl<T: LogicalPrimitiveType> Vector for PrimitiveVector<T> {
         }
     }
 
-    fn get_ref(&self, index: usize) -> ValueRef {
+    fn get_ref(&self, index: usize) -> ValueRef<'_> {
         if self.array.is_valid(index) {
             // Safety: The index have been checked by `is_valid()`.
             let wrapper = unsafe { T::Wrapper::from_native(self.array.value_unchecked(index)) };
@@ -141,7 +141,7 @@ impl Vector for StringVector {
         vectors::impl_get_for_vector!(self.array, index)
     }
 
-    fn get_ref(&self, index: usize) -> ValueRef {
+    fn get_ref(&self, index: usize) -> ValueRef<'_> {
         vectors::impl_get_ref_for_vector!(self.array, index)
     }
 }
@@ -134,7 +134,7 @@ impl Vector for StructVector {
         Value::Struct(StructValue::try_new(values, self.fields.clone()).unwrap())
     }
 
-    fn get_ref(&self, index: usize) -> ValueRef {
+    fn get_ref(&self, index: usize) -> ValueRef<'_> {
         ValueRef::Struct(StructValueRef::Indexed {
             vector: self,
            idx: index,
@@ -52,7 +52,7 @@ fn build_record_batch_stream(
     let files = scan_plan_config
         .files
         .iter()
-        .map(|filename| PartitionedFile::new(filename.to_string(), 0))
+        .map(|filename| PartitionedFile::new(filename.clone(), 0))
         .collect::<Vec<_>>();
 
     let config = FileScanConfigBuilder::new(
@@ -122,7 +122,7 @@ fn new_parquet_stream_with_exec_plan(config: &ScanPlanConfig) -> Result<Sendable
     let file_group = FileGroup::new(
         files
             .iter()
-            .map(|filename| PartitionedFile::new(filename.to_string(), 0))
+            .map(|filename| PartitionedFile::new(filename.clone(), 0))
             .collect::<Vec<_>>(),
     );
 
@@ -335,7 +335,7 @@ impl FlowDualEngine {
             .or_else(|| {
                 Some(
                     QueryContextBuilder::default()
-                        .current_catalog(info.catalog_name().to_string())
+                        .current_catalog(info.catalog_name().clone())
                         .build(),
                 )
             }),
@@ -947,7 +947,7 @@ fn build_pk_from_aggr(plan: &LogicalPlan) -> Result<Option<TableDef>, Error> {
 
     let all_pk_cols: Vec<_> = all_pk_cols
         .into_iter()
-        .filter(|col| first_time_stamp != Some(col.to_string()))
+        .filter(|col| first_time_stamp.as_ref() != Some(col))
        .collect();
 
     Ok(Some(TableDef {
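The filter above used to clone every column name into a fresh `Some(String)` just to compare it against an `Option<String>`; `Option::as_ref` turns `&Option<String>` into `Option<&String>`, so `Some(col)` can be compared by reference. A runnable sketch with hypothetical column names:

fn main() {
    let first_time_stamp: Option<String> = Some("ts".to_string());
    let all_pk_cols = vec!["ts".to_string(), "host".to_string(), "cpu".to_string()];

    // `as_ref()` yields `Option<&String>`, so the comparison needs no clone.
    let non_ts: Vec<String> = all_pk_cols
        .into_iter()
        .filter(|col| first_time_stamp.as_ref() != Some(col))
        .collect();

    assert_eq!(non_ts, ["host", "cpu"]);
}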
@@ -928,13 +928,13 @@ mod test {
             .await
             .unwrap();
         let (col_name, lower, upper) = real;
-        let new_sql = if lower.is_some() {
+        let new_sql = if let Some(lower) = lower {
             let to_df_literal = |value| {
                 let value = Value::from(value);
 
                 value.try_to_scalar_value(&value.data_type()).unwrap()
             };
-            let lower = to_df_literal(lower.unwrap());
+            let lower = to_df_literal(lower);
             let upper = to_df_literal(upper.unwrap());
             let expr = col(&col_name)
                 .gt_eq(lit(lower))
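The test rewrite above is clippy's `unnecessary_unwrap` fix: once code branches on `lower.is_some()` and later calls `lower.unwrap()`, binding the value with `if let` removes the redundant check and the panic path. The shape of the fix:

fn render_lower_bound(lower: Option<i64>) -> String {
    // `if let` binds the value once, replacing an `is_some()` test
    // followed by a later `unwrap()`.
    if let Some(lower) = lower {
        format!("value >= {lower}")
    } else {
        "unbounded".to_string()
    }
}

fn main() {
    assert_eq!(render_lower_bound(Some(3)), "value >= 3");
    assert_eq!(render_lower_bound(None), "unbounded");
}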
@@ -16,7 +16,6 @@
 //! It can transform substrait plan into it's own plan and execute it.
 //! It also contains definition of expression, adapter and plan, and internal state management.
 
-#![feature(let_chains)]
 #![allow(dead_code)]
 #![warn(clippy::missing_docs_in_private_items)]
 #![warn(clippy::too_many_lines)]
@@ -208,7 +208,6 @@ impl KeyValPlan {
     ) -> Result<KeyValPlan, Error> {
         let group_expr_val = group_exprs
             .iter()
-            .cloned()
             .map(|expr| expr.expr.clone())
             .collect_vec();
         let output_arity = group_expr_val.len();
@@ -158,7 +158,7 @@ impl TypedPlan {
         let table_reference = match nt.names.len() {
             1 => [
                 query_ctx.current_catalog().to_string(),
-                query_ctx.current_schema().to_string(),
+                query_ctx.current_schema().clone(),
                 nt.names[0].clone(),
             ],
             2 => [
@@ -26,8 +26,6 @@ use common_meta::instruction::{CacheIdent, Instruction};
 use common_meta::key::MetadataKey;
 use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
 use common_meta::key::table_info::TableInfoKey;
-use partition::manager::TableRouteCacheInvalidator;
-use table::metadata::TableId;
 use tokio::sync::mpsc;
 
 #[derive(Default)]
@@ -42,17 +40,6 @@ impl KvCacheInvalidator for MockKvCacheInvalidator {
     }
 }
 
-pub struct MockTableRouteCacheInvalidator {
-    inner: Mutex<HashMap<TableId, i32>>,
-}
-
-#[async_trait::async_trait]
-impl TableRouteCacheInvalidator for MockTableRouteCacheInvalidator {
-    async fn invalidate_table_route(&self, table_id: TableId) {
-        let _ = self.inner.lock().unwrap().remove(&table_id);
-    }
-}
-
 pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {
     MessageMeta {
         id,
@@ -375,7 +375,7 @@ impl Instance {
             .entry(db_string)
             .or_default();
         names.iter().for_each(|name| {
-            cache.insert(name.to_string(), false);
+            cache.insert((*name).clone(), false);
         });
         return Ok(false);
     }
@@ -411,13 +411,13 @@ impl Instance {
         ensure!(!(has_prom && has_legacy), OtlpMetricModeIncompatibleSnafu);
         let flag = has_legacy;
         names.iter().for_each(|name| {
-            cache.insert(name.to_string(), flag);
+            cache.insert((*name).clone(), flag);
         });
         Ok(flag)
     } else {
         // no table info, use new mode
         names.iter().for_each(|name| {
-            cache.insert(name.to_string(), false);
+            cache.insert((*name).clone(), false);
         });
         Ok(false)
     }
@@ -448,7 +448,7 @@ fn fast_legacy_check(
     // set cache for all names
     names.iter().for_each(|name| {
         if !cache.contains_key(*name) {
-            cache.insert(name.to_string(), flag);
+            cache.insert((*name).clone(), flag);
         }
     });
     Ok(Some(flag))
@@ -106,7 +106,7 @@ impl OpenTelemetryProtocolHandler for Instance {
             .extension(PHYSICAL_TABLE_PARAM)
             .unwrap_or(GREPTIME_PHYSICAL_TABLE)
             .to_string();
-        self.handle_metric_row_inserts(requests, ctx, physical_table.to_string())
+        self.handle_metric_row_inserts(requests, ctx, physical_table.clone())
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)
@@ -192,7 +192,7 @@ impl PromStoreProtocolHandler for Instance {
             .extension(PHYSICAL_TABLE_PARAM)
             .unwrap_or(GREPTIME_PHYSICAL_TABLE)
             .to_string();
-        self.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string())
+        self.handle_metric_row_inserts(request, ctx.clone(), physical_table.clone())
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)?
@@ -137,7 +137,10 @@ impl BloomFilterCreator {
         self.global_memory_usage
             .fetch_add(mem_diff, Ordering::Relaxed);
 
-        if self.accumulated_row_count % self.rows_per_segment == 0 {
+        if self
+            .accumulated_row_count
+            .is_multiple_of(self.rows_per_segment)
+        {
             self.finalize_segment().await?;
             self.finalized_row_count = self.accumulated_row_count;
         }
@@ -163,7 +166,10 @@ impl BloomFilterCreator {
         self.global_memory_usage
             .fetch_add(mem_diff, Ordering::Relaxed);
 
-        if self.accumulated_row_count % self.rows_per_segment == 0 {
+        if self
+            .accumulated_row_count
+            .is_multiple_of(self.rows_per_segment)
+        {
             self.finalize_segment().await?;
             self.finalized_row_count = self.accumulated_row_count;
         }
@@ -43,6 +43,7 @@ pub struct ApplyOutput {
 pub trait IndexApplier: Send + Sync {
     /// Applies the predefined predicates to the data read by the given index reader, returning
     /// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
+    #[allow(unused_parens)]
     async fn apply<'a>(
         &self,
         context: SearchContext,
Some files were not shown because too many files have changed in this diff.