Skip to content

Commit

Permalink
[release-1.22] Fix outbound race condition where we don't have the so…
Browse files Browse the repository at this point in the history
…urce workload from XDS yet (#1111)

* Wait without demand (fixes #51193)

Signed-off-by: Benjamin Leggett <[email protected]>

* Add some tests

Signed-off-by: Benjamin Leggett <[email protected]>

* Fixup

Signed-off-by: Benjamin Leggett <[email protected]>

---------

Signed-off-by: Benjamin Leggett <[email protected]>
Co-authored-by: Benjamin Leggett <[email protected]>
  • Loading branch information
istio-testing and bleggett committed Jun 4, 2024
1 parent 9b00957 commit d7b65de
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 6 deletions.
14 changes: 11 additions & 3 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;

use std::time::Instant;
use std::time::{Duration, Instant};

use drain::Watch;

Expand Down Expand Up @@ -465,9 +465,17 @@ impl OutboundConnection {
network: self.pi.cfg.network.clone(),
address: downstream,
};
let source_workload = match self.pi.state.fetch_workload(&downstream_network_addr).await {

let source_workload = match self
.pi
.state
.wait_for_workload(&downstream_network_addr, Duration::from_millis(500))
.await
{
Some(wl) => wl,
None => return Err(Error::UnknownSource(downstream)),
None => {
return Err(Error::UnknownSource(downstream));
}
};
if let Some(ref wl_info) = self.pi.proxy_workload_info {
// make sure that the workload we fetched matches the workload info we got over ZDS.
Expand Down
139 changes: 139 additions & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::fmt;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Duration;
use tracing::{debug, error, trace, warn};

pub mod policy;
Expand Down Expand Up @@ -630,6 +631,47 @@ impl DemandProxyState {
fetch(addr)
}

/// Same as `fetch_workload`, but for callers that know the workload is already en route:
/// on a cache miss it retries every time the workload store signals an insert, returning
/// the workload once it is found, or `None` if `deadline` elapses first.
///
/// * `addr` - network address of the workload to look up.
/// * `deadline` - maximum total time to keep waiting (a relative duration).
///
/// Returns `Some(workload)` as soon as a lookup for `addr` succeeds, otherwise `None`.
pub async fn wait_for_workload(
    &self,
    addr: &NetworkAddress,
    deadline: Duration,
) -> Option<Arc<Workload>> {
    debug!(%addr, "wait for workload");

    // Take a watch listener *before* checking state (so we don't miss anything)
    let mut wl_sub = self.state.read().unwrap().workloads.new_subscriber();

    debug!(%addr, "got sub, waiting for workload");

    if let Some(wl) = self.fetch_workload(addr).await {
        return Some(wl);
    }

    // We didn't find the workload we expected, so loop until the subscriber wakes us
    // on a new insert, or we hit the deadline timeout and give up.
    // The timeout is created and pinned once so that it spans *all* iterations.
    let timeout = tokio::time::sleep(deadline);
    tokio::pin!(timeout);
    loop {
        tokio::select! {
            _ = &mut timeout => {
                warn!(%addr, "timed out waiting for workload from xds");
                break None;
            },
            // `changed()` must be called afresh on every iteration: a single pinned
            // `changed()` future completes on the first notification, and polling it
            // again (after an insert of some *other* workload) would panic.
            changed = wl_sub.changed() => {
                let found = self.fetch_workload(addr).await;
                // An `Err` means the notifier was dropped, so no further wake-ups can
                // arrive; stop here instead of spinning until the timeout fires.
                if found.is_some() || changed.is_err() {
                    break found;
                }
            }
        };
    }
}

// only support workload
pub async fn fetch_workload(&self, addr: &NetworkAddress) -> Option<Arc<Workload>> {
// Wait for it on-demand, *if* needed
Expand Down Expand Up @@ -873,6 +915,103 @@ mod tests {
use crate::test_helpers::TEST_SERVICE_NAMESPACE;
use crate::{strng, test_helpers};

/// A workload that is already in the store must be returned by `wait_for_workload`.
#[tokio::test]
async fn test_wait_for_workload() {
    // Insert the workload before the proxy state is even built.
    let workload = Arc::new(test_helpers::test_default_workload());
    let mut proxy_state = ProxyState::default();
    proxy_state.workloads.insert(workload.clone(), true);

    let mut metrics_registry = Registry::default();
    let demand_state = DemandProxyState::new(
        Arc::new(RwLock::new(proxy_state)),
        None,
        ResolverConfig::default(),
        ResolverOpts::default(),
        Arc::new(crate::proxy::Metrics::new(&mut metrics_registry)),
    );

    // Address to look up.
    // NOTE(review): presumably the default test workload is registered under
    // localhost on the empty network - confirm against test_helpers.
    let lookup_addr = NetworkAddress {
        network: strng::EMPTY,
        address: IpAddr::V4(Ipv4Addr::LOCALHOST),
    };

    let expected = Some(workload);
    test_helpers::assert_eventually(
        Duration::from_secs(1),
        || demand_state.wait_for_workload(&lookup_addr, Duration::from_millis(50)),
        expected,
    )
    .await;
}

/// With an empty store and nobody inserting, `wait_for_workload` must give up
/// and yield `None` once its deadline has passed.
#[tokio::test]
async fn test_wait_for_workload_delay_fails() {
    // Nothing is ever inserted into this state.
    let empty_state = Arc::new(RwLock::new(ProxyState::default()));

    let mut metrics_registry = Registry::default();
    let demand_state = DemandProxyState::new(
        empty_state,
        None,
        ResolverConfig::default(),
        ResolverOpts::default(),
        Arc::new(crate::proxy::Metrics::new(&mut metrics_registry)),
    );

    // Address to look up; no workload is registered for it.
    let lookup_addr = NetworkAddress {
        network: strng::EMPTY,
        address: IpAddr::V4(Ipv4Addr::LOCALHOST),
    };

    test_helpers::assert_eventually(
        Duration::from_millis(10),
        || demand_state.wait_for_workload(&lookup_addr, Duration::from_millis(5)),
        None,
    )
    .await;
}

/// A workload inserted while another task is (possibly already) waiting must
/// eventually be returned by `wait_for_workload`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_wait_for_workload_eventually() {
    // Keep a handle on the shared state so the workload can be inserted later.
    let shared_state = Arc::new(RwLock::new(ProxyState::default()));
    let workload = Arc::new(test_helpers::test_default_workload());

    let mut metrics_registry = Registry::default();
    let demand_state = DemandProxyState::new(
        shared_state.clone(),
        None,
        ResolverConfig::default(),
        ResolverOpts::default(),
        Arc::new(crate::proxy::Metrics::new(&mut metrics_registry)),
    );

    // Address to look up.
    let lookup_addr = NetworkAddress {
        network: strng::EMPTY,
        address: IpAddr::V4(Ipv4Addr::LOCALHOST),
    };

    // Waiter task: runs concurrently with the insert below.
    let expected = Some(workload.clone());
    let waiter = tokio::spawn(async move {
        test_helpers::assert_eventually(
            Duration::from_millis(500),
            || demand_state.wait_for_workload(&lookup_addr, Duration::from_millis(250)),
            expected,
        )
        .await;
    });

    // Insert from the test task; the write guard is released at the end of the statement.
    shared_state
        .write()
        .unwrap()
        .workloads
        .insert(workload, true);

    waiter.await.expect("should not fail");
}

#[tokio::test]
async fn lookup_address() {
let mut state = ProxyState::default();
Expand Down
31 changes: 30 additions & 1 deletion src/state/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::{fmt, net};
use thiserror::Error;
use tokio::sync::watch::{Receiver, Sender};
use tracing::{error, trace};
use xds::istio::workload::ApplicationTunnel as XdsApplicationTunnel;
use xds::istio::workload::GatewayAddress as XdsGatewayAddress;
Expand Down Expand Up @@ -589,8 +590,13 @@ pub fn network_addr(network: Strng, vip: IpAddr) -> NetworkAddress {
}

/// A WorkloadStore encapsulates all information about workloads in the mesh
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct WorkloadStore {
// TODO: this could be expanded to Sender<Workload> plus a full subscriber/streaming
// model, but for now just notifying watchers to wake when _any_ insert happens
// is simpler (and only requires a channel size of 1).
insert_notifier: Sender<()>,

/// byAddress maps workload network addresses to workloads
pub(super) by_addr: HashMap<NetworkAddress, Arc<Workload>>,
/// byUid maps workload UIDs to workloads
Expand All @@ -601,7 +607,26 @@ pub struct WorkloadStore {
by_identity: HashMap<Identity, HashSet<Strng>>,
}

impl Default for WorkloadStore {
    /// Builds an empty store: every index starts at its default value and the
    /// insert notifier is a fresh watch sender carrying the unit value.
    fn default() -> Self {
        // Written by hand because the notifier is built explicitly via `Sender::new`.
        let insert_notifier = Sender::new(());
        Self {
            insert_notifier,
            by_addr: Default::default(),
            by_uid: Default::default(),
            by_hostname: Default::default(),
            by_identity: Default::default(),
        }
    }
}

impl WorkloadStore {
/// Returns a new subscriber to this store's insert notifier.
///
/// Subscribers are only guaranteed to be notified of values sent _after_ their creation,
/// so callers should create the subscriber first, then check the current state, and only
/// then wait on the subscriber.
pub fn new_subscriber(&self) -> Receiver<()> {
self.insert_notifier.subscribe()
}

pub fn insert(&mut self, w: Arc<Workload>, track_identity: bool) {
// First, remove the entry entirely to make sure things are cleaned up properly.
self.remove(&w.uid);
Expand All @@ -621,6 +646,10 @@ impl WorkloadStore {
.or_default()
.insert(w.uid.clone());
}

// We have stored a newly inserted workload, notify watchers
// (if any) to wake.
self.insert_notifier.send_replace(());
}

pub fn remove(&mut self, uid: &Strng) -> Option<Workload> {
Expand Down
4 changes: 2 additions & 2 deletions src/xds/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,8 @@ mod tests {
};
while start_time.elapsed().unwrap() < TEST_TIMEOUT && !matched {
sleep(POLL_RATE).await;
let wl = source.fetch_workload(&ip_network_addr).await;
matched = wl.as_deref() == converted.as_ref(); // Option<Workload> is Ok to compare without needing to unwrap
let wl = source.fetch_workload(&ip_network_addr);
matched = wl.await.as_deref() == converted.as_ref(); // Option<Workload> is Ok to compare without needing to unwrap
}
}

Expand Down

0 comments on commit d7b65de

Please sign in to comment.