Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ possible include a PR number for easier tracking.

## Next

* feat(grpc): add exponential backoff for reconnection attempts (#789)
* feat: run integration tests on more platforms (#760)
* ROX-34502: reload mTLS certificates on each gRPC connection attempt (#788)
* chore: add formatting and linting to integration test code (#783, #784)
Expand Down
45 changes: 42 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ openssl = "0.10.75"
prometheus-client = { version = "0.25.0", default-features = false }
prost = "0.14.0"
prost-types = "0.14.0"
rand = { version = "0.10.1", default-features = false, features = ["thread_rng"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.142"
shlex = "2.0.1"
Expand Down
1 change: 1 addition & 0 deletions fact/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tokio-stream = { workspace = true }
prometheus-client = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
shlex = { workspace = true }
Expand Down
185 changes: 166 additions & 19 deletions fact/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ const CONFIG_FILES: [&str; 4] = [
"fact.yaml",
];

#[derive(Debug, Default, PartialEq, Eq, Clone)]
fn yaml_to_duration_secs(v: &Yaml) -> Option<Duration> {
v.as_f64()
.or_else(|| v.as_i64().map(|i| i as f64))
.filter(|s| s.is_finite() && *s >= 0.0)
.map(Duration::from_secs_f64)
}

#[derive(Debug, Default, PartialEq, Clone)]
pub struct FactConfig {
paths: Option<Vec<PathBuf>>,
pub grpc: GrpcConfig,
Expand Down Expand Up @@ -218,20 +225,11 @@ impl TryFrom<Vec<Yaml>> for FactConfig {
config.hotreload = Some(hotreload);
}
"scan_interval" => {
// scan_internal == 0 disables the scanner
if let Some(scan_interval) = v.as_f64() {
if scan_interval < 0.0 {
bail!("invalid scan_interval: {scan_interval}");
}
config.scan_interval = Some(Duration::from_secs_f64(scan_interval));
} else if let Some(scan_interval) = v.as_i64() {
if scan_interval < 0 {
bail!("invalid scan_interval: {scan_interval}");
}
config.scan_interval = Some(Duration::from_secs(scan_interval as u64))
} else {
bail!("scan_interval field has incorrect type: {v:?}");
}
// scan_interval == 0 disables the scanner
let Some(scan_interval) = yaml_to_duration_secs(v) else {
bail!("invalid scan_interval: {v:?}");
};
config.scan_interval = Some(scan_interval);
}
"rate_limit" => {
// rate_limit == 0 means unlimited (no throttling)
Expand Down Expand Up @@ -328,10 +326,97 @@ impl TryFrom<&yaml::Hash> for EndpointConfig {
}
}

#[derive(Debug, Default, PartialEq, Eq, Clone)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct BackoffConfig {
initial: Option<Duration>,
max: Option<Duration>,
jitter: Option<bool>,
multiplier: Option<f64>,
}

impl BackoffConfig {
fn update(&mut self, from: &BackoffConfig) {
if let Some(initial) = from.initial {
self.initial = Some(initial);
}
if let Some(max) = from.max {
self.max = Some(max);
}
if let Some(jitter) = from.jitter {
self.jitter = Some(jitter);
}
if let Some(multiplier) = from.multiplier {
self.multiplier = Some(multiplier);
}
}

pub fn initial(&self) -> Duration {
self.initial.unwrap_or(Duration::from_secs(1))
}

pub fn max(&self) -> Duration {
self.max.unwrap_or(Duration::from_secs(60))
}

pub fn jitter(&self) -> bool {
self.jitter.unwrap_or(true)
}

pub fn multiplier(&self) -> f64 {
self.multiplier.unwrap_or(1.5)
}
}

impl TryFrom<&yaml::Hash> for BackoffConfig {
type Error = anyhow::Error;

fn try_from(value: &yaml::Hash) -> Result<Self, Self::Error> {
let mut backoff = BackoffConfig::default();
for (k, v) in value.iter() {
let Some(k) = k.as_str() else {
bail!("key is not string: {k:?}");
};
match k {
"initial" => {
let Some(initial) = yaml_to_duration_secs(v).filter(|d| !d.is_zero()) else {
bail!("invalid grpc.backoff.initial: {v:?}");
};
backoff.initial = Some(initial);
}
"max" => {
let Some(max) = yaml_to_duration_secs(v).filter(|d| !d.is_zero()) else {
bail!("invalid grpc.backoff.max: {v:?}");
};
backoff.max = Some(max);
}
"jitter" => {
let Some(jitter) = v.as_bool() else {
bail!("grpc.backoff.jitter field has incorrect type: {v:?}");
};
backoff.jitter = Some(jitter);
}
"multiplier" => {
let Some(multiplier) = v
.as_f64()
.or_else(|| v.as_i64().map(|v| v as f64))
.filter(|mult| mult.is_finite() && *mult > 1.0)
else {
bail!("invalid grpc.backoff.multiplier: {v:?}");
};
backoff.multiplier = Some(multiplier);
}
name => bail!("Invalid field 'grpc.backoff.{name}' with value: {v:?}"),
}
}
Ok(backoff)
}
}

#[derive(Debug, Default, PartialEq, Clone)]
pub struct GrpcConfig {
url: Option<String>,
certs: Option<PathBuf>,
pub backoff: BackoffConfig,
}

impl GrpcConfig {
Expand All @@ -343,6 +428,8 @@ impl GrpcConfig {
if let Some(certs) = from.certs.as_deref() {
self.certs = Some(certs.to_owned());
}

self.backoff.update(&from.backoff);
}

pub fn url(&self) -> Option<&str> {
Expand Down Expand Up @@ -377,6 +464,12 @@ impl TryFrom<&yaml::Hash> for GrpcConfig {
};
grpc.certs = Some(PathBuf::from(certs));
}
"backoff" => {
let Some(backoff) = v.as_hash() else {
bail!("grpc.backoff section has incorrect type: {v:?}");
};
grpc.backoff = BackoffConfig::try_from(backoff)?;
}
name => bail!("Invalid field 'grpc.{name}' with value: {v:?}"),
}
}
Expand Down Expand Up @@ -450,6 +543,30 @@ impl TryFrom<&yaml::Hash> for BpfConfig {
}
}

fn parse_duration_secs(s: &str) -> anyhow::Result<Duration> {
let f = s.parse::<f64>()?;
if !f.is_finite() || f < 0.0 {
bail!("value must be a non-negative finite number, got {f}");
}
Ok(Duration::from_secs_f64(f))
}

fn parse_positive_duration_secs(s: &str) -> anyhow::Result<Duration> {
let d = parse_duration_secs(s)?;
if d.is_zero() {
bail!("value must be greater than zero");
}
Ok(d)
}

fn parse_multiplier(s: &str) -> anyhow::Result<f64> {
let mult = s.parse::<f64>()?;
if !mult.is_finite() || mult <= 1.0 {
bail!("multiplier must be > 1.0, got {mult}");
}
Ok(mult)
}

#[derive(Debug, Parser)]
#[clap(version = crate::version::FACT_VERSION, about)]
pub struct FactCli {
Expand All @@ -465,6 +582,30 @@ pub struct FactCli {
#[arg(short, long, env = "FACT_CERTS")]
certs: Option<PathBuf>,

/// Initial backoff delay in seconds for gRPC reconnection
///
/// Default value is 1 second
#[arg(long, env = "FACT_GRPC_BACKOFF_INITIAL_DURATION", value_parser = parse_positive_duration_secs)]
backoff_initial: Option<Duration>,

/// Maximum backoff delay in seconds for gRPC reconnection
///
/// Default value is 60 seconds
#[arg(long, env = "FACT_GRPC_BACKOFF_MAX_DURATION", value_parser = parse_positive_duration_secs)]
backoff_max: Option<Duration>,

/// Backoff multiplier for gRPC reconnection
///
/// Must be > 1.0. Default value is 1.5
#[arg(long, env = "FACT_GRPC_BACKOFF_MULTIPLIER", value_parser = parse_multiplier)]
backoff_multiplier: Option<f64>,

/// Enable jitter for gRPC reconnection backoff
///
/// Default value is true
#[arg(long, env = "FACT_GRPC_BACKOFF_JITTER")]
backoff_jitter: Option<bool>,

/// The port to bind for all exposed endpoints
#[arg(long, short, env = "FACT_ENDPOINT_ADDRESS")]
address: Option<SocketAddr>,
Expand Down Expand Up @@ -535,8 +676,8 @@ pub struct FactCli {
/// The seconds can use a decimal point for fractions of seconds.
///
/// Default value is 30 seconds
#[arg(long, short, env = "FACT_SCAN_INTERVAL")]
scan_interval: Option<f64>,
#[arg(long, short, env = "FACT_SCAN_INTERVAL", value_parser = parse_duration_secs)]
scan_interval: Option<Duration>,

/// Maximum number of file events to allow per second
///
Expand All @@ -555,6 +696,12 @@ impl FactCli {
grpc: GrpcConfig {
url: self.url.clone(),
certs: self.certs.clone(),
backoff: BackoffConfig {
initial: self.backoff_initial,
max: self.backoff_max,
jitter: self.backoff_jitter,
multiplier: self.backoff_multiplier,
},
},
endpoint: EndpointConfig {
address: self.address,
Expand All @@ -568,7 +715,7 @@ impl FactCli {
skip_pre_flight: resolve_bool_arg(self.skip_pre_flight, self.no_skip_pre_flight),
json: resolve_bool_arg(self.json, self.no_json),
hotreload: resolve_bool_arg(self.hotreload, self.no_hotreload),
scan_interval: self.scan_interval.map(Duration::from_secs_f64),
scan_interval: self.scan_interval,
rate_limit: self.rate_limit,
}
}
Expand Down
Loading
Loading