Skip to content

Commit

Permalink
allow missing svc port for sandwich (#1054)
Browse files Browse the repository at this point in the history
* allow missing svc port for sandwich

* it coverage

* ut
  • Loading branch information
stevenctl committed May 18, 2024
1 parent 2d41d21 commit 80ac30e
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 12 deletions.
127 changes: 116 additions & 11 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::{Arc, RwLock, RwLockReadGuard};
use tracing::{debug, error, trace, warn};

use self::workload::ApplicationTunnel;

pub mod policy;
pub mod service;
pub mod workload;
Expand Down Expand Up @@ -251,14 +253,6 @@ impl ProxyState {
.services
.get_by_vip(&network_addr(network.clone(), addr.ip()))
{
let Some(target_port) = svc.ports.get(&addr.port()) else {
debug!(
"found VIP {}, but port {} was unknown",
addr.ip(),
addr.port()
);
return None;
};
// Randomly pick an upstream
// TODO: do this more efficiently, and not just randomly
let Some(ep) = self.load_balance(source_workload, &svc) else {
Expand All @@ -269,11 +263,29 @@ impl ProxyState {
debug!("failed to fetch workload for {}", ep.workload_uid);
return None;
};
// If endpoint overrides the target port, use that instead
let target_port = ep.port.get(&addr.port()).unwrap_or(target_port);

let target_port = if let Some(&port) = ep.port.get(&addr.port()) {
// prefer endpoint port mapping
port
} else if let Some(&port) = svc.ports.get(&addr.port()) {
// otherwise, see if the service has this port
port
} else if let Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel {
// when using app tunnel, we don't require the port to be found on the service
addr.port()
} else {
// no app tunnel or port mapping, error
debug!(
"found VIP {}, but port {} was unknown",
addr.ip(),
addr.port()
);
return None;
};

let us = Upstream {
workload: wl,
port: *target_port,
port: target_port,
sans: svc.subject_alt_names.clone(),
destination_service: Some(ServiceDescription::from(svc.as_ref())),
};
Expand Down Expand Up @@ -874,9 +886,12 @@ mod tests {
use crate::state::workload::Locality;
use std::{net::Ipv4Addr, net::SocketAddrV4, time::Duration};

use self::workload::{application_tunnel::Protocol as AppProtocol, ApplicationTunnel};

use super::*;
use crate::test_helpers::TEST_SERVICE_NAMESPACE;
use crate::{strng, test_helpers};
use test_case::test_case;

#[tokio::test]
async fn lookup_address() {
Expand Down Expand Up @@ -946,6 +961,96 @@ mod tests {
.await;
}

enum PortMappingTestCase {
EndpointMapping,
ServiceMapping,
AppTunnel,
}

impl PortMappingTestCase {
fn service_mapping(&self) -> HashMap<u16, u16> {
if let PortMappingTestCase::ServiceMapping = self {
return HashMap::from([(80, 8080)]);
}
HashMap::from([])
}

fn endpoint_mapping(&self) -> HashMap<u16, u16> {
if let PortMappingTestCase::EndpointMapping = self {
return HashMap::from([(80, 9090)]);
}
HashMap::from([])
}

fn app_tunnel(&self) -> Option<ApplicationTunnel> {
if let PortMappingTestCase::AppTunnel = self {
return Some(ApplicationTunnel {
protocol: AppProtocol::PROXY,
port: Some(15088),
});
}
None
}

fn expected_port(&self) -> u16 {
match self {
PortMappingTestCase::ServiceMapping => 8080,
PortMappingTestCase::EndpointMapping => 9090,
_ => 80,
}
}
}

#[test_case(PortMappingTestCase::EndpointMapping; "ep mapping")]
#[test_case(PortMappingTestCase::ServiceMapping; "svc mapping")]
#[test_case(PortMappingTestCase::AppTunnel; "app tunnel")]
#[tokio::test]
async fn find_upstream_port_mappings(tc: PortMappingTestCase) {
let wl = Workload {
uid: "cluster1//v1/Pod/default/ep_no_port_mapping".into(),
name: "ep_no_port_mapping".into(),
namespace: "default".into(),
workload_ips: vec![IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1))],
application_tunnel: tc.app_tunnel(),
..test_helpers::test_default_workload()
};
let svc = Service {
name: "test-svc".into(),
hostname: "example.com".into(),
namespace: "default".into(),
vips: vec![NetworkAddress {
address: "10.0.0.1".parse().unwrap(),
network: "".into(),
}],
endpoints: HashMap::from([(
"cluster1//v1/Pod/default/ep_no_port_mapping".into(),
Endpoint {
workload_uid: "cluster1//v1/Pod/default/ep_no_port_mapping".into(),
service: NamespacedHostname {
namespace: "default".into(),
hostname: "example.com".into(),
},
address: Some(NetworkAddress {
address: "192.168.0.1".parse().unwrap(),
network: "".into(),
}),
port: tc.endpoint_mapping(),
},
)]),
ports: tc.service_mapping(),
..test_helpers::mock_default_service()
};

let mut state = ProxyState::default();
state.workloads.insert(wl.clone().into(), true);
state.services.insert(svc);

let us = state
.find_upstream("".into(), &wl, "10.0.0.1:80".parse().unwrap())
.expect("upstream to be found");
assert_eq!(us.port, tc.expected_port());
}

#[tokio::test]
async fn assert_rbac_with_dest_workload_info() {
let mut state = ProxyState::default();
Expand Down
18 changes: 17 additions & 1 deletion tests/namespaced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,25 @@ mod namespaced {

let _zt = manager.deploy_ztunnel(DEFAULT_NODE).await?;

// waypoint referenced via vip
let waypoint_ip = TEST_VIP.parse::<IpAddr>()?;
// service with no ports (workload app tunnel makes this work)
manager
.service_builder("waypoint")
.addresses(vec![NetworkAddress {
network: strng::EMPTY,
address: waypoint_ip,
}])
.register()
.await?;

let waypoint = manager
.workload_builder("waypoint", DEFAULT_NODE)
.service(
"default/waypoint.default.svc.cluster.local",
PROXY_PROTOCOL_PORT,
PROXY_PROTOCOL_PORT,
)
.mutate_workload(|w| {
w.application_tunnel = Some(ApplicationTunnel {
protocol: Protocol::PROXY,
Expand All @@ -320,7 +337,6 @@ mod namespaced {
})
.register()
.await?;
let waypoint_ip = waypoint.ip();

let server = manager
.workload_builder("server", DEFAULT_NODE)
Expand Down

0 comments on commit 80ac30e

Please sign in to comment.