Skip to content

Commit

Permalink
feat: a generic oidc token validator and update to business rules to …
Browse files Browse the repository at this point in the history
…not assume keycloak (#14900)

Co-authored-by: Jon Tan <[email protected]>
Co-authored-by: Joey Marshment-Howell <[email protected]>
  • Loading branch information
3 people committed Jan 9, 2025
1 parent 6816034 commit f7a9258
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 9 deletions.
2 changes: 2 additions & 0 deletions airbyte-api/server-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14289,6 +14289,8 @@ components:
defaultRealm:
type: string
default: "airbyte"
authorizationServerUrl:
type: string
LicenseStatus:
type: string
enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public AuthenticatedUser resolveUser(final String expectedAuthUserId) {
final String email = (String) jwtMap.get(JWT_USER_EMAIL);
// Default name to email address if name is not found
final String name = (String) jwtMap.getOrDefault(JWT_USER_NAME, email);
final AuthProvider authProvider = (AuthProvider) jwtMap.getOrDefault(JWT_AUTH_PROVIDER, null);
// TODO: the default should maybe be OIDC?
final AuthProvider authProvider = (AuthProvider) jwtMap.getOrDefault(JWT_AUTH_PROVIDER, AuthProvider.AIRBYTE);
return new AuthenticatedUser().withName(name).withEmail(email).withAuthUserId(authUserId).withAuthProvider(authProvider);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.auth.config

import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

@Singleton
@Requires(property = "airbyte.auth.identity-provider.type", value = "generic-oidc")
data class OidcEndpointConfig(
@Property(name = "airbyte.auth.identity-provider.oidc.endpoints.authorization-server-endpoint")
val authorizationServerEndpoint: String,
@Property(name = "airbyte.auth.identity-provider.oidc.endpoints.user-info-endpoint")
var userInfoEndpoint: String,
// TODO: I don't love that this lives here,
// I would have called this class OidcConfig though, but that already exists.
@Property(name = "airbyte.auth.identity-provider.oidc.endpoints.client-id")
var clientId: String,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.auth.config

import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.context.annotation.Requires

@ConfigurationProperties("airbyte.auth.identity-provider.oidc.endpoints.fields")
@Requires(property = "airbyte.auth.identity-provider.type", value = "generic-oidc")
class OidcFieldMappingConfig {
var sub: String = "sub"
var email: String = "email"
var name: String = "name"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.micrometer.common.util.StringUtils;
import io.micronaut.context.annotation.Primary;
import io.micronaut.http.HttpRequest;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.authentication.AuthenticationException;
Expand All @@ -45,6 +46,7 @@
* against the Keycloak server.
*/
@Singleton
@Primary
@RequiresAuthMode(AuthMode.OIDC)
@SuppressWarnings({"PMD.PreserveStackTrace", "PMD.UseTryWithResources", "PMD.UnusedFormalParameter", "PMD.UnusedPrivateMethod",
"PMD.ExceptionAsFlowControl"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.api.model.generated.WorkspaceUpdate;
import io.airbyte.commons.auth.config.AuthConfigs;
import io.airbyte.commons.auth.config.AuthMode;
import io.airbyte.commons.auth.config.OidcEndpointConfig;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.license.ActiveAirbyteLicense;
import io.airbyte.commons.license.AirbyteLicense;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class InstanceConfigurationHandler {
private final AuthConfigs authConfigs;
private final PermissionService permissionService;
private final Clock clock;
private final Optional<OidcEndpointConfig> oidcEndpointConfig;
private final Optional<KubernetesClientPermissionHelper> kubernetesClientPermissionHelper;

public InstanceConfigurationHandler(@Named("airbyteUrl") final Optional<String> airbyteUrl,
Expand All @@ -88,6 +90,7 @@ public InstanceConfigurationHandler(@Named("airbyteUrl") final Optional<String>
final AuthConfigs authConfigs,
final PermissionService permissionService,
final Optional<Clock> clock,
final Optional<OidcEndpointConfig> oidcEndpointConfig,
final Optional<KubernetesClientPermissionHelper> kubernetesClientPermissionHelper) {
this.airbyteUrl = airbyteUrl;
this.trackingStrategy = trackingStrategy;
Expand All @@ -101,6 +104,7 @@ public InstanceConfigurationHandler(@Named("airbyteUrl") final Optional<String>
this.authConfigs = authConfigs;
this.permissionService = permissionService;
this.clock = clock.orElse(Clock.systemUTC());
this.oidcEndpointConfig = oidcEndpointConfig;
this.kubernetesClientPermissionHelper = kubernetesClientPermissionHelper;
}

Expand Down Expand Up @@ -155,12 +159,17 @@ private AuthConfiguration getAuthConfiguration() {

// if Enterprise configurations are present, set OIDC-specific configs
if (authConfigs.getAuthMode().equals(AuthMode.OIDC)) {
// OIDC depends on Keycloak configuration being present
if (authConfigs.getKeycloakConfig() == null) {
throw new IllegalStateException("Keycloak configuration is required for OIDC mode.");
if (oidcEndpointConfig.isPresent()) {
authConfig.setAuthorizationServerUrl(oidcEndpointConfig.get().getAuthorizationServerEndpoint());
authConfig.setClientId(oidcEndpointConfig.get().getClientId());
} else if (authConfigs.getKeycloakConfig() != null && airbyteUrl.isPresent()) {
authConfig.setClientId(authConfigs.getKeycloakConfig().getWebClientId());
authConfig.setAuthorizationServerUrl(
airbyteUrl.get() + "/auth/realms/" + authConfigs.getKeycloakConfig().getAirbyteRealm());
} else {
// TODO: This is a bad error message. Once we figure out what the final config should look like
throw new IllegalStateException("OIDC must be configured either for Keycloak or in generic oidc mode.");
}
authConfig.setClientId(authConfigs.getKeycloakConfig().getWebClientId());
authConfig.setDefaultRealm(authConfigs.getKeycloakConfig().getAirbyteRealm());
}

return authConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.server.authorization

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import io.airbyte.commons.auth.RequiresAuthMode
import io.airbyte.commons.auth.config.AuthMode
import io.airbyte.commons.auth.config.OidcEndpointConfig
import io.airbyte.commons.auth.config.OidcFieldMappingConfig
import io.airbyte.metrics.lib.MetricAttribute
import io.airbyte.metrics.lib.MetricClient
import io.airbyte.metrics.lib.MetricTags
import io.airbyte.metrics.lib.OssMetricsRegistry
import io.micronaut.cache.annotation.CacheConfig
import io.micronaut.cache.annotation.Cacheable
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Secondary
import io.micronaut.http.HttpRequest
import io.micronaut.security.authentication.Authentication
import io.micronaut.security.token.validator.TokenValidator
import jakarta.inject.Named
import jakarta.inject.Singleton
import okhttp3.OkHttpClient
import okhttp3.Request
import org.apache.http.HttpHeaders
import org.reactivestreams.Publisher
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono.empty
import reactor.core.publisher.Mono.just
import java.util.Optional

@Singleton
@Secondary
@RequiresAuthMode(AuthMode.OIDC)
@Requires(property = "airbyte.auth.identity-provider.type", value = "generic-oidc")
@Replaces(KeycloakTokenValidator::class)
@CacheConfig("user-info-endpoint-response")
open class OidcTokenValidator(
// Config
private val endpointConfig: OidcEndpointConfig,
private val fieldMappingConfig: OidcFieldMappingConfig,
// Clients
@Named("keycloakTokenValidatorHttpClient")
private val client: OkHttpClient,
private val metricClient: Optional<MetricClient>,
private val tokenRoleResolver: TokenRoleResolver,
) : TokenValidator<HttpRequest<Any>> {
companion object {
val log: Logger = LoggerFactory.getLogger(OidcTokenValidator::class.java)
const val EXTERNAL_USER: String = "external-user"
}

/**
* Validate the Token in the following way:
* 1. Call the configured UserInfo endpoint with the Token. If that request is not a 200, fail.
* 2. Take the UserInfo response JSON and map that into our userAttributesMap
* * This response is cached for a short amount of time (1m) as Commercial OIDC providers have
* rate limits on the UserInfo endpoint, e.g. 10 req/minute by UserId.
* 3. Find the Roles associated with the user and build the Authentication Response
*/
override fun validateToken(
token: String?,
request: HttpRequest<Any>,
): Publisher<Authentication> {
// Fail fast if the token is null or blank
if (token.isNullOrBlank()) {
return empty()
}

val authentication: Authentication? = validateTokenWithUserInfoEndpoint(token = token, request = request)
authentication?.let {
updateMetric(success = true, request = request)
return just(authentication)
}

// pass to the next validator, if one exists
log.debug("Token was not a valid token: {}", token)
updateMetric(success = false, request = request)
return empty()
}

/**
* Takes the raw token and makes a request to the UserInfo endpoint. The response is then parsed
* and mapped into our userAttributesMap. If that parsing fails, an Empty Optional is returned instead.
*/
@Cacheable
open fun validateTokenWithUserInfoEndpoint(
token: String,
request: HttpRequest<Any>,
): Authentication? {
log.debug("Validating token: $token\nwith endpoint ${endpointConfig.userInfoEndpoint}")

client.newCall(
Request.Builder()
.addHeader(HttpHeaders.CONTENT_TYPE, "application/json")
.addHeader(HttpHeaders.AUTHORIZATION, "Bearer $token")
.url(endpointConfig.userInfoEndpoint)
.get()
.build(),
).execute().use { response ->
if (response.isSuccessful && response.body != null) {
return convertUserInfoResponseToAuthentication(response = response.body!!.string(), request = request)
}
log.warn("The response from the userinfo endpoint was not valid. The status code was: ${response.code} with body: \n${response.body}")
return null
}
}

/**
* Going through UserInfo response and extract fields our backend cares about into an Authentication object.
*/
private fun convertUserInfoResponseToAuthentication(
response: String,
request: HttpRequest<Any>,
): Authentication? {
log.debug("Response from userinfo endpoint: {}", response)
val userInfoMap: Map<String, String> = jacksonObjectMapper().readValue(response)
log.debug("Authenticating with jwtmap {}: ", userInfoMap)

// Validate the payload, everything downstream can assume the map is correct.
if (userInfoMapIsValid(userInfoMap)) {
return Authentication.build(
userInfoMap[fieldMappingConfig.sub],
tokenRoleResolver.resolveRoles(
userInfoMap[fieldMappingConfig.sub],
request,
),
userInfoMap,
)
}
return null
}

/**
* Check that the UserInfo map contains all the key/value pairs we care about.
*/
private fun userInfoMapIsValid(userInfoMap: Map<String, String>): Boolean {
return if (userInfoMap[fieldMappingConfig.name].isNullOrBlank()) {
log.warn("The token did not contain a claim for key ${fieldMappingConfig.name}")
false
} else if (userInfoMap[fieldMappingConfig.email].isNullOrBlank()) {
log.warn("The token did not contain a claim for key ${fieldMappingConfig.email}")
false
} else if (userInfoMap[fieldMappingConfig.sub].isNullOrBlank()) {
log.warn("The token did not contain a claim for key ${fieldMappingConfig.sub}")
false
} else {
true
}
}

/**
* Helper method for reporting success/failure metrics.
*/
private fun updateMetric(
success: Boolean,
request: HttpRequest<Any>,
) {
metricClient.ifPresent { m: MetricClient ->
m.count(
OssMetricsRegistry.OIDC_TOKEN_VALIDATION,
1,
if (success) {
MetricAttribute(MetricTags.AUTHENTICATION_RESPONSE, "success")
} else {
MetricAttribute(MetricTags.AUTHENTICATION_RESPONSE, "failure")
},
MetricAttribute(MetricTags.USER_TYPE, EXTERNAL_USER),
MetricAttribute(MetricTags.AUTHENTICATION_REQUEST_URI_ATTRIBUTE_KEY, request.uri.path),
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class InstanceConfigurationHandlerTest {

private static final String AIRBYTE_URL = "http://localhost:8000";
private static final String AIRBYTE_REALM = "airbyte";
private static final String AUTHORIZATION_SERVER_URL = "http://localhost:8000/auth/realms/airbyte";
private static final String WEB_CLIENT_ID = "airbyte-webapp";
private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final UUID USER_ID = UUID.randomUUID();
Expand Down Expand Up @@ -139,6 +140,7 @@ void testGetInstanceConfiguration(final boolean isEnterprise, final boolean isIn
.auth(isEnterprise ? new AuthConfiguration()
.mode(ModeEnum.OIDC)
.clientId(WEB_CLIENT_ID)
.authorizationServerUrl(AUTHORIZATION_SERVER_URL)
.defaultRealm(AIRBYTE_REALM) : new AuthConfiguration().mode(ModeEnum.NONE))
.initialSetupComplete(isInitialSetupComplete)
.defaultUserId(USER_ID)
Expand Down Expand Up @@ -181,6 +183,7 @@ void testGetInstanceConfigurationTrackingStrategy(final String envValue, final T
mAuthConfigs,
permissionService,
Optional.empty(),
Optional.empty(),
mKubernetesClientHelper);

final var result = handler.getInstanceConfiguration();
Expand Down Expand Up @@ -245,7 +248,8 @@ void testSetupInstanceConfiguration(final boolean userNamePresent, final boolean
.auth(new AuthConfiguration()
.mode(ModeEnum.OIDC)
.clientId(WEB_CLIENT_ID)
.defaultRealm(AIRBYTE_REALM))
.defaultRealm(AIRBYTE_REALM)
.authorizationServerUrl(AUTHORIZATION_SERVER_URL))
.initialSetupComplete(true)
.defaultUserId(USER_ID)
.defaultOrganizationId(ORGANIZATION_ID)
Expand Down Expand Up @@ -337,6 +341,7 @@ void testLicenseInfoWithUsedNodes() {
mAuthConfigs,
permissionService,
Optional.empty(),
Optional.empty(),
Optional.of(kubernetesClientPermissionHelperMock));

final var licenseInfoResponse = handler.licenseInfo();
Expand Down Expand Up @@ -364,6 +369,7 @@ void testInvalidLicenseTest() {
mAuthConfigs,
permissionService,
Optional.empty(),
Optional.empty(),
mKubernetesClientHelper);
assertEquals(handler.currentLicenseStatus(), LicenseStatus.INVALID);
}
Expand All @@ -383,6 +389,7 @@ void testExpiredLicenseTest() {
mAuthConfigs,
permissionService,
Optional.of(Clock.fixed(Instant.MAX, ZoneId.systemDefault())),
Optional.empty(),
mKubernetesClientHelper);
assertEquals(handler.currentLicenseStatus(), LicenseStatus.EXPIRED);
}
Expand All @@ -403,6 +410,7 @@ void testExceededEditorsLicenseTest() {
mAuthConfigs,
permissionService,
Optional.empty(),
Optional.empty(),
mKubernetesClientHelper);
when(permissionService.listPermissions()).thenReturn(
Stream.generate(UUID::randomUUID)
Expand Down Expand Up @@ -451,6 +459,7 @@ private InstanceConfigurationHandler getInstanceConfigurationHandler(final boole
mAuthConfigs,
permissionService,
Optional.empty(),
Optional.empty(),
mKubernetesClientHelper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public enum OssMetricsRegistry implements MetricsRegistry {
"keycloak_token_validation",
"increments when a keycloak auth token validation occurs"),

OIDC_TOKEN_VALIDATION(
MetricEmittingApps.SERVER,
"oidc_token_validation",
"increments when a oidc auth token validation occurs"),

BREAKING_SCHEMA_CHANGE_DETECTED(MetricEmittingApps.SERVER,
"breaking_change_detected",
"a breaking schema change has been detected"),
Expand Down
Loading

0 comments on commit f7a9258

Please sign in to comment.