Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mTLS support for batch and custom http object #2442

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions googleapiclient/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ def build_from_document(

schema = Schemas(service)

is_mtls = False

# If the http client is not specified, then we must construct an http client
# to make requests. If the service has scopes, then we also need to setup
# authentication.
Expand Down Expand Up @@ -685,36 +687,38 @@ def build_from_document(
)
http_channel.add_certificate(key_path, cert_path, "", passphrase)

# If user doesn't provide api endpoint via client options, decide which
# api endpoint to use.
if "mtlsRootUrl" in service and (
not client_options or not client_options.api_endpoint
):
mtls_endpoint = urllib.parse.urljoin(
service["mtlsRootUrl"], service["servicePath"]
)
use_mtls_endpoint = os.getenv(GOOGLE_API_USE_MTLS_ENDPOINT, "auto")

if not use_mtls_endpoint in ("never", "auto", "always"):
raise MutualTLSChannelError(
"Unsupported GOOGLE_API_USE_MTLS_ENDPOINT value. Accepted values: never, auto, always"
)

# Switch to mTLS endpoint, if environment variable is "always", or
# environment varibable is "auto" and client cert exists.
if use_mtls_endpoint == "always" or (
use_mtls_endpoint == "auto" and client_cert_to_use
):
if HAS_UNIVERSE and universe_domain != universe.DEFAULT_UNIVERSE:
raise MutualTLSChannelError(
f"mTLS is not supported in any universe other than {universe.DEFAULT_UNIVERSE}."
)
base = mtls_endpoint
else:
# Check google-api-core >= 2.18.0 if credentials' universe != "googleapis.com".
http_credentials = getattr(http, "credentials", None)
_check_api_core_compatible_with_credentials_universe(http_credentials)

# If user doesn't provide api endpoint via client options, decide which
# api endpoint to use.
if "mtlsRootUrl" in service and (
not client_options or not client_options.api_endpoint
):
mtls_endpoint = urllib.parse.urljoin(
service["mtlsRootUrl"], service["servicePath"]
)
use_mtls_endpoint = os.getenv(GOOGLE_API_USE_MTLS_ENDPOINT, "auto")

if not use_mtls_endpoint in ("never", "auto", "always"):
raise MutualTLSChannelError(
"Unsupported GOOGLE_API_USE_MTLS_ENDPOINT value. Accepted values: never, auto, always"
)

# Switch to mTLS endpoint, if environment variable is "always", or
# environment varibable is "auto" and client cert exists.
if use_mtls_endpoint == "always" or (
use_mtls_endpoint == "auto" and client_cert_to_use
):
if HAS_UNIVERSE and universe_domain != universe.DEFAULT_UNIVERSE:
raise MutualTLSChannelError(
f"mTLS is not supported in any universe other than {universe.DEFAULT_UNIVERSE}."
)
base = mtls_endpoint
is_mtls = True

if model is None:
features = service.get("features", [])
model = JsonModel("dataWrapper" in features)
Expand All @@ -729,6 +733,7 @@ def build_from_document(
rootDesc=service,
schema=schema,
universe_domain=universe_domain,
is_mtls=is_mtls
)


Expand Down Expand Up @@ -1406,6 +1411,7 @@ def __init__(
rootDesc,
schema,
universe_domain=universe.DEFAULT_UNIVERSE if HAS_UNIVERSE else "",
is_mtls=False
):
"""Build a Resource from the API description.

Expand Down Expand Up @@ -1438,6 +1444,7 @@ def __init__(
self._schema = schema
self._universe_domain = universe_domain
self._credentials_validated = False
self.is_mtls = is_mtls

self._set_service_methods()

Expand Down Expand Up @@ -1494,8 +1501,12 @@ def _set_service_methods(self):
def _add_basic_methods(self, resourceDesc, rootDesc, schema):
# If this is the root Resource, add a new_batch_http_request() method.
if resourceDesc == rootDesc:
if self.is_mtls and "mtlsRootUrl" in rootDesc:
root_url = rootDesc["mtlsRootUrl"]
else:
root_url = rootDesc["rootUrl"]
batch_uri = "%s%s" % (
rootDesc["rootUrl"],
root_url,
rootDesc.get("batchPath", "batch"),
)

Expand Down