Mirror of https://github.com/kharonsec/garage.git, synced 2026-04-25 20:44:55 +02:00.
Merge pull request 'code maintenance with help clippy' (#1314) from gwenlg/garage:code_maintenance_part2 into main-v2

Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/1314
Reviewed-by: Alex <lx@deuxfleurs.fr>
Cargo.toml (+12 lines)
@@ -179,3 +179,15 @@ lto = "thin"
 codegen-units = 16
 opt-level = 3
 strip = "debuginfo"
+
+[workspace.lints.clippy]
+# pedantic lints configuration
+doc_markdown = "warn"
+format_collect = "warn"
+manual_midpoint = "warn"
+semicolon_if_nothing_returned = "warn"
+unnecessary_semicolon = "warn"
+unnecessary_wraps = "warn"
+
+# nursery lints configuration
+# or_fun_call = "warn" # enable it to help detect non trivial code used in `_or` method

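These workspace-level lints drive every change in this PR; each crate opts in by adding `[lints] workspace = true` to its own Cargo.toml. For readers unfamiliar with the two most mechanical lints, here is a minimal standalone sketch of what they flag — illustrative code only, not taken from this repository:

```rust
// `semicolon_if_nothing_returned`: a unit-returning function should end its
// last statement with a semicolon, making "no return value" explicit.
fn log_once(msg: &str) {
    println!("{}", msg);
}

// `unnecessary_wraps`: if a function only ever returns Ok(...), the Result
// wrapper carries no information — return the value directly instead.
fn double(x: u32) -> u32 {
    x * 2
}

fn main() {
    log_once("hello");
    assert_eq!(double(21), 42);
}
```

Most hunks below are exactly these two fixes, plus `doc_markdown` backticks in doc comments.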
@@ -48,3 +48,6 @@ prometheus = { workspace = true, optional = true }
 [features]
 metrics = ["opentelemetry-prometheus", "prometheus"]
 k2v = ["garage_model/k2v"]
+
+[lints]
+workspace = true

@@ -557,7 +557,7 @@ impl RequestHandler for AddBucketAliasRequest {
 			BucketAliasEnum::Global { global_alias } => {
 				helper
 					.set_global_bucket_alias(bucket_id, &global_alias)
-					.await?
+					.await?;
 			}
 			BucketAliasEnum::Local {
 				local_alias,

@@ -565,7 +565,7 @@ impl RequestHandler for AddBucketAliasRequest {
 			} => {
 				helper
 					.set_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
-					.await?
+					.await?;
 			}
 		}

@@ -591,7 +591,7 @@ impl RequestHandler for RemoveBucketAliasRequest {
 			BucketAliasEnum::Global { global_alias } => {
 				helper
 					.unset_global_bucket_alias(bucket_id, &global_alias)
-					.await?
+					.await?;
 			}
 			BucketAliasEnum::Local {
 				local_alias,

@@ -599,7 +599,7 @@ impl RequestHandler for RemoveBucketAliasRequest {
 			} => {
 				helper
 					.unset_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
-					.await?
+					.await?;
 			}
 		}

@@ -869,7 +869,7 @@ impl Modify for SecurityAddon {
 		components.add_security_scheme(
 			"bearerAuth",
 			SecurityScheme::Http(Http::builder().scheme(HttpAuthScheme::Bearer).build()),
-		)
+		);
 	}
 }

@@ -77,7 +77,7 @@ pub enum Endpoint {
 impl Endpoint {
 	/// Determine which S3 endpoint a request is for using the request, and a bucket which was
 	/// possibly extracted from the Host header.
-	/// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+	/// Returns Self plus bucket name, if endpoint is not `Endpoint::ListBuckets`
 	pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> {
 		let uri = req.uri();
 		let path = uri.path();
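The many doc-comment hunks like the one above come from `doc_markdown`, which wants code identifiers in rustdoc comments wrapped in backticks so they render as inline code rather than prose. A minimal sketch with a hypothetical function, not from Garage:

```rust
/// Resolves a bucket by name.
///
/// Returns None if `name` is empty — `doc_markdown` asks that identifiers
/// such as `Endpoint::ListBuckets` appear in backticks like this.
pub fn resolve(name: &str) -> Option<String> {
    if name.is_empty() {
        None
    } else {
        Some(name.to_owned())
    }
}

fn main() {
    assert_eq!(resolve("garage"), Some("garage".to_owned()));
}
```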
@@ -124,7 +124,7 @@ impl Endpoint {
 		]);

 		if let Some(message) = query.nonempty_message() {
-			debug!("Unused query parameter: {}", message)
+			debug!("Unused query parameter: {}", message);
 		}

 		Ok(res)

@@ -79,7 +79,7 @@ pub enum Endpoint {
 impl Endpoint {
 	/// Determine which S3 endpoint a request is for using the request, and a bucket which was
 	/// possibly extracted from the Host header.
-	/// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+	/// Returns Self plus bucket name, if endpoint is not `Endpoint::ListBuckets`
 	pub fn from_request<T>(req: &Request<T>) -> Result<Self, Error> {
 		let uri = req.uri();
 		let path = uri.path();

@@ -126,7 +126,7 @@ impl Endpoint {
 		]);

 		if let Some(message) = query.nonempty_message() {
-			debug!("Unused query parameter: {}", message)
+			debug!("Unused query parameter: {}", message);
 		}

 		Ok(res)

@@ -15,7 +15,7 @@ use crate::Authorization;
 impl AdminApiRequest {
 	/// Determine which S3 endpoint a request is for using the request, and a bucket which was
 	/// possibly extracted from the Host header.
-	/// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+	/// Returns Self plus bucket name, if endpoint is not `Endpoint::ListBuckets`
 	pub async fn from_request(req: Request<IncomingBody>) -> Result<Self, Error> {
 		let uri = req.uri().clone();
 		let path = uri.path();

@@ -89,7 +89,7 @@ impl AdminApiRequest {
 		]);

 		if let Some(message) = query.nonempty_message() {
-			debug!("Unused query parameter: {}", message)
+			debug!("Unused query parameter: {}", message);
 		}

 		Ok(res)

@@ -45,3 +45,6 @@ serde.workspace = true
 serde_json.workspace = true

 opentelemetry.workspace = true
+
+[lints]
+workspace = true

@@ -142,10 +142,10 @@ impl TryFrom<HelperError> for CommonError {
 	}
 }

-/// This function converts HelperErrors into CommonErrors,
-/// for variants that exist in CommonError.
-/// This is used for helper functions that might return InvalidBucketName
-/// or NoSuchBucket for instance, and we want to pass that error
+/// This function converts `HelperErrors` into `CommonErrors`,
+/// for variants that exist in `CommonError`.
+/// This is used for helper functions that might return `InvalidBucketName`
+/// or `NoSuchBucket` for instance, and we want to pass that error
 /// up to our caller.
 pub fn pass_helper_error(err: HelperError) -> CommonError {
 	match CommonError::try_from(err) {

@@ -1,5 +1,7 @@
 //! Module containing various helpers for encoding

+use std::fmt::Write as _;
+
 /// Encode &str for use in a URI
 pub fn uri_encode(string: &str, encode_slash: bool) -> String {
 	let mut result = String::with_capacity(string.len() * 2);
@@ -9,14 +9,98 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String {
 			'/' if encode_slash => result.push_str("%2F"),
 			'/' if !encode_slash => result.push('/'),
 			_ => {
-				result.push_str(
-					&format!("{}", c)
-						.bytes()
-						.map(|b| format!("%{:02X}", b))
-						.collect::<String>(),
-				);
+				let mut buf = [0_u8; 4];
+				let str = c.encode_utf8(&mut buf);
+				for b in str.bytes() {
+					write!(&mut result, "%{:02X}", b).unwrap();
+				}
 			}
 		}
 	}
 	result
 }
+
+#[cfg(test)]
+mod tests {
+	use crate::encoding::uri_encode;
+
+	#[test]
+	fn test_uri_encode() {
+		let url1_encoded = uri_encode(
+			"https://garagehq.deuxfleurs.fr/documentation/reference-manual/features/",
+			true,
+		);
+		assert_eq!(
+			&url1_encoded,
+			"https%3A%2F%2Fgaragehq.deuxfleurs.fr%2Fdocumentation%2Freference-manual%2Ffeatures%2F"
+		);
+
+		let url2_encoded = uri_encode(
+			"https://garagehq.deuxfleurs.fr/blog/2025-06-garage-v2/",
+			true,
+		);
+		assert_eq!(
+			&url2_encoded,
+			"https%3A%2F%2Fgaragehq.deuxfleurs.fr%2Fblog%2F2025-06-garage-v2%2F"
+		);
+
+		let url3_encoded = uri_encode(
+			"https://garagehq.deuxfleurs.fr/blog/2025-06-hé_les_gens/",
+			true,
+		);
+		assert_eq!(
+			&url3_encoded,
+			"https%3A%2F%2Fgaragehq.deuxfleurs.fr%2Fblog%2F2025-06-h%C3%A9_les_gens%2F"
+		);
+
+		let url4_encoded = uri_encode("/home/local user/Documents/personnel/à_blog.md", true);
+		assert_eq!(
+			&url4_encoded,
+			"%2Fhome%2Flocal%20user%2FDocuments%2Fpersonnel%2F%C3%A0_blog.md"
+		);
+	}
+
+	#[test]
+	fn test_uri_encode_without_slash() {
+		let url1_encoded = uri_encode(
+			"https://garagehq.deuxfleurs.fr/documentation/reference-manual/features/",
+			false,
+		);
+		assert_eq!(
+			&url1_encoded,
+			"https%3A//garagehq.deuxfleurs.fr/documentation/reference-manual/features/"
+		);
+
+		let url2_encoded = uri_encode(
+			"https://garagehq.deuxfleurs.fr/blog/2025-06-garage-v2/",
+			false,
+		);
+		assert_eq!(
+			&url2_encoded,
+			"https%3A//garagehq.deuxfleurs.fr/blog/2025-06-garage-v2/"
+		);
+
+		let url3_encoded = uri_encode(
+			"https://garagehq.deuxfleurs.fr/blog/2025-06-hé_les_gens/",
+			false,
+		);
+		assert_eq!(
+			&url3_encoded,
+			"https%3A//garagehq.deuxfleurs.fr/blog/2025-06-h%C3%A9_les_gens/"
+		);
+		let url4_encoded = uri_encode("/home/local user/Documents/personnel/à_blog.md", false);
+		assert_eq!(
+			&url4_encoded,
+			"/home/local%20user/Documents/personnel/%C3%A0_blog.md"
+		);
+	}
+
+	#[test]
+	fn test_uri_encode_most_than_double_size() {
+		let url_encoded = uri_encode("/home/ùàé ç/çaèù/à_êô.md", true);
+		assert_eq!(
+			&url_encoded,
+			"%2Fhome%2F%C3%B9%C3%A0%C3%A9%20%C3%A7%2F%C3%A7a%C3%A8%C3%B9%2F%C3%A0_%C3%AA%C3%B4.md"
+		);
+	}
+}
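The rewritten match arm above avoids allocating a `String` per character: `char::encode_utf8` writes the character's UTF-8 bytes into a 4-byte stack buffer, and `write!` appends the percent-escapes directly to the result. The same pattern in a self-contained sketch (the helper name `percent_encode_char` is assumed for illustration):

```rust
use std::fmt::Write as _;

// Percent-encode one char via a stack buffer instead of format!("{}", c):
// encode_utf8 fills a [u8; 4] and returns the &str slice actually used,
// so no intermediate heap String is created per character.
fn percent_encode_char(c: char, out: &mut String) {
    let mut buf = [0u8; 4];
    for b in c.encode_utf8(&mut buf).bytes() {
        write!(out, "%{:02X}", b).unwrap();
    }
}

fn main() {
    let mut s = String::new();
    percent_encode_char('é', &mut s); // U+00E9 is C3 A9 in UTF-8
    assert_eq!(s, "%C3%A9");
}
```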
@@ -125,7 +125,7 @@ impl<A: ApiHandler> ApiServer<A> {
 			}
 			UnixOrTCPSocketAddress::UnixSocket(ref path) => {
 				if path.exists() {
-					fs::remove_file(path)?
+					fs::remove_file(path)?;
 				}

 				let listener = UnixListener::bind(path)?;

@@ -190,7 +190,7 @@ impl<A: ApiHandler> ApiServer<A> {
 		let mut http_error_builder = Response::builder().status(e.http_status_code());

 		if let Some(header_map) = http_error_builder.headers_mut() {
-			e.add_http_headers(header_map)
+			e.add_http_headers(header_map);
 		}

 		let http_error = http_error_builder.body(body)?;

@@ -89,7 +89,7 @@ impl ReqBody {
 					checksummer
 				})
 				.await
-				.unwrap()
+				.unwrap();
 			}
 			Err(frame) => {
 				let trailers = frame.into_trailers().unwrap();

@@ -35,3 +35,6 @@ serde.workspace = true
 serde_json.workspace = true

 opentelemetry.workspace = true
+
+[lints]
+workspace = true

@@ -97,7 +97,7 @@ impl ReturnFormat {
 	}
 }

-/// Handle ReadItem request
+/// Handle `ReadItem` request
 #[allow(clippy::ptr_arg)]
 pub async fn handle_read_item(
 	ctx: ReqCtx,

@@ -201,7 +201,7 @@ pub async fn handle_delete_item(
 		.body(empty_body())?)
 }

-/// Handle ReadItem request
+/// Handle `ReadItem` request
 #[allow(clippy::ptr_arg)]
 pub async fn handle_poll_item(
 	ctx: ReqCtx,

@@ -1,6 +1,6 @@
 //! Utility module for retrieving ranges of items in Garage tables
 //! Implements parameters (prefix, start, end, limit) as specified
-//! for endpoints ReadIndex, ReadBatch and DeleteBatch
+//! for endpoints `ReadIndex`, `ReadBatch` and `DeleteBatch`

 use std::sync::Arc;

@@ -53,7 +53,7 @@ pub enum Endpoint {
 impl Endpoint {
 	/// Determine which S3 endpoint a request is for using the request, and a bucket which was
 	/// possibly extracted from the Host header.
-	/// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+	/// Returns Self plus bucket name, if endpoint is not `Endpoint::ListBuckets`
 	pub fn from_request<T>(req: &Request<T>) -> Result<(Self, String), Error> {
 		let uri = req.uri();
 		let path = uri.path().trim_start_matches('/');

@@ -62,7 +62,7 @@ impl Endpoint {
 		let (bucket, partition_key) = path
 			.split_once('/')
 			.map(|(b, p)| (b.to_owned(), p.trim_start_matches('/')))
-			.unwrap_or((path.to_owned(), ""));
+			.unwrap_or_else(|| (path.to_owned(), ""));

 		if bucket.is_empty() {
 			return Err(Error::bad_request("Missing bucket name"));
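`unwrap_or_else` (what the commented-out `or_fun_call` nursery lint steers toward) takes a closure, so a fallback that allocates — here a tuple containing `path.to_owned()` — is only built when `split_once` actually returns `None`. The difference in a standalone sketch, with a hypothetical `expensive_default`:

```rust
fn expensive_default() -> (String, &'static str) {
    // Imagine this allocating or doing real work.
    (String::from("default"), "")
}

fn main() {
    let path = "bucket/partition";
    // unwrap_or(expensive_default()) would evaluate the default eagerly,
    // even when split_once succeeds; unwrap_or_else defers it.
    let (bucket, rest) = path
        .split_once('/')
        .map(|(b, p)| (b.to_owned(), p))
        .unwrap_or_else(|| expensive_default());
    assert_eq!(bucket, "bucket");
    assert_eq!(rest, "partition");
}
```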
@@ -90,7 +90,7 @@ impl Endpoint {
 		};

 		if let Some(message) = query.nonempty_message() {
-			debug!("Unused query parameter: {}", message)
+			debug!("Unused query parameter: {}", message);
 		}
 		Ok((res, bucket))
 	}

@@ -58,3 +58,6 @@ serde_json.workspace = true
 quick-xml.workspace = true

 opentelemetry.workspace = true
+
+[lints]
+workspace = true

@@ -660,11 +660,11 @@ mod tests {

 	#[tokio::test]
 	async fn test_encrypt_block() {
-		test_block_enc(None).await
+		test_block_enc(None).await;
 	}

 	#[tokio::test]
 	async fn test_encrypt_block_compressed() {
-		test_block_enc(Some(1)).await
+		test_block_enc(Some(1)).await;
 	}
 }

@@ -50,11 +50,11 @@ pub enum Error {
 	#[error("Parts given to CompleteMultipartUpload do not match uploaded parts")]
 	InvalidPart,

-	/// Parts given to CompleteMultipartUpload were not in ascending order
+	/// Parts given to `CompleteMultipartUpload` were not in ascending order
 	#[error("Parts given to CompleteMultipartUpload were not in ascending order")]
 	InvalidPartOrder,

-	/// In CompleteMultipartUpload: not enough data
+	/// In `CompleteMultipartUpload`: not enough data
 	/// (here we are more lenient than AWS S3)
 	#[error("Proposed upload is smaller than the minimum allowed object size")]
 	EntityTooSmall,

@@ -126,7 +126,7 @@ fn handle_http_precondition(
 ) -> Result<Option<Response<ResBody>>, Error> {
 	let precondition_headers = PreconditionHeaders::parse(req)?;

-	if let Some(status_code) = precondition_headers.check(version, &version_meta.etag)? {
+	if let Some(status_code) = precondition_headers.check(version, &version_meta.etag) {
 		let mut response = object_headers(
 			version,
 			version_meta,

@@ -877,7 +877,7 @@ impl PreconditionHeaders {
 		})
 	}

-	fn check(&self, v: &ObjectVersion, etag: &str) -> Result<Option<StatusCode>, Error> {
+	fn check(&self, v: &ObjectVersion, etag: &str) -> Option<StatusCode> {
 		// we store date with ms precision, but headers are precise to the second: truncate
 		// the timestamp to handle the same-second edge case
 		let v_date = UNIX_EPOCH + Duration::from_secs(v.timestamp / 1000);
@@ -887,32 +887,32 @@ impl PreconditionHeaders {
 		if let Some(im) = &self.if_match {
 			// Step 1: if-match is present
 			if !im.iter().any(|x| x == etag || x == "*") {
-				return Ok(Some(StatusCode::PRECONDITION_FAILED));
+				return Some(StatusCode::PRECONDITION_FAILED);
 			}
 		} else if let Some(ius) = &self.if_unmodified_since {
 			// Step 2: if-unmodified-since is present, and if-match is absent
 			if v_date > *ius {
-				return Ok(Some(StatusCode::PRECONDITION_FAILED));
+				return Some(StatusCode::PRECONDITION_FAILED);
 			}
 		}

 		if let Some(inm) = &self.if_none_match {
 			// Step 3: if-none-match is present
 			if inm.iter().any(|x| x == etag || x == "*") {
-				return Ok(Some(StatusCode::NOT_MODIFIED));
+				return Some(StatusCode::NOT_MODIFIED);
 			}
 		} else if let Some(ims) = &self.if_modified_since {
 			// Step 4: if-modified-since is present, and if-none-match is absent
 			if v_date <= *ims {
-				return Ok(Some(StatusCode::NOT_MODIFIED));
+				return Some(StatusCode::NOT_MODIFIED);
 			}
 		}

-		Ok(None)
+		None
 	}

 	pub(crate) fn check_copy_source(&self, v: &ObjectVersion, etag: &str) -> Result<(), Error> {
-		match self.check(v, etag)? {
+		match self.check(v, etag) {
 			Some(_) => Err(Error::PreconditionFailed),
 			None => Ok(()),
 		}

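This hunk is the clearest `unnecessary_wraps` fix in the PR: `check` had no branch that ever produced `Err`, so the `Result` wrapper only forced a spurious `?` on every caller. The shape of the refactor in a reduced sketch, with hypothetical types standing in for Garage's:

```rust
use std::time::SystemTime;

#[derive(Debug, PartialEq)]
enum Status {
    PreconditionFailed,
}

// After the refactor: the function is infallible, so it returns
// Option<Status> directly instead of Result<Option<Status>, Error>.
fn check(if_unmodified_since: Option<SystemTime>, modified: SystemTime) -> Option<Status> {
    match if_unmodified_since {
        Some(t) if modified > t => Some(Status::PreconditionFailed),
        _ => None,
    }
}

fn main() {
    let now = SystemTime::now();
    assert_eq!(check(None, now), None); // caller no longer needs `?`
}
```

The same reasoning removes the `Result` from `fetch_part_info` and `test_fetch_part_info` in the hunks below.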
@@ -296,7 +296,7 @@ pub async fn handle_list_parts(
 		},
 	);

-	let (info, next) = fetch_part_info(query, &mpu)?;
+	let (info, next) = fetch_part_info(query, &mpu);

 	let result = s3_xml::ListPartsResult {
 		xmlns: (),

@@ -484,7 +484,7 @@ where
 			iter.next();
 		}
 		_ => (),
-	};
+	}

 	while let Some(object) = iter.peek() {
 		if !object.key.starts_with(&query.prefix) {

@@ -508,7 +508,7 @@ where
 			ExtractionResult::NoMore => {
 				return Ok(None);
 			}
-		};
+		}
 	}

 	if !server_more {
@@ -526,7 +526,7 @@ where
 fn fetch_part_info<'a>(
 	query: &ListPartsQuery,
 	mpu: &'a MultipartUpload,
-) -> Result<(Vec<PartInfo<'a>>, Option<u64>), Error> {
+) -> (Vec<PartInfo<'a>>, Option<u64>) {
 	assert!((1..=1000).contains(&query.max_parts)); // see s3/api_server.rs

 	// Parse multipart upload part list, removing parts not yet finished

@@ -565,10 +565,10 @@ fn fetch_part_info<'a>(
 	if parts.len() > query.max_parts as usize {
 		parts.truncate(query.max_parts as usize);
 		let pagination = Some(parts.last().unwrap().part_number);
-		return Ok((parts, pagination));
+		return (parts, pagination);
 	}

-	Ok((parts, None))
+	(parts, None)
 }

 /*
@@ -756,7 +756,7 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
 				None => Some(ExtractionResult::NoMore),
 			}
 		}
-	};
+	}
 }
 }

@@ -939,7 +939,7 @@ fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a
 	}
 }

-/// URIencode a value if needed
+/// `URIencode` a value if needed
 fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
 	if yes {
 		s3_xml::Value(uri_encode(s, true))
@@ -1069,7 +1069,7 @@ mod tests {
 				assert_eq!(upload, Uuid::from([0x8f; 32]));
 			}
 			_ => panic!("wrong result"),
-		};
+		}

 		assert_eq!(acc.keys.len(), 2);
 		assert_eq!(

@@ -1098,7 +1098,7 @@ mod tests {
 			match acc.extract(&(query().common), &start, &mut iter) {
 				ExtractionResult::Extracted { key } if key.as_str() == "b" => (),
 				_ => panic!("wrong result"),
-			};
+			}
 		}

 	#[tokio::test]

@@ -1255,7 +1255,7 @@ mod tests {
 	}

 	#[test]
-	fn test_fetch_part_info() -> Result<(), Error> {
+	fn test_fetch_part_info() {
 		let mut query = ListPartsQuery {
 			bucket_name: "a".to_string(),
 			key: "a".to_string(),

@@ -1267,7 +1267,7 @@ mod tests {
 		let mpu = mpu();

 		// Start from the beginning but with limited size to trigger pagination
-		let (info, pagination) = fetch_part_info(&query, &mpu)?;
+		let (info, pagination) = fetch_part_info(&query, &mpu);
 		assert_eq!(pagination.unwrap(), 3);
 		assert_eq!(
 			info,

@@ -1291,7 +1291,7 @@ mod tests {

 		// Use previous pagination to make a new request
 		query.part_number_marker = Some(pagination.unwrap());
-		let (info, pagination) = fetch_part_info(&query, &mpu)?;
+		let (info, pagination) = fetch_part_info(&query, &mpu);
 		assert!(pagination.is_none());
 		assert_eq!(
 			info,
@@ -1315,14 +1315,14 @@ mod tests {

 		// Trying to access a part that is way larger than registered ones
 		query.part_number_marker = Some(9999);
-		let (info, pagination) = fetch_part_info(&query, &mpu)?;
+		let (info, pagination) = fetch_part_info(&query, &mpu);
 		assert!(pagination.is_none());
 		assert_eq!(info, vec![]);

 		// Try without any limitation
 		query.max_parts = 1000;
 		query.part_number_marker = None;
-		let (info, pagination) = fetch_part_info(&query, &mpu)?;
+		let (info, pagination) = fetch_part_info(&query, &mpu);
 		assert!(pagination.is_none());
 		assert_eq!(
 			info,

@@ -1357,7 +1357,5 @@ mod tests {
 			},
 		]
 	);
-
-	Ok(())
 }
 }

@@ -370,7 +370,7 @@ pub async fn handle_complete_multipart_upload(
 					req_part.part_number, req_part.checksum, part.checksum
 				)));
 			}
 			parts.push(*part)
-			parts.push(*part)
+			parts.push(*part);
 		}
 		_ => return Err(Error::InvalidPart),
 	}

@@ -494,7 +494,7 @@ pub async fn handle_complete_multipart_upload(
 			.root_domain
 			.as_ref()
 			.map(|rd| s3_xml::Value(format!("https://{}.{}/{}", bucket_name, rd, key)))
-			.or(Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))),
+			.or_else(|| Some(s3_xml::Value(format!("/{}/{}", bucket_name, key)))),
 		bucket: s3_xml::Value(bucket_name.to_string()),
 		key: s3_xml::Value(key),
 		etag: s3_xml::Value(format!("\"{}\"", etag)),

@@ -192,7 +192,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 				meta.checksum = checksums.extract(Some(algo));
 			}
 		}
-	};
+	}

 	let size = first_block.len() as u64;
 	check_quotas(ctx, size, existing_object.as_ref()).await?;

@@ -293,7 +293,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
 				meta.checksum = checksums.extract(Some(algo));
 			}
 		}
-	};
+	}

 	// Verify quotas are respsected
 	check_quotas(ctx, total_size, existing_object.as_ref()).await?;
@@ -339,7 +339,7 @@ pub(crate) async fn check_quotas(
 	let quotas = bucket_params.quotas.get();
 	if quotas.max_objects.is_none() && quotas.max_size.is_none() {
 		return Ok(());
-	};
+	}

 	let counters = garage
 		.object_counter_table

@@ -436,7 +436,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
 				tracer.start("Hash block (md5, sha256)"),
 			))
 			.await
-			.unwrap()
+			.unwrap();
 		}
 		Err(e) => {
 			block_tx2.send(Err(e)).await?;

@@ -309,7 +309,7 @@ pub enum Endpoint {
 impl Endpoint {
 	/// Determine which S3 endpoint a request is for using the request, and a bucket which was
 	/// possibly extracted from the Host header.
-	/// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
+	/// Returns Self plus bucket name, if endpoint is not `Endpoint::ListBuckets`
 	pub fn from_request<T>(
 		req: &Request<T>,
 		bucket: Option<String>,

@@ -330,7 +330,7 @@ impl Endpoint {
 		} else {
 			path.split_once('/')
 				.map(|(b, p)| (b.to_owned(), p.trim_start_matches('/')))
-				.unwrap_or((path.to_owned(), ""))
+				.unwrap_or_else(|| (path.to_owned(), ""))
 		};

 		if *req.method() == Method::OPTIONS {

@@ -365,7 +365,7 @@ impl Endpoint {
 		}

 		if let Some(message) = query.nonempty_message() {
-			debug!("Unused query parameter: {}", message)
+			debug!("Unused query parameter: {}", message);
 		}
 		Ok((res, Some(bucket)))
 	}
@@ -580,7 +580,7 @@ impl Endpoint {
 	pub fn authorization_type(&self) -> Authorization {
 		if let Endpoint::ListBuckets = self {
 			return Authorization::None;
-		};
+		}
 		let readonly = router_match! {
 			@match
 			self,

@@ -725,7 +725,7 @@ mod tests {
 	) -> (Endpoint, Option<String>) {
 		let mut req = Request::builder().method(method).uri(uri);
 		if let Some((k, v)) = header {
-			req = req.header(k, v)
+			req = req.header(k, v);
 		}
 		let req = req.body(()).unwrap();

@@ -859,7 +859,7 @@ mod tests {
 			.body(())
 			.unwrap();

-		assert!(Endpoint::from_request(&req, None).is_err())
+		assert!(Endpoint::from_request(&req, None).is_err());
 	}

 	#[test]

@@ -40,3 +40,6 @@ tokio-util.workspace = true

 [features]
 system-libs = ["zstd/pkg-config"]
+
+[lints]
+workspace = true

@@ -21,7 +21,7 @@ pub(crate) struct DataLayout {
 	pub(crate) data_dirs: Vec<DataDir>,
 	markers: HashMap<PathBuf, String>,

-	/// Primary storage location (index in data_dirs) for each partition
+	/// Primary storage location (index in `data_dirs`) for each partition
 	/// = the location where the data is supposed to be, blocks are always
 	/// written there (copies in other dirs may be deleted if they exist)
 	pub(crate) part_prim: Vec<Idx>,

@@ -159,7 +159,7 @@ impl DataLayout {
 		for (idir, parts) in dir_prim.iter().enumerate() {
 			for part in parts.iter() {
 				assert!(part_prim[*part].is_none());
-				part_prim[*part] = Some(idir as Idx)
+				part_prim[*part] = Some(idir as Idx);
 			}
 		}

@@ -6,7 +6,7 @@ use opentelemetry::{global, metrics::*};

 use garage_db as db;

-/// TableMetrics reference all counter used for metrics
+/// `TableMetrics` reference all counter used for metrics
 pub struct BlockManagerMetrics {
 	pub(crate) _compression_level: ValueObserver<u64>,
 	pub(crate) _rc_size: ValueObserver<u64>,

@@ -52,7 +52,7 @@ impl BlockManagerMetrics {
 			_rc_size: meter
 				.u64_value_observer("block.rc_size", move |observer| {
 					if let Ok(value) = rc_tree.approximate_len() {
-						observer.observe(value as u64, &[])
+						observer.observe(value as u64, &[]);
 					}
 				})
 				.with_description("Number of blocks known to the reference counter")

@@ -78,7 +78,7 @@ impl BlockManagerMetrics {

 			_buffer_free_kb: meter
 				.u64_value_observer("block.ram_buffer_free_kb", move |observer| {
-					observer.observe(buffer_semaphore.available_permits() as u64, &[])
+					observer.observe(buffer_semaphore.available_permits() as u64, &[]);
 				})
 				.with_description(
 					"Available RAM in KiB to use for buffering data blocks to be written to remote nodes",

@@ -37,7 +37,7 @@ impl BlockRc {
 		match old_rc.increment().serialize() {
 			Some(x) => tx.insert(&self.rc_table, hash, x)?,
 			None => unreachable!(),
-		};
+		}
 		Ok(old_rc.is_zero())
 	}

@@ -52,7 +52,7 @@ impl BlockRc {
 		match new_rc.serialize() {
 			Some(x) => tx.insert(&self.rc_table, hash, x)?,
 			None => tx.remove(&self.rc_table, hash)?,
-		};
+		}
 		Ok(matches!(new_rc, RcEntry::Deletable { .. }))
 	}

@@ -72,7 +72,7 @@ impl BlockRc {
 					tx.remove(&self.rc_table, hash)?;
 				}
 				_ => (),
-			};
+			}
 			Ok(())
 		})?;
 		Ok(())
@@ -136,14 +136,14 @@ impl BlockRc {
 pub(crate) enum RcEntry {
 	/// Present: the block has `count` references, with `count` > 0.
 	///
-	/// This is stored as u64::to_be_bytes(count)
+	/// This is stored as `u64::to_be_bytes(count)`
 	Present { count: u64 },

 	/// Deletable: the block has zero references, and can be deleted
-	/// once time (returned by now_msec) is larger than at_time
+	/// once time (returned by `now_msec`) is larger than `at_time`
 	/// (in millis since Unix epoch)
 	///
-	/// This is stored as [0u8; 8] followed by u64::to_be_bytes(at_time),
+	/// This is stored as [0u8; 8] followed by `u64::to_be_bytes(at_time)`,
 	/// (this allows for the data format to be backwards compatible with
 	/// previous Garage versions that didn't have this intermediate state)
 	Deletable { at_time: u64 },

@@ -127,7 +127,7 @@ impl Worker for RepairWorker {
 				self.manager
 					.resync
 					.put_to_resync(&hash, Duration::from_secs(0))?;
-				self.next_start = Some(hash)
+				self.next_start = Some(hash);
 			}

 			Ok(WorkerState::Busy)

@@ -440,7 +440,7 @@ impl Worker for ScrubWorker {
 			Ok(cmd) => self.handle_cmd(cmd).await,
 			Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done),
 			Err(mpsc::error::TryRecvError::Empty) => (),
-		};
+		}

 		match &mut self.work {
 			ScrubWorkerState::Running { iterator, t_cp } => {

@@ -455,7 +455,7 @@ impl Worker for ScrubWorker {
 					}
 					Err(e) => return Err(e),
 					_ => (),
-				};
+				}

 			if now - *t_cp > 60 * 1000 {
 				self.persister

@@ -570,7 +570,7 @@ impl Worker for RebalanceWorker {
 			format!("Started: {}", msec_to_rfc3339(self.t_started)),
 		];
 		if let Some(t_fin) = self.t_finished {
-			freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)))
+			freeform.push(format!("Finished: {}", msec_to_rfc3339(t_fin)));
 		}
 		WorkerStatus {
 			progress: Some(format!("{:.2}%", self.block_iter.progress() * 100.)),

@@ -588,7 +588,7 @@ impl Worker for ResyncWorker {

 	async fn wait_for_work(&mut self) -> WorkerState {
 		while self.index >= self.persister.get_with(|x| x.n_workers) {
-			self.manager.resync.notify.notified().await
+			self.manager.resync.notify.notified().await;
 		}

 		select! {

@@ -33,3 +33,6 @@ bundled-libs = ["rusqlite?/bundled"]
 lmdb = ["heed"]
 fjall = ["dep:fjall", "dep:parking_lot"]
 sqlite = ["rusqlite", "r2d2", "r2d2_sqlite"]
+
+[lints]
+workspace = true

@@ -564,7 +564,7 @@ fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec<V
 			params.push(b.to_vec());
 		}
 		Bound::Unbounded => (),
-	};
+	}

 	match high {
 		Bound::Included(b) => {

@@ -119,3 +119,6 @@ system-libs = [
 	"garage_rpc/system-libs",
 	"sodiumoxide/use-pkg-config",
 ]
+
+[lints]
+workspace = true

@@ -7,7 +7,7 @@ use garage_db::*;
 /// K2V command line interface
 #[derive(StructOpt, Debug)]
 pub struct ConvertDbOpt {
-	/// Input database path (not the same as metadata_dir, see
+	/// Input database path (not the same as `metadata_dir`, see
 	/// <https://garagehq.deuxfleurs.fr/documentation/reference-manual/configuration/#db_engine>
 	#[structopt(short = "i")]
 	input_path: PathBuf,

@@ -1,3 +1,5 @@
+use std::borrow::Cow;
+
 use format_table::format_table;

 use chrono::Local;

@@ -42,18 +44,18 @@ impl Cli {
 				table_list_abbr(&tok.scope)
 			};
 			let exp = if tok.expired {
-				"expired".to_string()
+				Cow::Borrowed("expired")
 			} else {
 				tok.expiration
-					.map(|x| x.with_timezone(&Local).to_string())
-					.unwrap_or("never".into())
+					.map(|x| x.with_timezone(&Local).to_string().into())
+					.unwrap_or(Cow::Borrowed("never"))
 			};
 			table.push(format!(
 				"{}\t{}\t{}\t{}\t{}",
 				tok.id.as_deref().unwrap_or("-"),
 				tok.created
-					.map(|x| x.with_timezone(&Local).date_naive().to_string())
-					.unwrap_or("-".into()),
+					.map(|x| x.with_timezone(&Local).date_naive().to_string().into())
+					.unwrap_or(Cow::Borrowed("-")),
 				tok.name,
 				exp,
 				scope,
@@ -236,8 +238,8 @@ fn print_token_info(token: &GetAdminTokenInfoResponse) {
 			"Expiration:\t{}",
 			token
 				.expiration
-				.map(|x| x.with_timezone(&Local).to_string())
-				.unwrap_or("never".into())
+				.map(|x| x.with_timezone(&Local).to_string().into())
+				.unwrap_or(Cow::Borrowed("never"))
 		),
 		String::new(),
 	];

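Switching these expressions from `String` to `Cow<'_, str>` avoids heap-allocating the fixed labels (`"expired"`, `"never"`, `"-"`) while still allowing an owned `String` for computed dates. The pattern in a reduced sketch, with a hypothetical `Token` type standing in for the API response:

```rust
use std::borrow::Cow;

struct Token {
    expired: bool,
    expiration: Option<String>,
}

fn expiry_label(tok: &Token) -> Cow<'_, str> {
    if tok.expired {
        Cow::Borrowed("expired") // static case: no allocation
    } else {
        tok.expiration
            .as_deref()
            .map(|x| Cow::Owned(x.to_uppercase())) // computed case: owned
            .unwrap_or(Cow::Borrowed("never"))
    }
}

fn main() {
    let t = Token { expired: true, expiration: None };
    assert_eq!(expiry_label(&t), "expired");
}
```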
@@ -295,13 +295,20 @@ impl Cli {
 			));
 		}

+		// Destructure becket info to allow separate use of `id` and `website_config`
+		let GetBucketInfoResponse {
+			id: bucket_id,
+			website_config: bucket_website_config,
+			..
+		} = bucket;
+
 		let wa = if opt.allow {
 			UpdateBucketWebsiteAccess {
 				enabled: true,
 				index_document: Some(opt.index_document.clone()),
 				error_document: opt
 					.error_document
-					.or(bucket.website_config.and_then(|x| x.error_document.clone())),
+					.or_else(|| bucket_website_config.and_then(|x| x.error_document.clone())),
 			}
 		} else {
 			UpdateBucketWebsiteAccess {

@@ -313,7 +320,7 @@ impl Cli {

 		let res = self
 			.api_request(UpdateBucketRequest {
-				id: bucket.id,
+				id: bucket_id,
 				body: UpdateBucketRequestBody {
 					website_access: Some(wa),
 					quotas: None,

@@ -1,3 +1,5 @@
+use std::borrow::Cow;
+
 use format_table::format_table;

 use chrono::Local;

@@ -33,11 +35,11 @@ impl Cli {
 		let mut table = vec!["ID\tCreated\tName\tExpiration".to_string()];
 		for key in keys.0.iter() {
 			let exp = if key.expired {
-				"expired".to_string()
+				Cow::from("expired")
 			} else {
 				key.expiration
-					.map(|x| x.with_timezone(&Local).to_string())
-					.unwrap_or("never".into())
+					.map(|x| x.with_timezone(&Local).to_string().into())
+					.unwrap_or(Cow::Borrowed("never"))
 			};
 			table.push(format!(
 				"{}\t{}\t{}\t{}",
@@ -288,8 +290,8 @@ fn print_key_info(key: &GetKeyInfoResponse) {
 		format!(
 			"Expiration:\t{}",
 			key.expiration
-				.map(|x| x.with_timezone(&Local).to_string())
-				.unwrap_or("never".into())
+				.map(|x| x.with_timezone(&Local).to_string().into())
+				.unwrap_or(Cow::Borrowed("never"))
 		),
 		String::new(),
 		format!("Can create buckets:\t{}", key.permissions.create_bucket),

@@ -394,7 +394,7 @@ pub fn print_cluster_layout(layout: &GetClusterLayoutResponse, empty_msg: &str)
 				role.zone,
 				capacity_string(role.capacity),
 			));
-		};
+		}
 	}
 	if table.len() > 1 {
 		format_table(table);

@@ -102,7 +102,7 @@ impl Cli {
 			s => {
 				table.push(format!("Worker state:\t{}", format_worker_state(s)));
 			}
-		};
+		}
 		if let Some(tql) = info.tranquility {
 			table.push(format!("Tranquility:\t{}", tql));
 		}

@@ -71,7 +71,7 @@ pub enum Command {
 	/// The result is printed to `stdout` in JSON format.
 	#[structopt(name = "json-api", version = garage_version())]
 	JsonApi {
-		/// The admin API endpoint to invoke, e.g. GetClusterStatus
+		/// The admin API endpoint to invoke, e.g. `GetClusterStatus`
 		endpoint: String,
 		/// The JSON payload, or `-` to read from `stdin`
 		#[structopt(default_value = "null")]

@@ -475,7 +475,7 @@ pub struct KeyNewOpt {
 	#[structopt(default_value = "Unnamed key")]
 	pub name: String,
 	/// Set an expiration time for the access key
-	/// (see docs.rs/parse_duration for date format)
+	/// (see `docs.rs/parse_duration` for date format)
 	#[structopt(long = "expires-in")]
 	pub expires_in: Option<String>,
 }

@@ -486,7 +486,7 @@ pub struct KeySetOpt {
 	pub key_pattern: String,

 	/// Set an expiration time for the access key
-	/// (see docs.rs/parse_duration for date format)
+	/// (see `docs.rs/parse_duration` for date format)
 	#[structopt(long = "expires-in")]
 	pub expires_in: Option<String>,
 	/// Set the access key to never expire

@@ -518,7 +518,7 @@ pub struct KeyPermOpt {
 	/// ID or name of the key
 	pub key_pattern: String,

-	/// Flag that allows key to create buckets using S3's CreateBucket call
+	/// Flag that allows key to create buckets using S3's `CreateBucket` call
 	#[structopt(long = "create-bucket")]
 	pub create_bucket: bool,
 }

@@ -597,12 +597,12 @@ pub enum AdminTokenOperation {
 pub struct AdminTokenCreateOp {
 	/// Set a name for the token
 	pub name: Option<String>,
-	/// Set an expiration time for the token (see docs.rs/parse_duration for date
+	/// Set an expiration time for the token (see `docs.rs/parse_duration` for date
 	/// format)
 	#[structopt(long = "expires-in")]
 	pub expires_in: Option<String>,
 	/// Set a limited scope for the token, as a comma-separated list of
-	/// admin API functions (e.g. GetClusterStatus, etc.). The default scope
+	/// admin API functions (e.g. `GetClusterStatus`, etc.). The default scope
 	/// is `*`, which allows access to all admin API functions.
 	/// Note that granting a scope that allows `CreateAdminToken` or
 	/// `UpdateAdminToken` allows for privilege escalation, and is therefore

@@ -619,7 +619,7 @@ pub struct AdminTokenSetOp {
 	/// Name or prefix of the ID of the token to modify
 	pub api_token: String,

-	/// Set an expiration time for the token (see docs.rs/parse_duration for date
+	/// Set an expiration time for the token (see `docs.rs/parse_duration` for date
 	/// format)
 	#[structopt(long = "expires-in")]
 	pub expires_in: Option<String>,

@@ -628,7 +628,7 @@ pub struct AdminTokenSetOp {
 	pub never_expires: bool,

 	/// Set a limited scope for the token, as a comma-separated list of
-	/// admin API functions (e.g. GetClusterStatus, etc.), or `*` to allow
+	/// admin API functions (e.g. `GetClusterStatus`, etc.), or `*` to allow
 	/// all admin API functions.
 	/// Use `--scope=+Scope1,Scope2` to add scopes to the existing list,
 	/// and `--scope=-Scope1,Scope2` to remove scopes from the existing list.

@@ -17,32 +17,32 @@ pub struct Secrets {
 	)]
 	pub allow_world_readable_secrets: Option<bool>,

-	/// RPC secret network key, used to replace rpc_secret in config.toml when running the
+	/// RPC secret network key, used to replace `rpc_secret` in config.toml when running the
 	/// daemon or doing admin operations
 	#[structopt(short = "s", long = "rpc-secret", env = "GARAGE_RPC_SECRET")]
 	pub rpc_secret: Option<String>,

-	/// RPC secret network key, used to replace rpc_secret in config.toml and rpc-secret
+	/// RPC secret network key, used to replace `rpc_secret` in config.toml and rpc-secret
 	/// when running the daemon or doing admin operations
 	#[structopt(long = "rpc-secret-file", env = "GARAGE_RPC_SECRET_FILE")]
 	pub rpc_secret_file: Option<PathBuf>,

-	/// Admin API authentication token, replaces admin.admin_token in config.toml when
+	/// Admin API authentication token, replaces `admin.admin_token` in config.toml when
 	/// running the Garage daemon
 	#[structopt(long = "admin-token", env = "GARAGE_ADMIN_TOKEN")]
 	pub admin_token: Option<String>,

-	/// Admin API authentication token file path, replaces admin.admin_token in config.toml
+	/// Admin API authentication token file path, replaces `admin.admin_token` in config.toml
 	/// and admin-token when running the Garage daemon
 	#[structopt(long = "admin-token-file", env = "GARAGE_ADMIN_TOKEN_FILE")]
 	pub admin_token_file: Option<PathBuf>,

-	/// Metrics API authentication token, replaces admin.metrics_token in config.toml when
+	/// Metrics API authentication token, replaces `admin.metrics_token` in config.toml when
 	/// running the Garage daemon
 	#[structopt(long = "metrics-token", env = "GARAGE_METRICS_TOKEN")]
 	pub metrics_token: Option<String>,

-	/// Metrics API authentication token file path, replaces admin.metrics_token in config.toml
+	/// Metrics API authentication token file path, replaces `admin.metrics_token` in config.toml
 	/// and metrics-token when running the Garage daemon
 	#[structopt(long = "metrics-token-file", env = "GARAGE_METRICS_TOKEN_FILE")]
 	pub metrics_token_file: Option<PathBuf>,

@@ -75,7 +75,7 @@ impl Context {
 		bucket_name
 	}

-	/// Build a K2vClient for a given bucket
+	/// Build a `K2vClient` for a given bucket
 	#[cfg(feature = "k2v")]
 	pub fn k2v_client(&self, bucket: &str) -> K2vClient {
 		let config = k2v_client::K2vClientConfig {

@@ -123,7 +123,7 @@ async fn test_listobjectsv2() {
 			(Some(k), None) if k.len() == 1 => cnt_key += 1,
 			(None, Some(pfx)) if pfx.len() == 1 => cnt_pfx += 1,
 			_ => unreachable!("logic error"),
-		};
+		}
 		if next.is_none() {
 			break;
 		}

@@ -331,7 +331,7 @@ async fn test_listobjectsv1() {
 			(Some(k), None) if k.len() == 1 => cnt_key += 1,
 			(None, Some(pfx)) if pfx.len() == 1 => cnt_pfx += 1,
 			_ => unreachable!("logic error"),
-		};
+		}
 		if next.is_none() {
 			break;
 		}

@@ -5,6 +5,7 @@ mod telemetry {
 	use garage_util::data::Uuid;
 	use garage_util::error::Error;

+	#[expect(clippy::unnecessary_wraps)]
 	pub fn init_tracing(_: &str, _: Uuid) -> Result<(), Error> {
 		error!("Garage was built without OTLP exporter, admin.trace_sink is ignored.");
 		Ok(())
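Here the `Result` wrapper is kept deliberately, so the stub shares a signature with the real OTLP-enabled `init_tracing`. `#[expect]` behaves like `#[allow]`, but additionally warns if the expected lint ever stops firing, so the suppression cannot silently go stale. Roughly, in a standalone sketch:

```rust
// #[expect] documents *why* a lint is silenced and self-expires: if the
// function is later changed so clippy::unnecessary_wraps no longer fires,
// the compiler reports an unfulfilled expectation.
#[expect(clippy::unnecessary_wraps)]
fn init_tracing_stub() -> Result<(), String> {
    // Kept as Result so both build variants expose one signature.
    Ok(())
}

fn main() {
    init_tracing_stub().unwrap();
}
```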
@@ -45,3 +45,6 @@ path = "lib.rs"
 name = "k2v-cli"
 path = "bin/k2v-cli.rs"
 required-features = ["cli"]
+
+[lints]
+workspace = true

@@ -95,7 +95,7 @@ impl K2vClient {
 		})
 	}

-	/// Perform a ReadItem request, reading the value(s) stored for a single pk+sk.
+	/// Perform a `ReadItem` request, reading the value(s) stored for a single pk+sk.
 	pub async fn read_item(
 		&self,
 		partition_key: &str,

@@ -134,7 +134,7 @@ impl K2vClient {
 		}
 	}

-	/// Perform a PollItem request, waiting for the value(s) stored for a single pk+sk to be
+	/// Perform a `PollItem` request, waiting for the value(s) stored for a single pk+sk to be
 	/// updated.
 	pub async fn poll_item(
 		&self,

@@ -190,7 +190,7 @@ impl K2vClient {
 		}
 	}

-	/// Perform a PollRange request, waiting for any change in a given range of keys
+	/// Perform a `PollRange` request, waiting for any change in a given range of keys
 	/// to occur
 	pub async fn poll_range(
 		&self,

@@ -239,7 +239,7 @@ impl K2vClient {
 		}))
 	}

-	/// Perform an InsertItem request, inserting a value for a single pk+sk.
+	/// Perform an `InsertItem` request, inserting a value for a single pk+sk.
 	pub async fn insert_item(
 		&self,
 		partition_key: &str,

@@ -258,7 +258,7 @@ impl K2vClient {
 		Ok(())
 	}

-	/// Perform a DeleteItem request, deleting the value(s) stored for a single pk+sk.
+	/// Perform a `DeleteItem` request, deleting the value(s) stored for a single pk+sk.
 	pub async fn delete_item(
 		&self,
 		partition_key: &str,

@@ -274,7 +274,7 @@ impl K2vClient {
 		Ok(())
 	}

-	/// Perform a ReadIndex request, listing partition key which have at least one associated
+	/// Perform a `ReadIndex` request, listing partition key which have at least one associated
 	/// sort key, and which matches the filter.
 	pub async fn read_index(
 		&self,

@@ -300,7 +300,7 @@ impl K2vClient {
 		})
 	}

-	/// Perform an InsertBatch request, inserting multiple values at once. Note: this operation is
+	/// Perform an `InsertBatch` request, inserting multiple values at once. Note: this operation is
 	/// *not* atomic: it is possible for some sub-operations to fails and others to success. In
 	/// that case, failure is reported.
 	pub async fn insert_batch(&self, operations: &[BatchInsertOp<'_>]) -> Result<(), Error> {

@@ -312,7 +312,7 @@ impl K2vClient {
 		Ok(())
 	}

-	/// Perform a ReadBatch request, reading multiple values or range of values at once.
+	/// Perform a `ReadBatch` request, reading multiple values or range of values at once.
 	pub async fn read_batch(
 		&self,
 		operations: &[BatchReadOp<'_>],

@@ -346,7 +346,7 @@ impl K2vClient {
 		.collect())
 	}

-	/// Perform a DeleteBatch request, deleting multiple values or range of values at once, without
+	/// Perform a `DeleteBatch` request, deleting multiple values or range of values at once, without
 	/// providing causality information.
 	pub async fn delete_batch(&self, operations: &[BatchDeleteOp<'_>]) -> Result<Vec<u64>, Error> {
 		let url = self.build_url(None, &[("delete", "")]);

@@ -588,7 +588,7 @@ impl Serialize for K2vValue {
 	}
 }

-/// A set of K2vValue and associated causality information.
+/// A set of `K2vValue` and associated causality information.
 #[derive(Debug, Clone, Serialize)]
 pub struct CausalValue {
 	pub causality: CausalityToken,

@@ -621,12 +621,12 @@ pub struct PollRangeFilter<'a> {
 	pub prefix: Option<&'a str>,
 }

-/// Response to a poll_range query
+/// Response to a `poll_range` query
 #[derive(Debug, Default, Clone, Serialize)]
 pub struct PollRangeResult {
-	/// List of items that have changed since last PollRange call.
+	/// List of items that have changed since last `PollRange` call.
 	pub items: BTreeMap<String, CausalValue>,
-	/// opaque string representing items already seen for future PollRange calls.
+	/// opaque string representing items already seen for future `PollRange` calls.
 	pub seen_marker: String,
 }

@@ -696,7 +696,7 @@ pub struct PartitionInfo {
 	pub bytes: u64,
 }

-/// Single sub-operation of an InsertBatch.
+/// Single sub-operation of an `InsertBatch`.
 #[derive(Debug, Clone, Serialize)]
 pub struct BatchInsertOp<'a> {
 	#[serde(rename = "pk")]

@@ -709,7 +709,7 @@ pub struct BatchInsertOp<'a> {
 	pub value: K2vValue,
 }

-/// Single sub-operation of a ReadBatch.
+/// Single sub-operation of a `ReadBatch`.
 #[derive(Debug, Default, Clone, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct BatchReadOp<'a> {

@@ -743,7 +743,7 @@ struct BatchReadItem {
 	v: Vec<K2vValue>,
 }

-/// Single sub-operation of a DeleteBatch
+/// Single sub-operation of a `DeleteBatch`
 #[derive(Debug, Clone, Deserialize, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct BatchDeleteOp<'a> {

@@ -46,3 +46,6 @@ k2v = ["garage_util/k2v"]
 lmdb = ["garage_db/lmdb"]
 sqlite = ["garage_db/sqlite"]
 fjall = ["garage_db/fjall"]
+
+[lints]
+workspace = true

@@ -73,7 +73,7 @@ impl Crdt for AdminApiTokenScope {

 impl AdminApiToken {
 	/// Create a new admin API token.
-	/// Returns the AdminApiToken object, which contains the hashed bearer token,
+	/// Returns the `AdminApiToken` object, which contains the hashed bearer token,
 	/// as well as the plaintext bearer token.
 	pub fn new(name: &str) -> (Self, String) {
 		use argon2::{

@@ -35,7 +35,7 @@ mod v08 {
 		/// Map of aliases that are or have been given to this bucket
 		/// in the global namespace
 		/// (not authoritative: this is just used as an indication to
-		/// map back to aliases when doing ListBuckets)
+		/// map back to aliases when doing `ListBuckets`)
 		pub aliases: crdt::LwwMap<String, bool>,
 		/// Map of aliases that are or have been given to this bucket
 		/// in namespaces local to keys

@@ -148,7 +148,7 @@ mod v2 {
 		/// Map of aliases that are or have been given to this bucket
 		/// in the global namespace
 		/// (not authoritative: this is just used as an indication to
-		/// map back to aliases when doing ListBuckets)
+		/// map back to aliases when doing `ListBuckets`)
 		pub aliases: crdt::LwwMap<String, bool>,
 		/// Map of aliases that are or have been given to this bucket
 		/// in namespaces local to keys

@@ -241,7 +241,7 @@ impl AutoCrdt for BucketQuotas {
 }

 impl BucketParams {
-	/// Create an empty BucketParams with no authorized keys and no website access
+	/// Create an empty `BucketParams` with no authorized keys and no website access
 	fn new() -> Self {
 		BucketParams {
 			creation_date: now_msec(),

@@ -79,7 +79,7 @@ impl<'a> BucketHelper<'a> {
 	/// aliases directly using the data provided in the `api_key` parameter.
 	/// As a consequence, it does not provide read-after-write guarantees.
 	///
-	/// In case no such bucket is found, this function returns a NoSuchBucket error.
+	/// In case no such bucket is found, this function returns a `NoSuchBucket` error.
 	#[allow(clippy::ptr_arg)]
 	pub fn resolve_bucket_fast(
 		&self,

@@ -147,7 +147,7 @@ impl<'a> BucketHelper<'a> {
 	///
 	/// - this function does quorum reads to ensure consistency.
 	/// - this function fetches the Key entry from the key table to ensure up-to-date data
-	/// - this function returns None if the bucket is not found, instead of HelperError::NoSuchBucket
+	/// - this function returns None if the bucket is not found, instead of `HelperError::NoSuchBucket`
 	#[allow(clippy::ptr_arg)]
 	pub async fn resolve_bucket(
 		&self,

@@ -17,13 +17,13 @@ use crate::helper::key::KeyHelper;
 use crate::key_table::*;
 use crate::permission::BucketKeyPerm;

-/// A LockedHelper is the mandatory struct to hold when doing operations
+/// A `LockedHelper` is the mandatory struct to hold when doing operations
 /// that modify access keys or bucket aliases. This structure takes
 /// a lock to a unit value that is in the globally-shared Garage struct.
 ///
 /// This avoid several concurrent requests to modify the list of buckets
 /// and aliases at the same time, ending up in inconsistent states.
-/// This DOES NOT FIX THE FUNDAMENTAL ISSUE as CreateBucket requests handled
+/// This DOES NOT FIX THE FUNDAMENTAL ISSUE as `CreateBucket` requests handled
 /// by different API nodes can still break the cluster, but it is a first
 /// fix that allows consistency to be maintained if all such requests are
 /// directed to a single node, which is doable for many deployments.
@@ -37,7 +37,7 @@ pub struct LockedHelper<'a>(
 impl<'a> Drop for LockedHelper<'a> {
 	fn drop(&mut self) {
 		// make it explicit that the mutexguard lives until here
-		drop(self.1.take())
+		drop(self.1.take());
 	}
 }

@@ -167,7 +167,7 @@ impl<'a> LockedHelper<'a> {
 	}

 	/// Ensures a bucket does not have a certain global alias.
-	/// Contrarily to unset_global_bucket_alias, this does not
+	/// Contrarily to `unset_global_bucket_alias`, this does not
 	/// fail on any condition other than:
 	/// - bucket cannot be found (its fine if it is in deleted state)
 	/// - alias cannot be found (its fine if it points to nothing or

@@ -335,7 +335,7 @@ impl<'a> LockedHelper<'a> {
 	}

 	/// Ensures a bucket does not have a certain local alias.
-	/// Contrarily to unset_local_bucket_alias, this does not
+	/// Contrarily to `unset_local_bucket_alias`, this does not
 	/// fail on any condition other than:
 	/// - bucket cannot be found (its fine if it is in deleted state)
 	/// - key cannot be found (its fine if alias in key points to nothing

@@ -1,9 +1,9 @@
-//! Implements a CausalContext, which is a set of timestamps for each
+//! Implements a `CausalContext`, which is a set of timestamps for each
 //! node -- a vector clock --, indicating that the versions with
 //! timestamps <= these numbers have been seen and can be
 //! overwritten by a subsequent write.
 //!
-//! The textual representation of a CausalContext, which we call a
+//! The textual representation of a `CausalContext`, which we call a
 //! "causality token", is used in the API and must be sent along with
 //! each write or delete operation to indicate the previously seen
 //! versions that we want to overwrite or delete.

@@ -56,7 +56,7 @@ mod v08 {
 pub use v08::*;

 impl K2VItem {
-	/// Creates a new K2VItem when no previous entry existed in the db
+	/// Creates a new `K2VItem` when no previous entry existed in the db
 	pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
 		Self {
 			partition: K2VItemPartition {

@@ -67,7 +67,7 @@ impl K2VItem {
 			items: BTreeMap::new(),
 		}
 	}
-	/// Updates a K2VItem with a new value or a deletion event
+	/// Updates a `K2VItem` with a new value or a deletion event
 	pub fn update(
 		&mut self,
 		this_node: Uuid,

@@ -1,4 +1,4 @@
-//! Implements a RangeSeenMarker, a data type used in the PollRange API
+//! Implements a `RangeSeenMarker`, a data type used in the `PollRange` API
 //! to indicate which items in the range have already been seen
 //! and which have not been seen yet.
 //!

@@ -71,7 +71,7 @@ impl RangeSeenMarker {

 	pub fn canonicalize(&mut self) {
 		let self_vc = &self.vector_clock;
-		self.items.retain(|_sk, vc| vclock_gt(vc, self_vc))
+		self.items.retain(|_sk, vc| vclock_gt(vc, self_vc));
 	}

 	pub fn encode(&mut self) -> Result<String, Error> {

@@ -27,7 +27,7 @@ mod v08 {
 	/// Configuration for a key
 	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 	pub struct KeyParams {
-		/// The secret_key associated (immutable)
+		/// The `secret_key` associated (immutable)
 		pub secret_key: String,

 		/// Name for the key

@@ -73,7 +73,7 @@ mod v2 {
 		/// Key's creation date, if known (older versions of Garage didn't keep track
 		/// of this information)
 		pub created: Option<u64>,
-		/// The secret_key associated (immutable)
+		/// The `secret_key` associated (immutable)
 		pub secret_key: String,

 		/// Name for the key

@@ -65,7 +65,11 @@ pub fn register_bg_vars(
 	vars: &mut vars::BgVars,
 ) {
 	vars.register_ro(persister, "lifecycle-last-completed", |p| {
-		p.get_with(|x| x.last_completed.clone().unwrap_or("never".to_string()))
+		p.get_with(|x| {
+			x.last_completed
+				.clone()
+				.unwrap_or_else(|| "never".to_string())
+		})
 	});
 }

@@ -31,12 +31,12 @@ mod v09 {
 		/// The timestamp at which the multipart upload was created
 		pub timestamp: u64,
 		/// Is this multipart upload deleted
-		/// The MultipartUpload is marked as deleted as soon as the
+		/// The `MultipartUpload` is marked as deleted as soon as the
 		/// multipart upload is either completed or aborted
 		pub deleted: crdt::Bool,
 		/// List of uploaded parts, key = (part number, timestamp)
 		/// In case of retries, all versions for each part are kept
-		/// Everything is cleaned up only once the MultipartUpload is marked deleted
+		/// Everything is cleaned up only once the `MultipartUpload` is marked deleted
 		pub parts: crdt::Map<MpuPartKey, MpuPart>,

 		// Back link to bucket+key so that we can find the object this mpu

@@ -58,9 +58,9 @@ mod v09 {
 	/// The version of an uploaded part
 	#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
 	pub struct MpuPart {
-		/// Links to a Version in VersionTable
+		/// Links to a Version in `VersionTable`
 		pub version: Uuid,
-		/// ETag of the content of this part (known only once done uploading)
+		/// `ETag` of the content of this part (known only once done uploading)
 		pub etag: Option<String>,
 		/// Checksum requested by x-amz-checksum-algorithm
 		#[serde(default)]

@@ -249,7 +249,7 @@ mod v010 {
 	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
 	pub enum ObjectVersionEncryption {
 		SseC {
-			/// Encrypted serialized ObjectVersionInner struct.
+			/// Encrypted serialized `ObjectVersionInner` struct.
 			/// This is never compressed, just encrypted using AES256-GCM.
 			#[serde(with = "serde_bytes")]
 			inner: Vec<u8>,

@@ -460,7 +460,7 @@ mod v2 {
 	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
 	pub enum ObjectVersionEncryption {
 		SseC {
-			/// Encrypted serialized ObjectVersionInner struct.
+			/// Encrypted serialized `ObjectVersionInner` struct.
 			/// This is never compressed, just encrypted using AES256-GCM.
 			#[serde(with = "serde_bytes")]
 			inner: Vec<u8>,

@@ -480,7 +480,7 @@ mod v2 {
 	}

 	/// Vector of headers, as tuples of the format (header name, header value)
-	/// Note: checksum can be Some(_) with checksum_type = None for objects that
+	/// Note: checksum can be Some(_) with `checksum_type` = None for objects that
 	/// have been migrated from Garage version before v2.0, as the distinction between
 	/// full-object and composite checksums was not implemented yet.
 	#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]

@@ -665,9 +665,9 @@ impl ObjectVersion {

 /// Is the object version currently being uploaded
 ///
-/// matches only multipart uploads if check_multipart is Some(true)
-/// matches only non-multipart uploads if check_multipart is Some(false)
-/// matches both if check_multipart is None
+/// matches only multipart uploads if `check_multipart` is Some(true)
+/// matches only non-multipart uploads if `check_multipart` is Some(false)
+/// matches both if `check_multipart` is None
 pub fn is_uploading(&self, check_multipart: Option<bool>) -> bool {
 match &self.state {
 ObjectVersionState::Uploading { multipart, .. } => {

@@ -763,9 +763,9 @@ pub enum ObjectFilter {
 IsData,
 /// Is the object version currently being uploaded
 ///
-/// matches only multipart uploads if check_multipart is Some(true)
-/// matches only non-multipart uploads if check_multipart is Some(false)
-/// matches both if check_multipart is None
+/// matches only multipart uploads if `check_multipart` is Some(true)
+/// matches only non-multipart uploads if `check_multipart` is Some(false)
+/// matches both if `check_multipart` is None
 IsUploading { check_multipart: Option<bool> },
 }
@@ -20,7 +20,7 @@ static SNAPSHOT_MUTEX: Mutex<()> = Mutex::new(());

 // ================ snapshotting logic =====================

-/// Run snapshot_metadata in a blocking thread and async await on it
+/// Run `snapshot_metadata` in a blocking thread and async await on it
 pub async fn async_snapshot_metadata(garage: &Arc<Garage>) -> Result<(), Error> {
 let garage = garage.clone();
 let worker = tokio::task::spawn_blocking(move || snapshot_metadata(&garage));

@@ -42,3 +42,6 @@ opentelemetry-contrib = { workspace = true, optional = true }

 [dev-dependencies]
 pretty_env_logger.workspace = true
+
+[lints]
+workspace = true
@@ -17,7 +17,7 @@ pub struct BytesBuf {
 }

 impl BytesBuf {
-/// Creates a new empty BytesBuf
+/// Creates a new empty `BytesBuf`
 pub fn new() -> Self {
 Self {
 buf: VecDeque::new(),

@@ -25,13 +25,13 @@ impl BytesBuf {
 }
 }

-/// Returns the number of bytes stored in the BytesBuf
+/// Returns the number of bytes stored in the `BytesBuf`
 #[inline]
 pub fn len(&self) -> usize {
 self.buf_len
 }

-/// Returns true iff the BytesBuf contains zero bytes
+/// Returns true iff the `BytesBuf` contains zero bytes
 #[inline]
 pub fn is_empty(&self) -> bool {
 self.buf_len == 0

@@ -63,7 +63,7 @@ impl BytesBuf {
 }
 }

-/// Takes at most max_len bytes from the left of the buffer
+/// Takes at most `max_len` bytes from the left of the buffer
 pub fn take_max(&mut self, max_len: usize) -> Bytes {
 if self.buf_len <= max_len {
 self.take_all()

@@ -73,7 +73,7 @@ impl BytesBuf {
 }

 /// Take exactly len bytes from the left of the buffer, returns None if
-/// the BytesBuf doesn't contain enough data
+/// the `BytesBuf` doesn't contain enough data
 pub fn take_exact(&mut self, len: usize) -> Option<Bytes> {
 if self.buf_len < len {
 None
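Taken together, the doc comments above describe a small queue-of-chunks API. A hedged usage sketch follows; the module path and the `extend` method for feeding chunks in are assumptions from context, since neither appears in these hunks:

```rust
use bytes::Bytes;

fn demo(mut buf: garage_net::bytes_buf::BytesBuf) {
    // Assumed: an `extend`-style method pushes chunks into the buffer.
    buf.extend(Bytes::from_static(b"hello "));
    buf.extend(Bytes::from_static(b"world"));
    assert_eq!(buf.len(), 11);

    // take_max never fails: it returns at most `max_len` bytes.
    let head = buf.take_max(5);
    assert_eq!(head.as_ref(), b"hello".as_slice());

    // take_exact returns None when fewer than `len` bytes are buffered.
    assert!(buf.take_exact(100).is_none());
    assert_eq!(buf.take_exact(6).unwrap().as_ref(), b" world".as_slice());
}
```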
@@ -277,7 +277,7 @@ impl Stream for CancelOnDropStream {
 let res = this.stream.poll_next(cx);
 if matches!(res, Poll::Ready(None)) {
 if let Some(c) = this.cancel.take() {
-std::mem::forget(c)
+std::mem::forget(c);
 }
 }
 res

@@ -72,7 +72,7 @@ where
 fn log_err(self, msg: &'static str) {
 if let Err(e) = self {
 error!("Error: {}: {}", msg, Into::<Error>::into(e));
-};
+}
 }
 }
@@ -101,9 +101,9 @@ pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
 // ----

 /// The `Req<M>` is a helper object used to create requests and attach them
-/// a stream of data. If the stream is a fixed Bytes and not a ByteStream,
+/// a stream of data. If the stream is a fixed Bytes and not a `ByteStream`,
 /// `Req<M>` is cheaply cloneable to allow the request to be sent to different
-/// peers (Clone will panic if the stream is a ByteStream).
+/// peers (Clone will panic if the stream is a `ByteStream`).
 pub struct Req<M: Message> {
 pub(crate) msg: Arc<M>,
 pub(crate) msg_ser: Option<Bytes>,
@@ -382,7 +382,7 @@ impl AttachedStream {

 // ---- ----

-/// Encoding for requests into a ByteStream:
+/// Encoding for requests into a `ByteStream`:
 /// - priority: u8
 /// - path length: u8
 /// - path: [u8; path length]

@@ -457,7 +457,7 @@ impl ReqEnc {
 }
 }

-/// Encoding for responses into a ByteStream:
+/// Encoding for responses into a `ByteStream`:
 ///
 /// IF SUCCESS:
 /// - 0: u8

@@ -468,7 +468,7 @@ impl ReqEnc {
 /// IF ERROR:
 /// - message length + 1: u8
 /// - error code: u8
-/// - message: [u8; message_length]
+/// - message: [u8; `message_length`]
 pub(crate) struct RespEnc {
 msg: Bytes,
 stream: Option<ByteStream>,
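The encodings documented above are plain length-prefixed framing. A sketch of just the request-header fields visible in these hunks (the fields after `path` are cut off here, so the sketch stops there; the function name is illustrative, not the crate's API):

```rust
/// Illustrative encoder for the visible header fields:
/// priority: u8, path length: u8, path: [u8; path length].
fn encode_req_header(priority: u8, path: &[u8]) -> Vec<u8> {
    assert!(path.len() <= u8::MAX as usize, "path length must fit in a u8");
    let mut out = Vec::with_capacity(2 + path.len());
    out.push(priority);
    out.push(path.len() as u8);
    out.extend_from_slice(path);
    out
}
```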
@@ -34,12 +34,12 @@ pub type NetworkKey = sodiumoxide::crypto::auth::Key;
 /// composed of 8 bytes for Netapp version and 8 bytes for client version
 pub(crate) type VersionTag = [u8; 16];

-/// Value of garage_net version used in the version tag
-/// We are no longer using prefix `netapp` as garage_net is forked from the netapp crate.
-/// Since Garage v1.0, we have replaced the prefix by `grgnet` (shorthand for garage_net).
+/// Value of `garage_net` version used in the version tag
+/// We are no longer using prefix `netapp` as `garage_net` is forked from the netapp crate.
+/// Since Garage v1.0, we have replaced the prefix by `grgnet` (shorthand for `garage_net`).
 pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6772676e65740010; // grgnet 0x0010 (1.0)

-/// HelloMessage is sent by the client on a Netapp connection to indicate
+/// `HelloMessage` is sent by the client on a Netapp connection to indicate
 /// that they are also a server and ready to receive incoming connections
 /// at the specified address and port. If the client doesn't know their
 /// public address, they don't need to specify it and we look at the
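The magic constant packs the ASCII bytes of `grgnet` into the high six bytes of the `u64`, with the version number in the low two bytes, which a quick standalone check confirms:

```rust
fn main() {
    let tag: u64 = 0x6772676e65740010;
    let bytes = tag.to_be_bytes();
    assert_eq!(&bytes[..6], b"grgnet".as_slice()); // the prefix
    assert_eq!(u16::from_be_bytes([bytes[6], bytes[7]]), 0x0010); // version 1.0
}
```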
@@ -57,9 +57,9 @@ impl Message for HelloMessage {
 type OnConnectHandler = Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>;
 type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;

-/// NetApp is the main class that handles incoming and outgoing connections.
+/// `NetApp` is the main class that handles incoming and outgoing connections.
 ///
-/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
+/// `NetApp` can be used in a stand-alone fashion or together with a peering strategy.
 /// If using it alone, you will want to set `on_connect` and `on_disconnect` events
 /// in order to manage information about the current peer list.
 pub struct NetApp {

@@ -91,7 +91,7 @@ struct ListenParams {
 }

 impl NetApp {
-/// Creates a new instance of NetApp, which can serve either as a full p2p node,
+/// Creates a new instance of `NetApp`, which can serve either as a full p2p node,
 /// or just as a passive client. To upgrade to a full p2p node, spawn a listener
 /// using `.listen()`
 ///

@@ -180,13 +180,13 @@ impl NetApp {
 .is_some()
 {
 panic!("Redefining endpoint: {}", path);
-};
+}
 endpoint
 }

 /// Main listening process for our app. This future runs during the whole
 /// run time of our application.
-/// If this is not called, the NetApp instance remains a passive client.
+/// If this is not called, the `NetApp` instance remains a passive client.
 pub async fn listen(
 self: Arc<Self>,
 listen_addr: SocketAddr,
@@ -119,7 +119,7 @@ impl PeerInfo {
 }
 }

-/// PeerConnState: possible states for our tentative connections to given peer
+/// `PeerConnState`: possible states for our tentative connections to given peer
 /// This structure is only interested in recording connection info for outgoing
 /// TCP connections
 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -42,8 +42,8 @@ impl Drop for Sender {
 }
 }

-/// The RecvLoop trait, which is implemented both by the client and the server
-/// connection objects (ServerConn and ClientConn) adds a method `.recv_loop()`
+/// The `RecvLoop` trait, which is implemented both by the client and the server
+/// connection objects (`ServerConn` and `ClientConn`) adds a method `.recv_loop()`
 /// and a prototype of a handler for received messages `.recv_handler()` that
 /// must be filled by implementors. `.recv_loop()` receives messages in a loop
 /// according to the protocol defined above: chunks of message in progress of being

@@ -70,7 +70,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
 Ok(_) => (),
 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
 Err(e) => return Err(e.into()),
-};
+}
 let id = RequestID::from_be_bytes(header_id);

 let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
@@ -264,8 +264,8 @@ impl DataFrame {
 }
 }

-/// The SendLoop trait, which is implemented both by the client and the server
-/// connection objects (ServerConna and ClientConn) adds a method `.send_loop()`
+/// The `SendLoop` trait, which is implemented both by the client and the server
+/// connection objects (`ServerConna` and `ClientConn`) adds a method `.send_loop()`
 /// that takes a channel of messages to send and an asynchronous writer,
 /// and sends messages from the channel to the async writer, putting them in a queue
 /// before being sent and doing the round-robin sending strategy.

@@ -319,7 +319,7 @@ pub(crate) trait SendLoop: Sync {
 order_tag,
 data: ByteStreamReader::new(data),
 sent: 0,
-})
+});
 }
 Some(SendItem::Cancel(id)) => {
 trace!("send_loop({}): cancelling {}", debug_name, id);

@@ -332,7 +332,7 @@ pub(crate) trait SendLoop: Sync {
 None => {
 msg_recv = None;
 }
-};
+}
 }
 (id, data) = send_fut => {
 trace!(
@@ -14,18 +14,18 @@ use crate::bytes_buf::BytesBuf;
 /// When sent through Netapp, the Vec may be split in smaller chunk in such a way
 /// consecutive Vec may get merged, but Vec and error code may not be reordered
 ///
-/// Items sent in the ByteStream may be errors of type `std::io::Error`.
-/// An error indicates the end of the ByteStream: a reader should no longer read
+/// Items sent in the `ByteStream` may be errors of type `std::io::Error`.
+/// An error indicates the end of the `ByteStream`: a reader should no longer read
 /// after receiving an error, and a writer should stop writing after sending an error.
 pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;

-/// A packet sent in a ByteStream, which may contain either
+/// A packet sent in a `ByteStream`, which may contain either
 /// a Bytes object or an error
 pub type Packet = Result<Bytes, std::io::Error>;

 // ----

-/// A helper struct to read defined lengths of data from a BytesStream
+/// A helper struct to read defined lengths of data from a `BytesStream`
 pub struct ByteStreamReader {
 stream: ByteStream,
 buf: BytesBuf,

@@ -201,7 +201,7 @@ pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 's
 tokio_util::io::StreamReader::new(stream)
 }

-/// Reads all of the content of a `ByteStream` into a BytesBuf
+/// Reads all of the content of a `ByteStream` into a `BytesBuf`
 /// that contains everything
 pub async fn read_stream_to_end(mut stream: ByteStream) -> Result<BytesBuf, std::io::Error> {
 let mut buf = BytesBuf::new();
@@ -15,12 +15,12 @@ use crate::NodeID;
 #[tokio::test(flavor = "current_thread")]
 async fn test_with_basic_scheduler() {
 pretty_env_logger::init();
-run_test(19980).await
+run_test(19980).await;
 }

 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
 async fn test_with_threaded_scheduler() {
-run_test(19990).await
+run_test(19990).await;
 }

 async fn run_test(port_base: u16) {
@@ -7,7 +7,7 @@ use tokio::sync::watch;

 use crate::netapp::*;

-/// Utility function: encodes any serializable value in MessagePack binary format
+/// Utility function: encodes any serializable value in `MessagePack` binary format
 /// using the RMP library.
 ///
 /// Field names and variant names are included in the serialization.

@@ -80,7 +80,7 @@ pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr
 Some((pubkey, hosts))
 }

-/// async version of parse_and_resolve_peer_addr
+/// async version of `parse_and_resolve_peer_addr`
 pub async fn parse_and_resolve_peer_addr_async(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
 let delim = peer.find('@')?;
 let (key, host) = peer.split_at(delim);
@@ -51,3 +51,6 @@ opentelemetry.workspace = true
 kubernetes-discovery = ["kube", "k8s-openapi", "schemars"]
 consul-discovery = ["reqwest", "thiserror"]
 system-libs = ["sodiumoxide/use-pkg-config"]
+
+[lints]
+workspace = true
@@ -118,7 +118,7 @@ impl ConsulDiscovery {
 builder = builder.default_headers(headers);
 }
 }
-};
+}

 let client: reqwest::Client = builder.build()?;
@@ -66,7 +66,7 @@ pub async fn get_kubernetes_nodes(
 .and_then(|k| NodeID::from_slice(&k[..]));

 if let Some(pubkey) = pubkey {
-ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port)))
+ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port)));
 }
 }

@@ -108,7 +108,7 @@ pub async fn publish_kubernetes_node(
 .await?;
 } else {
 nodes.create(&PostParams::default(), &node).await?;
-};
+}

 Ok(())
 }
@@ -42,7 +42,7 @@ impl Edge for WeightedEdge {}

 /// Struct for the graph structure. We do encapsulation here to be able to both
 /// provide user friendly Vertex enum to address vertices, and to use internally usize
-/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed.
+/// indices and Vec instead of `HashMap` in the graph algorithm to optimize execution speed.
 pub struct Graph<E: Edge> {
 vertex_to_id: HashMap<Vertex, usize>,
 id_to_vertex: Vec<Vertex>,

@@ -253,7 +253,7 @@ impl Graph<FlowEdge> {

 /// This function takes a flow, and a cost function on the edges, and tries to find an
 /// equivalent flow with a better cost, by finding improving overflow cycles. It uses
-/// as subroutine the Bellman Ford algorithm run up to path_length.
+/// as subroutine the Bellman Ford algorithm run up to `path_length`.
 /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and
 /// only one needs to be present in the cost function.
 pub fn optimize_flow_with_cost(

@@ -290,7 +290,7 @@ impl Graph<FlowEdge> {
 Ok(())
 }

-/// Construct the weighted graph G_f from the flow and the cost function
+/// Construct the weighted graph `G_f` from the flow and the cost function
 fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
 let mut g = Graph::<WeightedEdge>::new(&self.id_to_vertex);
 let nb_vertices = self.id_to_vertex.len();

@@ -323,11 +323,11 @@ impl Graph<WeightedEdge> {
 Ok(())
 }

-/// This function lists the negative cycles it manages to find after path_length
+/// This function lists the negative cycles it manages to find after `path_length`
 /// iterations of the main loop of the Bellman-Ford algorithm. For the classical
-/// algorithm, path_length needs to be equal to the number of vertices. However,
+/// algorithm, `path_length` needs to be equal to the number of vertices. However,
 /// for particular graph structures like in our case, the algorithm is still correct
-/// when path_length is the length of the longest possible simple path.
+/// when `path_length` is the length of the longest possible simple path.
 /// See the formal description of the algorithm for more details.
 fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
 let nb_vertices = self.graph.len();
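The correctness argument hinted at in the doc comment: after k relaxation passes, all shortest walks using at most k edges are final, so if a relaxation still improves a distance after `path_length` passes, a negative cycle must exist. A generic sketch of that detection, not the crate's implementation (which works on its own `Graph` type):

```rust
/// Bellman-Ford negative-cycle probe: run `path_length` relaxation
/// passes from an implicit all-zero source, then test whether any
/// edge can still be relaxed.
fn has_negative_cycle(n: usize, edges: &[(usize, usize, i64)], path_length: usize) -> bool {
    let mut dist = vec![0i64; n];
    for _ in 0..path_length {
        for &(u, v, w) in edges {
            if dist[u] + w < dist[v] {
                dist[v] = dist[u] + w;
            }
        }
    }
    // One more pass: any further improvement exposes a negative cycle.
    edges.iter().any(|&(u, v, w)| dist[u] + w < dist[v])
}
```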
@@ -162,14 +162,14 @@ impl LayoutHelper {
 }

 /// Returns the latest layout version for which it is safe to read data from,
-/// i.e. the version whose version number is sync_map_min
+/// i.e. the version whose version number is `sync_map_min`
 pub fn read_version(&self) -> Result<&LayoutVersion, Error> {
 let sync_min = self.sync_map_min;
 let versions = self.versions()?;
 Ok(versions
 .iter()
 .find(|x| x.version == sync_min)
-.or(versions.last())
+.or_else(|| versions.last())
 .unwrap())
 }

@@ -271,7 +271,7 @@ impl LayoutHelper {
 .map(|x| x.load(Ordering::Relaxed) == 0)
 .unwrap_or(true)
 })
-.unwrap_or(self.inner().current().version);
+.unwrap_or_else(|| self.inner().current().version);
 let changed = self.update(|layout| {
 layout
 .update_trackers
@@ -23,7 +23,7 @@ pub use version::*;

 /// A partition id, which is stored on 16 bits
 /// i.e. we have up to 2**16 partitions.
-/// (in practice we have exactly 2**PARTITION_BITS partitions)
+/// (in practice we have exactly 2**`PARTITION_BITS` partitions)
 pub type Partition = u16;

 // TODO: make this constant parametrizable in the config file

@@ -114,7 +114,7 @@ mod v09 {
 /// to know to what extent does it change with the layout update.
 pub partition_size: u64,
 /// Parameters used to compute the assignment currently given by
-/// ring_assignment_data
+/// `ring_assignment_data`
 pub parameters: LayoutParameters,

 pub roles: LwwMap<Uuid, NodeRoleV>,

@@ -229,7 +229,7 @@ mod v010 {
 use std::collections::BTreeMap;
 pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};

-/// Number of old (non-live) versions to keep, see LayoutHistory::old_versions
+/// Number of old (non-live) versions to keep, see `LayoutHistory::old_versions`
 pub const OLD_VERSION_COUNT: usize = 5;

 /// The history of cluster layouts, with trackers to keep a record

@@ -238,8 +238,8 @@ mod v010 {
 pub struct LayoutHistory {
 /// The versions currently in use in the cluster
 pub versions: Vec<LayoutVersion>,
-/// At most 5 of the previous versions, not used by the garage_table
-/// module, but useful for the garage_block module to find data blocks
+/// At most 5 of the previous versions, not used by the `garage_table`
+/// module, but useful for the `garage_block` module to find data blocks
 /// that have not yet been moved
 pub old_versions: Vec<LayoutVersion>,

@@ -260,7 +260,7 @@ mod v010 {
 /// Roles assigned to nodes in this version
 pub roles: LwwMap<Uuid, NodeRoleV>,
 /// Parameters used to compute the assignment currently given by
-/// ring_assignment_data
+/// `ring_assignment_data`
 pub parameters: LayoutParameters,

 /// The number of replicas for each data partition

@@ -269,17 +269,17 @@ mod v010 {
 /// to know to what extent does it change with the layout update.
 pub partition_size: u64,

-/// node_id_vec: a vector of node IDs with a role assigned
+/// `node_id_vec`: a vector of node IDs with a role assigned
 /// in the system (this includes gateway nodes).
 /// The order here is different than the vec stored by `roles`, because:
 /// 1. non-gateway nodes are first so that they have lower numbers
 /// 2. nodes that don't have a role are excluded (but they need to
 /// stay in the CRDT as tombstones)
 pub node_id_vec: Vec<Uuid>,
-/// number of non-gateway nodes, which are the first ids in node_id_vec
+/// number of non-gateway nodes, which are the first ids in `node_id_vec`
 pub nongateway_node_count: usize,
 /// The assignation of data partitions to nodes, the values
-/// are indices in node_id_vec
+/// are indices in `node_id_vec`
 #[serde(with = "serde_bytes")]
 pub ring_assignment_data: Vec<CompactNodeType>,
 }
@@ -2,7 +2,6 @@ use std::cmp::min;
 use std::collections::HashMap;

 use garage_util::crdt::Crdt;
-use garage_util::error::*;

 use crate::layout::*;
 use crate::replication_mode::ReplicationFactor;

@@ -21,14 +20,14 @@ use crate::replication_mode::ReplicationFactor;
 // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
 // With these parameters, the naive algo fails, whereas there is a solution:
 // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
-fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
+fn check_against_naive(cl: &LayoutVersion) -> bool {
 let over_size = cl.partition_size + 1;
 let mut zone_token = HashMap::<String, usize>::new();

-let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;
+let (zones, zone_to_id) = cl.generate_nongateway_zone_ids();

 if zones.is_empty() {
-return Ok(false);
+return false;
 }

 for z in zones.iter() {

@@ -66,7 +65,7 @@ fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
 {
 curr_zone += 1;
 if curr_zone >= zones.len() {
-return Ok(true);
+return true;
 }
 }
 id_zone_token[curr_zone] -= 1;

@@ -77,7 +76,7 @@ fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
 }
 }

-Ok(false)
+false
 }

 fn show_msg(msg: &Message) {

@@ -127,7 +126,7 @@ fn test_assignment() {
 let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap();
 show_msg(&msg);
 assert_eq!(cl.check(), Ok(()));
-assert!(check_against_naive(cl.current()).unwrap());
+assert!(check_against_naive(cl.current()));

 node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
 node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"];

@@ -136,7 +135,7 @@ fn test_assignment() {
 let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap();
 show_msg(&msg);
 assert_eq!(cl.check(), Ok(()));
-assert!(check_against_naive(cl.current()).unwrap());
+assert!(check_against_naive(cl.current()));

 node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
 update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);

@@ -144,7 +143,7 @@ fn test_assignment() {
 let (mut cl, msg) = cl.apply_staged_changes(v + 1).unwrap();
 show_msg(&msg);
 assert_eq!(cl.check(), Ok(()));
-assert!(check_against_naive(cl.current()).unwrap());
+assert!(check_against_naive(cl.current()));

 node_capacity_vec = vec![
 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000,

@@ -154,5 +153,5 @@ fn test_assignment() {
 let (cl, msg) = cl.apply_staged_changes(v + 1).unwrap();
 show_msg(&msg);
 assert_eq!(cl.check(), Ok(()));
-assert!(check_against_naive(cl.current()).unwrap());
+assert!(check_against_naive(cl.current()));
 }
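Once `generate_nongateway_zone_ids` (changed further down in this diff) stopped returning `Result`, `check_against_naive` had no remaining error path, so its `Result<bool, _>` wrapper only forced `.unwrap()` noise on every caller. That is exactly what `clippy::unnecessary_wraps` flags. Reduced to a toy (names hypothetical):

```rust
// Before: every call site must handle an error that can never occur.
fn zone_count_before(zones: &[String]) -> Result<usize, String> {
    Ok(zones.len())
}

// After: the infallible signature is honest, and callers drop .unwrap().
fn zone_count_after(zones: &[String]) -> usize {
    zones.len()
}
```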
@@ -85,7 +85,7 @@ impl LayoutVersion {
 let mut count = 0;
 for nod in self.ring_assignment_data.iter() {
 if i as u8 == *nod {
-count += 1
+count += 1;
 }
 }
 return Ok(count);

@@ -164,7 +164,7 @@ impl LayoutVersion {
 total_capacity
 }

-/// Returns the effective value of the zone_redundancy parameter
+/// Returns the effective value of the `zone_redundancy` parameter
 pub(crate) fn effective_zone_redundancy(&self) -> usize {
 match self.parameters.zone_redundancy {
 ZoneRedundancy::AtLeast(v) => v,

@@ -271,7 +271,7 @@ impl LayoutVersion {
 // Check that the partition size stored is the one computed by the asignation
 // algorithm.
 let cl2 = self.clone();
-let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap();
+let (_, zone_to_id) = cl2.generate_nongateway_zone_ids();
 match cl2.compute_optimal_partition_size(&zone_to_id, zone_redundancy) {
 Ok(s) if s != self.partition_size => {
 return Err(format!(

@@ -311,7 +311,7 @@ impl LayoutVersion {
 /// the former assignment (if any) to minimize the amount of
 /// data to be moved.
 /// Staged role changes must be merged with nodes roles before calling this function,
-/// hence it must only be called from apply_staged_changes() and hence is not public.
+/// hence it must only be called from `apply_staged_changes()` and hence is not public.
 fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
 // We update the node ids, since the node role list might have changed with the
 // changes in the layout. We retrieve the old_assignment reframed with new ids

@@ -330,7 +330,7 @@ impl LayoutVersion {

 // We generate for once numerical ids for the zones of non gateway nodes,
 // to use them as indices in the flow graphs.
-let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
+let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids();

 if self.nongateway_nodes().len() < self.replication_factor {
 return Err(Error::Message(format!(

@@ -402,11 +402,11 @@ impl LayoutVersion {
 Ok(msg)
 }

-/// The LwwMap of node roles might have changed. This function updates the node_id_vec
+/// The `LwwMap` of node roles might have changed. This function updates the `node_id_vec`
 /// and returns the assignment given by ring, with the new indices of the nodes, and
 /// None if the node is not present anymore.
-/// We work with the assumption that only this function and calculate_new_assignment
-/// do modify assignment_ring and node_id_vec.
+/// We work with the assumption that only this function and `calculate_new_assignment`
+/// do modify `assignment_ring` and `node_id_vec`.
 fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> {
 // (1) We compute the new node list
 // Non gateway nodes should be coded on 8bits, hence they must be first in the list

@@ -488,10 +488,8 @@ impl LayoutVersion {
 }

 /// This function generates ids for the zone of the nodes appearing in
-/// self.node_id_vec.
-pub(crate) fn generate_nongateway_zone_ids(
-&self,
-) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
+/// `self.node_id_vec`.
+pub(crate) fn generate_nongateway_zone_ids(&self) -> (Vec<String>, HashMap<String, usize>) {
 let mut id_to_zone = Vec::<String>::new();
 let mut zone_to_id = HashMap::<String, usize>::new();

@@ -502,7 +500,7 @@ impl LayoutVersion {
 id_to_zone.push(r.zone.clone());
 }
 }
-Ok((id_to_zone, zone_to_id))
+(id_to_zone, zone_to_id)
 }

 /// This function computes by dichotomy the largest realizable partition size, given
@@ -527,16 +525,16 @@ impl LayoutVersion {
 let mut s_up = self.get_total_capacity();
 while s_down + 1 < s_up {
 g = self.generate_flow_graph(
-(s_down + s_up) / 2,
+u64::midpoint(s_down, s_up),
 zone_to_id,
 &empty_set,
 zone_redundancy,
 )?;
 g.compute_maximal_flow()?;
 if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 {
-s_up = (s_down + s_up) / 2;
+s_up = u64::midpoint(s_down, s_up);
 } else {
-s_down = (s_down + s_up) / 2;
+s_down = u64::midpoint(s_down, s_up);
 }
 }
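`u64::midpoint` (stable since Rust 1.85) computes `(a + b) / 2` without forming the intermediate sum, so the binary search above can no longer overflow when both bounds are near `u64::MAX`; this is the pattern the `clippy::manual_midpoint` lint catches. A standalone check:

```rust
fn main() {
    let a = u64::MAX - 1;
    let b = u64::MAX;
    // (a + b) / 2 would be wrong here: a + b does not fit in a u64,
    // so it panics in debug builds and wraps around in release builds.
    assert_eq!(u64::midpoint(a, b), u64::MAX - 1);
}
```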
@@ -560,7 +558,7 @@ impl LayoutVersion {

 /// Generates the graph to compute the maximal flow corresponding to the optimal
 /// partition assignment.
-/// exclude_assoc is the set of (partition, node) association that we are forbidden
+/// `exclude_assoc` is the set of (partition, node) association that we are forbidden
 /// to use (hence we do not add the corresponding edge to the graph). This parameter
 /// is used to compute a first flow that uses only edges appearing in the previous
 /// assignment. This produces a solution that heuristically should be close to the
@@ -1,6 +1,6 @@
 use opentelemetry::{global, metrics::*};

-/// TableMetrics reference all counter used for metrics
+/// `TableMetrics` reference all counter used for metrics
 pub struct RpcMetrics {
 pub(crate) rpc_counter: Counter<u64>,
 pub(crate) rpc_timeout_counter: Counter<u64>,
@@ -66,7 +66,7 @@ impl Clone for RequestStrategy<()> {
 }

 impl RequestStrategy<()> {
-/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
+/// Create a `RequestStrategy` with default timeout and not interrupting when quorum reached
 pub fn with_priority(prio: RequestPriority) -> Self {
 RequestStrategy {
 rs_quorum: None,

@@ -109,7 +109,7 @@ impl<T> RequestStrategy<T> {
 self.rs_timeout = Timeout::Custom(timeout);
 self
 }
-/// Extract drop_on_complete item
+/// Extract `drop_on_complete` item
 fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
 (
 RequestStrategy {

@@ -272,7 +272,7 @@ impl RpcHelper {
 /// Make a RPC call to multiple servers, returning either a Vec of responses,
 /// or an error if quorum could not be reached due to too many errors
 ///
-/// If RequestStrategy has send_all_at_once set, then all requests will be
+/// If `RequestStrategy` has `send_all_at_once` set, then all requests will be
 /// sent at once, and `try_call_many` will return as soon as a quorum of
 /// responses is achieved, dropping and cancelling the remaining requests.
 ///

@@ -374,7 +374,7 @@ impl RpcHelper {
 // reach quorum, start some new requests.
 while send_all_at_once || successes.len() + resp_stream.len() < quorum {
 if let Some(fut) = requests.next() {
-resp_stream.push(fut)
+resp_stream.push(fut);
 } else {
 break;
 }

@@ -413,7 +413,7 @@ impl RpcHelper {
 /// Make a RPC call to multiple servers, returning either a Vec of responses,
 /// or an error if quorum could not be reached due to too many errors
 ///
-/// Contrary to try_call_many, this function is especially made for broadcast
+/// Contrary to `try_call_many`, this function is especially made for broadcast
 /// write operations. In particular:
 ///
 /// - The request are sent to all specified nodes as soon as `try_write_many_sets`

@@ -582,7 +582,7 @@ impl RpcHelper {
 let mut vernodes = vec![];
 for ver in layout.versions()?.iter() {
 let nodes = ver.nodes_of(position);
-vernodes.push(rpc_helper.request_order(current_layout, nodes))
+vernodes.push(rpc_helper.request_order(current_layout, nodes));
 }

 let mut ret = if vernodes.len() == 1 {
@@ -57,7 +57,7 @@ pub enum SystemRpc {
 Ok,
 /// Request to connect to a specific node (in `<pubkey>@<host>:<port>` format, pubkey = full-length node ID)
 Connect(String),
-/// Advertise Garage status. Answered with another AdvertiseStatus.
+/// Advertise Garage status. Answered with another `AdvertiseStatus`.
 /// Exchanged with every node on a regular basis.
 AdvertiseStatus(NodeStatus),
 /// Get known nodes states

@@ -65,9 +65,9 @@ pub enum SystemRpc {
 /// Return known nodes
 ReturnKnownNodes(Vec<KnownNodeInfo>),

-/// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
+/// Ask other node its cluster layout. Answered with `AdvertiseClusterLayout`
 PullClusterLayout,
-/// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
+/// Advertisement of cluster layout. Sent spontanously or in response to `PullClusterLayout`
 AdvertiseClusterLayout(LayoutHistory),
 /// Ask other node its cluster layout update trackers.
 PullClusterLayoutTrackers,

@@ -481,7 +481,7 @@ impl System {
 .iter()
 .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity.is_some()))
 .map(|(n, _, _)| *n),
-)
+);
 }
 let storage_nodes_ok = storage_nodes.iter().filter(|x| node_up(x)).count();

@@ -682,7 +682,7 @@ impl System {

 // Add peer list from list stored on disk
 if let Ok(peers) = self.persist_peer_list.load_async().await {
-ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
+ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)));
 }

 // Fetch peer list from Consul

@@ -705,9 +705,9 @@ impl System {
 match create_kubernetes_crd().await {
 Ok(()) => (),
 Err(e) => {
-error!("Failed to create kubernetes custom resource: {}", e)
+error!("Failed to create kubernetes custom resource: {}", e);
 }
 };
 }
 }

 match get_kubernetes_nodes(k).await {

@@ -908,7 +908,7 @@ impl NodeStatus {
 }

 /// Obtain the list of currently available IP addresses on all non-loopback
-/// interfaces, optionally filtering them to be inside a given IpNet.
+/// interfaces, optionally filtering them to be inside a given `IpNet`.
 fn get_default_ip(filter_ipnet: Option<ipnet::IpNet>) -> Option<IpAddr> {
 pnet_datalink::interfaces()
 .into_iter()
@@ -5,7 +5,7 @@ use opentelemetry::{global, metrics::*, KeyValue};

 use crate::system::{ClusterHealthStatus, System};

-/// TableMetrics reference all counter used for metrics
+/// `TableMetrics` reference all counter used for metrics
 pub struct SystemMetrics {
 // Static values
 pub(crate) _garage_build_info: ValueObserver<u64>,

@@ -60,7 +60,7 @@ impl SystemMetrics {
 KeyValue::new("rustversion", garage_util::version::rust_version()),
 KeyValue::new("version", garage_util::version::garage_version()),
 ],
-)
+);
 })
 .with_description("Garage build info")
 .init(),

@@ -68,7 +68,7 @@ impl SystemMetrics {
 let replication_factor = system.replication_factor;
 meter
 .u64_value_observer("garage_replication_factor", move |observer| {
-observer.observe(usize::from(replication_factor) as u64, &[])
+observer.observe(usize::from(replication_factor) as u64, &[]);
 })
 .with_description("Garage replication factor setting")
 .init()
@@ -33,3 +33,6 @@ serde_bytes.workspace = true
 futures.workspace = true
 futures-util.workspace = true
 tokio.workspace = true
+
+[lints]
+workspace = true
@@ -334,7 +334,7 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
 }
 }

-/// An entry stored in the gc_todo db tree associated with the table
+/// An entry stored in the `gc_todo` db tree associated with the table
 /// Contains helper function for parsing, saving, and removing
 /// such entry in the db
 ///

@@ -352,7 +352,7 @@ pub(crate) struct GcTodoEntry {
 }

 impl GcTodoEntry {
-/// Creates a new GcTodoEntry (not saved in the db) from its components:
+/// Creates a new `GcTodoEntry` (not saved in the db) from its components:
 /// the key of an entry in the table, and the hash of the associated
 /// serialized value
 pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {

@@ -364,7 +364,7 @@ impl GcTodoEntry {
 }
 }

-/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
+/// Parses a `GcTodoEntry` from a (k, v) pair stored in the `gc_todo` tree
 pub(crate) fn parse(db_k: &[u8], db_v: &[u8]) -> Self {
 Self {
 tombstone_timestamp: u64::from_be_bytes(db_k[0..8].try_into().unwrap()),

@@ -374,13 +374,13 @@ impl GcTodoEntry {
 }
 }

-/// Saves the GcTodoEntry in the gc_todo tree
+/// Saves the `GcTodoEntry` in the `gc_todo` tree
 pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
 gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
 Ok(())
 }

-/// Removes the GcTodoEntry from the gc_todo tree if the
+/// Removes the `GcTodoEntry` from the `gc_todo` tree if the
 /// hash of the serialized value is the same here as in the tree.
 /// This is useful to remove a todo entry only under the condition
 /// that it has not changed since the time it was read, i.e.
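As `parse` shows, the `gc_todo` key begins with the tombstone timestamp encoded big-endian. That choice is what makes the tree's lexicographic byte order agree with chronological order, so range scans visit the oldest entries first. A quick standalone check of the property:

```rust
fn main() {
    let earlier = 1u64.to_be_bytes();
    let later = 256u64.to_be_bytes();
    // Byte-wise (lexicographic) order matches numeric order for
    // big-endian encodings; little-endian would sort 256 before 1.
    assert!(earlier < later);
    assert_eq!(u64::from_be_bytes(later), 256);
}
```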
@@ -2,7 +2,7 @@ use opentelemetry::{global, metrics::*, KeyValue};

 use garage_db as db;

-/// TableMetrics reference all counter used for metrics
+/// `TableMetrics` reference all counter used for metrics
 pub struct TableMetrics {
 pub(crate) _table_size: ValueObserver<u64>,
 pub(crate) _merkle_tree_size: ValueObserver<u64>,
@@ -22,7 +22,7 @@ impl PartitionKey for String {
 }
 }

-/// Values of type FixedBytes32 are assumed to be random,
+/// Values of type `FixedBytes32` are assumed to be random,
 /// either a hash or a random UUID. This means we can use
 /// them directly as an index into the hash table.
 impl PartitionKey for FixedBytes32 {
@@ -52,3 +52,6 @@ mktemp.workspace = true

 [features]
 k2v = []
+
+[lints]
+workspace = true
@@ -27,7 +27,7 @@ pub struct WorkerInfo {
 pub last_error: Option<(String, u64)>,
 }

-/// WorkerStatus is a struct returned by the worker with a bunch of canonical
+/// `WorkerStatus` is a struct returned by the worker with a bunch of canonical
 /// fields to indicate their status to CLI users. All fields are optional.
 #[derive(Clone, Debug, Default)]
 pub struct WorkerStatus {

@@ -39,7 +39,7 @@ pub struct WorkerStatus {
 }

 impl BackgroundRunner {
-/// Create a new BackgroundRunner
+/// Create a new `BackgroundRunner`
 pub fn new(stop_signal: watch::Receiver<bool>) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
 let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
@@ -34,11 +34,11 @@ pub trait Worker: Send {
 }

 /// Work: do a basic unit of work, if one is available (otherwise, should return
-/// WorkerState::Idle immediately). We will do our best to not interrupt this future in the
+/// `WorkerState::Idle` immediately). We will do our best to not interrupt this future in the
 /// middle of processing, it will only be interrupted at the last minute when Garage is trying
 /// to exit and this hasn't returned yet. This function may return an error to indicate that
 /// its unit of work could not be processed due to an error: the error will be logged and
-/// .work() will be called again after a short delay.
+/// .`work()` will be called again after a short delay.
 async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;

 /// Wait for work: await for some task to become available. This future can be interrupted in
@@ -47,7 +47,7 @@ pub struct Config {

 /// Maximum number of parallel block writes per PUT request
 /// Higher values improve throughput but increase memory usage
-/// Default: 3, Recommended: 10-30 for NVMe, 3-10 for HDD
+/// Default: 3, Recommended: 10-30 for `NVMe`, 3-10 for HDD
 #[serde(default = "default_block_max_concurrent_writes_per_request")]
 pub block_max_concurrent_writes_per_request: usize,
 /// Number of replicas. Can be any positive integer, but uneven numbers are more favorable.

@@ -95,7 +95,7 @@ pub struct Config {
 pub rpc_secret_file: Option<PathBuf>,
 /// Address to bind for RPC
 pub rpc_bind_addr: SocketAddr,
-/// Bind outgoing sockets to rpc_bind_addr's IP address as well
+/// Bind outgoing sockets to `rpc_bind_addr`'s IP address as well
 #[serde(default)]
 pub rpc_bind_outgoing: bool,
 /// Public IP address of this node

@@ -154,7 +154,7 @@ pub struct Config {
 pub allow_punycode: bool,
 }

-/// Value for data_dir: either a single directory or a list of dirs with attributes
+/// Value for `data_dir`: either a single directory or a list of dirs with attributes
 #[derive(Deserialize, Debug, Clone)]
 #[serde(untagged)]
 pub enum DataDirEnum {

@@ -166,7 +166,7 @@ pub enum DataDirEnum {
 pub struct DataDir {
 /// Path to the data directory
 pub path: PathBuf,
-/// Capacity of the drive (required if read_only is false)
+/// Capacity of the drive (required if `read_only` is false)
 #[serde(default)]
 pub capacity: Option<String>,
 /// Whether this is a legacy read-only path (capacity should be None)

@@ -305,6 +305,7 @@ fn default_consistency_mode() -> String {
 "consistent".into()
 }

+#[expect(clippy::unnecessary_wraps)]
 fn default_compression() -> Option<i32> {
 Some(1)
 }
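Note the use of `#[expect]` rather than `#[allow]`: stabilized in Rust 1.81, it suppresses the lint the same way but additionally warns if the lint ever stops firing, so the suppression cannot silently go stale. Annotated sketch of the difference (the serde rationale is an inference, not stated in the diff):

```rust
// #[allow] stays silent forever, even if the code changes so the lint
// no longer applies. #[expect] emits `unfulfilled_lint_expectations`
// in that case, prompting removal of the attribute.
#[expect(clippy::unnecessary_wraps)]
fn default_compression() -> Option<i32> {
    // The Option wrapper is kept on purpose here, presumably because
    // the config field it provides a serde default for is an Option<i32>.
    Some(1)
}
```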
@@ -29,10 +29,10 @@ pub trait Crdt {
 /// `Option<T>` implements Crdt for any type T, even if T doesn't implement CRDT itself: when
 /// different values are detected, they are always merged to None. This can be used for value
 /// types which shoulnd't be merged, instead of trying to merge things when we know we don't want
-/// to merge them (which is what the AutoCrdt trait is used for most of the time). This cases
-/// arises very often, for example with a Lww or a LwwMap: the value type has to be a CRDT so that
+/// to merge them (which is what the `AutoCrdt` trait is used for most of the time). This cases
+/// arises very often, for example with a Lww or a `LwwMap`: the value type has to be a CRDT so that
 /// we have a rule for what to do when timestamps aren't enough to disambiguate (in a distributed
-/// system, anything can happen!), and with AutoCrdt the rule is to make an arbitrary (but
+/// system, anything can happen!), and with `AutoCrdt` the rule is to make an arbitrary (but
 /// deterministic) choice between the two. When using an `Option<T>` instead with this impl, ambiguity
 /// cases are explicitly stored as None, which allows us to detect the ambiguity and handle it in
 /// the way we want. (this can only work if we are happy with losing the value when an ambiguity
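The merge rule described here is compact enough to state as a table: equal values are kept, anything else collapses to `None`. A toy approximation of the rule (a hypothetical free function, not the crate's actual trait impl):

```rust
fn merge_opt<T: Eq>(a: Option<T>, b: Option<T>) -> Option<T> {
    match (a, b) {
        (Some(x), Some(y)) if x == y => Some(x),
        // Any ambiguity is recorded explicitly as None.
        _ => None,
    }
}

fn main() {
    assert_eq!(merge_opt(Some(1), Some(1)), Some(1));
    assert_eq!(merge_opt(Some(1), Some(2)), None);
    assert_eq!(merge_opt::<i32>(None, Some(2)), None);
}
```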
@@ -52,7 +52,7 @@ where
 /// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type
 /// to enable this behavior.
 pub trait AutoCrdt: Ord + Clone + std::fmt::Debug {
-/// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if
+/// `WARN_IF_DIFFERENT`: emit a warning when values differ. Set this to true if
 /// different values in your application should never happen. Set this to false
 /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`.
 const WARN_IF_DIFFERENT: bool;
@@ -19,7 +19,7 @@ use crate::crdt::crdt::*;
 /// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
 /// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
 /// such that two values can be compared for equality based on their hashes). As a consequence,
-/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
+/// insertions take `O(n)` time. This means that `LWWMap` should be used for reasonably small maps.
 /// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
 /// the serialization cost `O(n)` would still have to be paid at each modification, so we are
 /// actually not losing anything here.
@@ -73,7 +73,7 @@ impl FixedBytes32 {
 pub fn to_vec(self) -> Vec<u8> {
 self.0.to_vec()
 }
-/// Try building a FixedBytes32 from a slice
+/// Try building a `FixedBytes32` from a slice
 /// Return None if the slice is not 32 bytes long
 pub fn try_from(by: &[u8]) -> Option<Self> {
 if by.len() != 32 {
@@ -1,7 +1,7 @@
 use serde::{Deserialize, Serialize};

-/// Serialize to MessagePack, without versioning
-/// (see garage_util::migrate for functions that manage versioned
+/// Serialize to `MessagePack`, without versioning
+/// (see `garage_util::migrate` for functions that manage versioned
 /// data formats)
 pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
 where

@@ -13,8 +13,8 @@ where
 Ok(wr)
 }

-/// Deserialize from MessagePack, without versioning
-/// (see garage_util::migrate for functions that manage versioned
+/// Deserialize from `MessagePack`, without versioning
+/// (see `garage_util::migrate` for functions that manage versioned
 /// data formats)
 pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error>
 where
@@ -129,7 +129,7 @@ where
 }
 }

-/// Trait to map any error type to Error::Message
+/// Trait to map any error type to `Error::Message`
 pub trait OkOrMessage {
 type S;
 fn ok_or_message<M: Into<String>>(self, message: M) -> Result<Self::S, Error>;
@@ -54,7 +54,7 @@ impl<T: InitialFormat> Migrate for T {
 }
 }

-/// Internal type used by InitialFormat, not meant for general use.
+/// Internal type used by `InitialFormat`, not meant for general use.
 #[derive(Serialize, Deserialize)]
 pub enum NoPrevious {}
@@ -31,3 +31,6 @@ hyper.workspace = true
 tokio.workspace = true

 opentelemetry.workspace = true
+
+[lints]
+workspace = true
@@ -106,7 +106,7 @@ impl WebServer {
 }
 UnixOrTCPSocketAddress::UnixSocket(ref path) => {
 if path.exists() {
-fs::remove_file(path)?
+fs::remove_file(path)?;
 }

 let listener = UnixListener::bind(path)?;

@@ -684,10 +684,10 @@ fn compute_redirect_target(redirect: &bucket_table::Redirect, suffix: Option<&st
 if let Some(replace_key_prefix) = &redirect.replace_key_prefix {
 res.push_str(replace_key_prefix);
 if let Some(suffix) = suffix {
-res.push_str(suffix)
+res.push_str(suffix);
 }
 } else if let Some(replace_key) = &redirect.replace_key {
-res.push_str(replace_key)
+res.push_str(replace_key);
 }
 res
 }