New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details
Socket
Book a DemoSign in
Socket

google-auth

Package Overview
Dependencies
Maintainers
2
Versions
186
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

google-auth - pypi Package Compare versions

Comparing version
2.40.3
to
2.41.0
+15
-3
google_auth.egg-info/PKG-INFO

@@ -1,4 +0,4 @@

Metadata-Version: 2.1
Metadata-Version: 2.4
Name: google-auth
Version: 2.40.3
Version: 2.41.0
Summary: Google Authentication Library

@@ -28,3 +28,3 @@ Home-page: https://github.com/googleapis/google-auth-library-python

License-File: LICENSE
Requires-Dist: cachetools<6.0,>=2.0.0
Requires-Dist: cachetools<7.0,>=2.0.0
Requires-Dist: pyasn1-modules>=0.2.1

@@ -78,2 +78,14 @@ Requires-Dist: rsa<5,>=3.1.4

Requires-Dist: packaging; extra == "urllib3"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

@@ -80,0 +92,0 @@ Google Auth Python Library

+1
-1

@@ -1,2 +0,2 @@

cachetools<6.0,>=2.0.0
cachetools<7.0,>=2.0.0
pyasn1-modules>=0.2.1

@@ -3,0 +3,0 @@ rsa<5,>=3.1.4

@@ -0,3 +1,4 @@

dist
google
scripts
testing

@@ -62,2 +62,34 @@ # Copyright 2015 Google Inc.

_GENERIC_LOAD_METHOD_WARNING = """\
The {} method is deprecated because of a potential security risk.
This method does not validate the credential configuration. The security
risk occurs when a credential configuration is accepted from a source that
is not under your control and used without validation on your side.
If you know that you will be loading credential configurations of a
specific type, it is recommended to use a credential-type-specific
load method.
This will ensure that an unexpected credential type with potential for
malicious intent is not loaded unintentionally. You might still have to do
validation for certain credential types. Please follow the recommendations
for that method. For example, if you want to load only service accounts,
you can create the service account credentials explicitly:
```
from google.oauth2 import service_account
creds = service_account.Credentials.from_service_account_file(filename)
```
If you are loading your credential configuration from an untrusted source and have
not mitigated the risks (e.g. by validating the configuration yourself), make
these changes as soon as possible to prevent security risks to your environment.
Regardless of the method used, it is always your responsibility to validate
configurations received from external sources.
Refer to https://cloud.google.com/docs/authentication/external/externally-sourced-credentials
for more details.
"""
# The subject token type used for AWS external_account credentials.

@@ -80,2 +112,16 @@ _AWS_SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request"

def _warn_about_generic_load_method(method_name): # pragma: NO COVER
"""Warns that a generic load method is being used.
This is to discourage use of the generic load methods in favor of
more specific methods. The generic methods are more likely to lead to
security issues if the input is not validated.
Args:
method_name (str): The name of the method being used.
"""
warnings.warn(_GENERIC_LOAD_METHOD_WARNING.format(method_name), DeprecationWarning)
def load_credentials_from_file(

@@ -126,2 +172,4 @@ filename, scopes=None, default_scopes=None, quota_project_id=None, request=None

"""
_warn_about_generic_load_method("load_credentials_from_file")
if not os.path.exists(filename):

@@ -190,2 +238,3 @@ raise exceptions.DefaultCredentialsError(

"""
_warn_about_generic_load_method("load_credentials_from_dict")
if not isinstance(info, dict):

@@ -192,0 +241,0 @@ raise exceptions.DefaultCredentialsError(

@@ -24,2 +24,3 @@ # Copyright 2015 Google Inc.

import logging
import os
import sys

@@ -291,2 +292,42 @@ from typing import Any, Dict, Mapping, Optional, Union

def get_bool_from_env(variable_name, default=False):
"""Gets a boolean value from an environment variable.
The environment variable is interpreted as a boolean with the following
(case-insensitive) rules:
- "true", "1" are considered true.
- "false", "0" are considered false.
Any other values will raise an exception.
Args:
variable_name (str): The name of the environment variable.
default (bool): The default value if the environment variable is not
set.
Returns:
bool: The boolean value of the environment variable.
Raises:
google.auth.exceptions.InvalidValue: If the environment variable is
set to a value that can not be interpreted as a boolean.
"""
value = os.environ.get(variable_name)
if value is None:
return default
value = value.lower()
if value in ("true", "1"):
return True
elif value in ("false", "0"):
return False
else:
raise exceptions.InvalidValue(
'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format(
variable_name
)
)
def is_python_3():

@@ -293,0 +334,0 @@ """Check if the Python interpreter is Python 2 or 3.

@@ -33,3 +33,7 @@ # Copyright 2016 Google LLC

_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
"https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)
class Credentials(

@@ -39,2 +43,3 @@ credentials.Scoped,

credentials.CredentialsWithUniverseDomain,
credentials.CredentialsWithTrustBoundary,
):

@@ -66,2 +71,3 @@ """Compute Engine Credentials.

universe_domain=None,
trust_boundary=None,
):

@@ -82,2 +88,3 @@ """

domain endpoint, then the default googleapis.com will be used.
trust_boundary (Mapping[str,str]): A credential trust boundary.
"""

@@ -93,2 +100,3 @@ super(Credentials, self).__init__()

self._universe_domain_cached = True
self._trust_boundary = trust_boundary

@@ -108,2 +116,8 @@ def _retrieve_info(self, request):

if not info or "email" not in info:
raise exceptions.RefreshError(
"Unexpected response from metadata server: "
"service account info is missing 'email' field."
)
self._service_account_email = info["email"]

@@ -113,3 +127,3 @@

if self._scopes is None:
self._scopes = info["scopes"]
self._scopes = info.get("scopes")

@@ -119,3 +133,3 @@ def _metric_header_for_usage(self):

def refresh(self, request):
def _refresh_token(self, request):
"""Refresh the access token and scopes.

@@ -143,2 +157,33 @@

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API for GCE."""
# If the service account email is 'default', we need to get the
# actual email address from the metadata server.
if self._service_account_email == "default":
from google.auth.transport import requests as google_auth_requests
request = google_auth_requests.Request()
try:
info = _metadata.get_service_account_info(request, "default")
if not info or "email" not in info:
raise exceptions.RefreshError(
"Unexpected response from metadata server: "
"service account info is missing 'email' field."
)
self._service_account_email = info["email"]
except exceptions.TransportError as e:
# If fetching the service account email fails due to a transport error,
# it means we cannot build the trust boundary lookup URL.
# Wrap this in a RefreshError so it's caught by _refresh_trust_boundary.
raise exceptions.RefreshError(
"Failed to get service account email for trust boundary lookup: {}".format(
e
)
) from e
return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
self.universe_domain, self.service_account_email
)
@property

@@ -185,4 +230,5 @@ def service_account_email(self):

default_scopes=self._default_scopes,
universe_domain=self._universe_domain,
trust_boundary=self._trust_boundary,
)
creds._universe_domain = self._universe_domain
creds._universe_domain_cached = self._universe_domain_cached

@@ -201,4 +247,5 @@ return creds

quota_project_id=self._quota_project_id,
universe_domain=self._universe_domain,
trust_boundary=self._trust_boundary,
)
creds._universe_domain = self._universe_domain
creds._universe_domain_cached = self._universe_domain_cached

@@ -214,6 +261,20 @@ return creds

quota_project_id=self._quota_project_id,
trust_boundary=self._trust_boundary,
universe_domain=universe_domain,
)
@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def with_trust_boundary(self, trust_boundary):
creds = self.__class__(
service_account_email=self._service_account_email,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
universe_domain=self._universe_domain,
trust_boundary=trust_boundary,
)
creds._universe_domain_cached = self._universe_domain_cached
return creds
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds

@@ -290,3 +351,3 @@ _DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"

if token_uri or additional_claims or service_account_email or signer:
raise exceptions.MalformedError(
raise ValueError(
"If use_metadata_identity_endpoint is set, token_uri, "

@@ -382,3 +443,3 @@ "additional_claims, service_account_email, signer arguments"

if self._use_metadata_identity_endpoint:
raise exceptions.MalformedError(
raise ValueError(
"If use_metadata_identity_endpoint is set, token_uri" " must not be set"

@@ -385,0 +446,0 @@ )

@@ -21,2 +21,3 @@ # Copyright 2016 Google LLC

import os
from typing import List

@@ -27,5 +28,8 @@ from google.auth import _helpers, environment_vars

from google.auth._credentials_base import _BaseCredentials
from google.auth._default import _LOGGER
from google.auth._refresh_worker import RefreshThreadManager
DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
NO_OP_TRUST_BOUNDARY_LOCATIONS: List[str] = []
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"

@@ -183,18 +187,3 @@

"""
self._apply(headers, token=token)
"""Trust boundary value will be a cached value from global lookup.
The response of trust boundary will be a list of regions and a hex
encoded representation.
An example of global lookup response:
{
"locations": [
"us-central1", "us-east1", "europe-west1", "asia-east1"
]
"encoded_locations": "0xA30"
}
"""
if self._trust_boundary is not None:
headers["x-allowed-locations"] = self._trust_boundary["encoded_locations"]
self._apply(headers, token)
if self.quota_project_id:

@@ -305,2 +294,157 @@ headers["x-goog-user-project"] = self.quota_project_id

class CredentialsWithTrustBoundary(Credentials):
"""Abstract base for credentials supporting ``with_trust_boundary`` factory"""
@abc.abstractmethod
def _refresh_token(self, request):
"""Refreshes the access token.
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
Raises:
google.auth.exceptions.RefreshError: If the credentials could
not be refreshed.
"""
raise NotImplementedError("_refresh_token must be implemented")
def with_trust_boundary(self, trust_boundary):
"""Returns a copy of these credentials with a modified trust boundary.
Args:
trust_boundary Mapping[str, str]: The trust boundary to use for the
credential. This should be a map with a "locations" key that maps to
a list of GCP regions, and a "encodedLocations" key that maps to a
hex string.
Returns:
google.auth.credentials.Credentials: A new credentials instance.
"""
raise NotImplementedError("This credential does not support trust boundaries.")
def _is_trust_boundary_lookup_required(self):
"""Checks if a trust boundary lookup is required.
A lookup is required if the feature is enabled via an environment
variable, the universe domain is supported, and a no-op boundary
is not already cached.
Returns:
bool: True if a trust boundary lookup is required, False otherwise.
"""
# 1. Check if the feature is enabled via environment variable.
if not _helpers.get_bool_from_env(
environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
):
return False
# 2. Skip trust boundary flow for non-default universe domains.
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
return False
# 3. Do not trigger refresh if credential has a cached no-op trust boundary.
return not self._has_no_op_trust_boundary()
def _get_trust_boundary_header(self):
if self._trust_boundary is not None:
if self._has_no_op_trust_boundary():
# STS expects an empty string if the trust boundary value is no-op.
return {"x-allowed-locations": ""}
else:
return {"x-allowed-locations": self._trust_boundary["encodedLocations"]}
return {}
def apply(self, headers, token=None):
"""Apply the token to the authentication header."""
super().apply(headers, token)
headers.update(self._get_trust_boundary_header())
def refresh(self, request):
"""Refreshes the access token and the trust boundary.
This method calls the subclass's token refresh logic and then
refreshes the trust boundary if applicable.
"""
self._refresh_token(request)
self._refresh_trust_boundary(request)
def _refresh_trust_boundary(self, request):
"""Triggers a refresh of the trust boundary and updates the cache if necessary.
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
Raises:
google.auth.exceptions.RefreshError: If the trust boundary could
not be refreshed and no cached value is available.
"""
if not self._is_trust_boundary_lookup_required():
return
try:
self._trust_boundary = self._lookup_trust_boundary(request)
except exceptions.RefreshError as error:
# If the call to the lookup API failed, check if there is a trust boundary
# already cached. If there is, do nothing. If not, then throw the error.
if self._trust_boundary is None:
raise error
if _helpers.is_logging_enabled(_LOGGER):
_LOGGER.debug(
"Using cached trust boundary due to refresh error: %s", error
)
return
def _lookup_trust_boundary(self, request):
"""Calls the trust boundary lookup API to refresh the trust boundary cache.
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
Returns:
trust_boundary (dict): The trust boundary object returned by the lookup API.
Raises:
google.auth.exceptions.RefreshError: If the trust boundary could not be
retrieved.
"""
from google.oauth2 import _client
url = self._build_trust_boundary_lookup_url()
if not url:
raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.")
headers = {}
self._apply(headers)
headers.update(self._get_trust_boundary_header())
return _client._lookup_trust_boundary(request, url, headers=headers)
@abc.abstractmethod
def _build_trust_boundary_lookup_url(self):
"""
Builds and returns the URL for the trust boundary lookup API.
This method should be implemented by subclasses to provide the
specific URL based on the credential type and its properties.
Returns:
str: The URL for the trust boundary lookup endpoint, or None
if lookup should be skipped (e.g., for non-applicable universe domains).
"""
raise NotImplementedError(
"_build_trust_boundary_lookup_url must be implemented"
)
def _has_no_op_trust_boundary(self):
# A no-op trust boundary is indicated by encodedLocations being "0x0".
# The "locations" list may or may not be present as an empty list.
if self._trust_boundary is None:
return False
return (
self._trust_boundary.get("encodedLocations")
== NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS
)
class AnonymousCredentials(Credentials):

@@ -389,4 +533,3 @@ """Credentials that do not provide any authentication information.

def requires_scopes(self):
"""True if these credentials require scopes to obtain an access token.
"""
"""True if these credentials require scopes to obtain an access token."""
return False

@@ -393,0 +536,0 @@

@@ -85,1 +85,5 @@ # Copyright 2016 Google LLC

AWS_DEFAULT_REGION = "AWS_DEFAULT_REGION"
GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED = "GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED"
"""Environment variable controlling whether to enable trust boundary feature.
The default value is false. Users have to explicitly set this value to true."""

@@ -63,4 +63,11 @@ # Copyright 2022 Google LLC

uri, use `with_token_uri`.
"""
**IMPORTANT**:
This class does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details."""
def __init__(

@@ -332,2 +339,10 @@ self,

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -372,2 +387,10 @@ info (Mapping[str, str]): The external account info in Google

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -374,0 +397,0 @@ filename (str): The path to the external account json file.

@@ -62,14 +62,14 @@ # Copyright 2020 Google LLC

"""A context class that contains information about the requested third party credential that is passed
to AWS security credential and subject token suppliers.
to AWS security credential and subject token suppliers.
Attributes:
subject_token_type (str): The requested subject token type based on the Oauth2.0 token exchange spec.
Expected values include::
Attributes:
subject_token_type (str): The requested subject token type based on the Oauth2.0 token exchange spec.
Expected values include::
“urn:ietf:params:oauth:token-type:jwt”
“urn:ietf:params:oauth:token-type:id-token”
“urn:ietf:params:oauth:token-type:saml2”
“urn:ietf:params:aws:token-type:aws4_request”
“urn:ietf:params:oauth:token-type:jwt”
“urn:ietf:params:oauth:token-type:id-token”
“urn:ietf:params:oauth:token-type:saml2”
“urn:ietf:params:aws:token-type:aws4_request”
audience (str): The requested audience for the subject token.
audience (str): The requested audience for the subject token.
"""

@@ -93,4 +93,11 @@

credentials for Google access tokens.
"""
**IMPORTANT**:
This class does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details."""
def __init__(

@@ -581,2 +588,10 @@ self,

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -621,2 +636,10 @@ info (Mapping[str, str]): The external account info in Google

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -623,0 +646,0 @@ filename (str): The path to the external account json file.

@@ -256,4 +256,12 @@ # Copyright 2020 Google LLC

class Credentials(external_account.Credentials):
"""External account credentials sourced from files and URLs."""
"""External account credentials sourced from files and URLs.
**IMPORTANT**:
This class does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details."""
def __init__(

@@ -501,2 +509,10 @@ self,

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -522,2 +538,10 @@ info (Mapping[str, str]): The Identity Pool external account info in Google

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -524,0 +548,0 @@ filename (str): The path to the IdentityPool external account json file.

@@ -49,2 +49,5 @@ # Copyright 2018 Google Inc.

_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
"https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)

@@ -121,3 +124,6 @@ _SOURCE_CREDENTIAL_AUTHORIZED_USER_TYPE = "authorized_user"

class Credentials(
credentials.Scoped, credentials.CredentialsWithQuotaProject, credentials.Signing
credentials.Scoped,
credentials.CredentialsWithQuotaProject,
credentials.Signing,
credentials.CredentialsWithTrustBoundary,
):

@@ -183,2 +189,10 @@ """This module defines impersonated credentials which are essentially

print(bucket.name)
**IMPORTANT**:
This class does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
"""

@@ -196,2 +210,3 @@

iam_endpoint_override=None,
trust_boundary=None,
):

@@ -227,2 +242,3 @@ """

using domain wide delegation.
trust_boundary (Mapping[str,str]): A credential trust boundary.
"""

@@ -259,2 +275,3 @@

self._cred_file_path = None
self._trust_boundary = trust_boundary

@@ -264,7 +281,3 @@ def _metric_header_for_usage(self):

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
self._update_token(request)
def _update_token(self, request):
def _refresh_token(self, request):
"""Updates credentials with a new access_token representing

@@ -341,2 +354,24 @@ the impersonated account.

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API.
This method constructs the specific URL for the IAM Credentials API's
`allowedLocations` endpoint, using the credential's universe domain
and service account email.
Raises:
ValueError: If `self.service_account_email` is None or an empty
string, as it's required to form the URL.
Returns:
str: The URL for the trust boundary lookup endpoint.
"""
if not self.service_account_email:
raise ValueError(
"Service account email is required to build the trust boundary lookup URL."
)
return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
self.universe_domain, self.service_account_email
)
def sign_bytes(self, message):

@@ -411,2 +446,3 @@ from google.auth.transport.requests import AuthorizedSession

iam_endpoint_override=self._iam_endpoint_override,
trust_boundary=self._trust_boundary,
)

@@ -416,2 +452,8 @@ cred._cred_file_path = self._cred_file_path

@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def with_trust_boundary(self, trust_boundary):
cred = self._make_copy()
cred._trust_boundary = trust_boundary
return cred
@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)

@@ -433,2 +475,10 @@ def with_quota_project(self, quota_project_id):

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -501,6 +551,4 @@ info (Mapping[str, str]): The impersonated service account credentials info in Google

class IDTokenCredentials(credentials.CredentialsWithQuotaProject):
"""Open ID Connect ID Token-based service account credentials.
"""Open ID Connect ID Token-based service account credentials."""
"""
def __init__(

@@ -507,0 +555,0 @@ self,

@@ -60,4 +60,12 @@ # Copyright 2022 Google LLC

class Credentials(external_account.Credentials):
"""External account credentials sourced from executables."""
"""External account credentials sourced from executables.
**IMPORTANT**:
This class does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details."""
def __init__(

@@ -304,2 +312,10 @@ self,

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -324,2 +340,10 @@ info (Mapping[str, str]): The Pluggable external account info in Google

**IMPORTANT**:
This method does not validate the credential configuration. A security
risk occurs when a credential configuration configured with malicious urls
is used.
When the credential configuration is accepted from an
untrusted source, you should validate it before using with this method.
Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details.
Args:

@@ -326,0 +350,0 @@ filename (str): The path to the Pluggable external account json file.

@@ -15,2 +15,2 @@ # Copyright 2021 Google LLC

__version__ = "2.40.3"
__version__ = "2.41.0"

@@ -340,2 +340,4 @@ # Copyright 2016 Google LLC

access_token (str): The access token used to call the IAM endpoint.
universe_domain (str): The universe domain for the request. The
default is ``googleapis.com``.

@@ -510,1 +512,116 @@ Returns:

return _handle_refresh_grant_response(response_data, refresh_token)
def _lookup_trust_boundary(request, url, headers=None):
"""Implements the global lookup of a credential trust boundary.
For the lookup, we send a request to the global lookup endpoint and then
parse the response. Service account credentials, workload identity
pools and workforce pools implementation may have trust boundaries configured.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
url (str): The trust boundary lookup url.
headers (Optional[Mapping[str, str]]): The headers for the request.
Returns:
Mapping[str,list|str]: A dictionary containing
"locations" as a list of allowed locations as strings and
"encodedLocations" as a hex string.
e.g:
{
"locations": [
"us-central1", "us-east1", "europe-west1", "asia-east1"
],
"encodedLocations": "0xA30"
}
If the credential is not set up with explicit trust boundaries, a trust boundary
of "all" will be returned as a default response.
{
"locations": [],
"encodedLocations": "0x0"
}
Raises:
exceptions.RefreshError: If the response status code is not 200.
exceptions.MalformedError: If the response is not in a valid format.
"""
response_data = _lookup_trust_boundary_request(request, url, headers=headers)
# In case of no-op response, the "locations" list may or may not be present as an empty list.
if "encodedLocations" not in response_data:
raise exceptions.MalformedError(
"Invalid trust boundary info: {}".format(response_data)
)
return response_data
def _lookup_trust_boundary_request(request, url, can_retry=True, headers=None):
"""Makes a request to the trust boundary lookup endpoint.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
url (str): The trust boundary lookup url.
can_retry (bool): Enable or disable request retry behavior. Defaults to true.
headers (Optional[Mapping[str, str]]): The headers for the request.
Returns:
Mapping[str, str]: The JSON-decoded response data.
Raises:
google.auth.exceptions.RefreshError: If the token endpoint returned
an error.
"""
response_status_ok, response_data, retryable_error = _lookup_trust_boundary_request_no_throw(
request, url, can_retry, headers
)
if not response_status_ok:
_handle_error_response(response_data, retryable_error)
return response_data
def _lookup_trust_boundary_request_no_throw(request, url, can_retry=True, headers=None):
"""Makes a request to the trust boundary lookup endpoint. This
function doesn't throw on response errors.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
url (str): The trust boundary lookup url.
can_retry (bool): Enable or disable request retry behavior. Defaults to true.
headers (Optional[Mapping[str, str]]): The headers for the request.
Returns:
Tuple(bool, Mapping[str, str], Optional[bool]): A boolean indicating
if the request is successful, a mapping for the JSON-decoded response
data and in the case of an error a boolean indicating if the error
is retryable.
"""
response_data = {}
retryable_error = False
retries = _exponential_backoff.ExponentialBackoff()
for _ in retries:
response = request(method="GET", url=url, headers=headers)
response_body = (
response.data.decode("utf-8")
if hasattr(response.data, "decode")
else response.data
)
try:
# response_body should be a JSON
response_data = json.loads(response_body)
except ValueError:
response_data = response_body
if response.status == http_client.OK:
return True, response_data, None
retryable_error = _can_retry(
status_code=response.status, response_data=response_data
)
if not can_retry or not retryable_error:
return False, response_data, retryable_error
return False, response_data, retryable_error

@@ -87,2 +87,5 @@ # Copyright 2016 Google LLC

_GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
"https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)

@@ -95,2 +98,3 @@

credentials.CredentialsWithTokenUri,
credentials.CredentialsWithTrustBoundary,
):

@@ -169,3 +173,3 @@ """Service account credentials

signed jwt is used for token refresh.
trust_boundary (str): String representation of trust boundary meta.
trust_boundary (Mapping[str,str]): A credential trust boundary.

@@ -200,3 +204,3 @@ .. note:: Typically one of the helper constructors

self._additional_claims = {}
self._trust_boundary = {"locations": [], "encoded_locations": "0x0"}
self._trust_boundary = trust_boundary

@@ -301,2 +305,3 @@ @classmethod

universe_domain=self._universe_domain,
trust_boundary=self._trust_boundary,
)

@@ -389,2 +394,8 @@ cred._cred_file_path = self._cred_file_path

@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def with_trust_boundary(self, trust_boundary):
cred = self._make_copy()
cred._trust_boundary = trust_boundary
return cred
def _make_authorization_grant_assertion(self):

@@ -433,4 +444,4 @@ """Create the OAuth 2.0 assertion.

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def _refresh_token(self, request):
if self._always_use_jwt_access and not self._jwt_credentials:

@@ -501,2 +512,24 @@ # If self signed jwt should be used but jwt credential is not

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API.
This method constructs the specific URL for the IAM Credentials API's
`allowedLocations` endpoint, using the credential's universe domain
and service account email.
Raises:
ValueError: If `self.service_account_email` is None or an empty
string, as it's required to form the URL.
Returns:
str: The URL for the trust boundary lookup endpoint.
"""
if not self.service_account_email:
raise ValueError(
"Service account email is required to build the trust boundary lookup URL."
)
return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
self._universe_domain, self._service_account_email
)
@_helpers.copy_docstring(credentials.Signing)

@@ -602,2 +635,3 @@ def sign_bytes(self, message):

endpoint.
.. note:: Typically one of the helper constructors

@@ -818,2 +852,3 @@ :meth:`from_service_account_file` or

jwt_credentials.refresh(request)
self.token, self.expiry = _client.call_iam_generate_id_token_endpoint(

@@ -820,0 +855,0 @@ request,

@@ -1,4 +0,4 @@

Metadata-Version: 2.1
Metadata-Version: 2.4
Name: google-auth
Version: 2.40.3
Version: 2.41.0
Summary: Google Authentication Library

@@ -28,3 +28,3 @@ Home-page: https://github.com/googleapis/google-auth-library-python

License-File: LICENSE
Requires-Dist: cachetools<6.0,>=2.0.0
Requires-Dist: cachetools<7.0,>=2.0.0
Requires-Dist: pyasn1-modules>=0.2.1

@@ -78,2 +78,14 @@ Requires-Dist: rsa<5,>=3.1.4

Requires-Dist: packaging; extra == "urllib3"
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

@@ -80,0 +92,0 @@ Google Auth Python Library

@@ -23,3 +23,3 @@ # Copyright 2014 Google Inc.

DEPENDENCIES = (
"cachetools>=2.0.0,<6.0",
"cachetools>=2.0.0,<7.0",
"pyasn1-modules>=0.2.1",

@@ -26,0 +26,0 @@ # rsa==4.5 is the last version to support 2.7

@@ -16,2 +16,3 @@ # Copyright 2016 Google LLC

import datetime
import os

@@ -23,2 +24,3 @@ import mock

from google.auth import _helpers
from google.auth import environment_vars
from google.auth import exceptions

@@ -29,2 +31,3 @@ from google.auth import jwt

from google.auth.transport import requests
from google.oauth2 import _client

@@ -55,3 +58,2 @@ SAMPLE_ID_TOKEN_EXP = 1584393400

)
FAKE_SERVICE_ACCOUNT_EMAIL = "foo@bar.com"

@@ -67,2 +69,5 @@ FAKE_QUOTA_PROJECT_ID = "fake-quota-project"

credentials_with_all_fields = None
VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"}
NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""}
EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/default/allowedLocations"

@@ -182,2 +187,14 @@ @pytest.fixture(autouse=True)

@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_no_email(self, get):
get.return_value = {
# No "email" field.
"scopes": ["one", "two"]
}
with pytest.raises(exceptions.RefreshError) as excinfo:
self.credentials.refresh(None)
assert excinfo.match(r"missing 'email' field")
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_error(self, get):

@@ -250,2 +267,14 @@ get.side_effect = exceptions.TransportError("http error")

def test_with_trust_boundary(self):
creds = self.credentials_with_all_fields
new_boundary = {"encodedLocations": "new_boundary"}
new_creds = creds.with_trust_boundary(new_boundary)
assert new_creds is not creds
assert new_creds._trust_boundary == new_boundary
assert new_creds._service_account_email == creds._service_account_email
assert new_creds._quota_project_id == creds._quota_project_id
assert new_creds._scopes == creds._scopes
assert new_creds._default_scopes == creds._default_scopes
def test_token_usage_metrics(self):

@@ -290,3 +319,352 @@ self.credentials.token = "token"

@mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true(
self, mock_metadata_get, mock_lookup_tb
):
creds = self.credentials
request = mock.Mock()
mock_metadata_get.side_effect = [
# from _retrieve_info
{"email": "default", "scopes": ["scope1"]},
# from get_service_account_token
{"access_token": "mock_token", "expires_in": 3600},
]
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"}
):
creds.refresh(request)
mock_lookup_tb.assert_not_called()
assert creds._trust_boundary is None
@mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing(
self, mock_metadata_get, mock_lookup_tb
):
creds = self.credentials
request = mock.Mock()
mock_metadata_get.side_effect = [
# from _retrieve_info
{"email": "default", "scopes": ["scope1"]},
# from get_service_account_token
{"access_token": "mock_token", "expires_in": 3600},
]
with mock.patch.dict(os.environ, clear=True):
creds.refresh(request)
mock_lookup_tb.assert_not_called()
assert creds._trust_boundary is None
@mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_trust_boundary_lookup_success(
self, mock_metadata_get, mock_lookup_tb
):
mock_lookup_tb.return_value = {
"locations": ["us-central1"],
"encodedLocations": "0xABC",
}
creds = self.credentials
request = mock.Mock()
# The first call to _metadata.get is for service account info, the second
# for the access token, and the third for the universe domain.
mock_metadata_get.side_effect = [
# from _retrieve_info
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
# from get_service_account_token
{"access_token": "mock_token", "expires_in": 3600},
# from get_universe_domain
"",
]
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
creds.refresh(request)
# Verify _metadata.get was called three times.
assert mock_metadata_get.call_count == 3
# Verify lookup_trust_boundary was called with correct URL and token
expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations"
mock_lookup_tb.assert_called_once_with(
request, expected_url, headers={"authorization": "Bearer mock_token"}
)
# Verify trust boundary was set
assert creds._trust_boundary == {
"locations": ["us-central1"],
"encodedLocations": "0xABC",
}
# Verify x-allowed-locations header is set by apply()
headers_applied = {}
creds.apply(headers_applied)
assert headers_applied["x-allowed-locations"] == "0xABC"
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
def test_refresh_trust_boundary_lookup_fails_no_cache(
self, mock_lookup_tb, mock_metadata_get
):
mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
creds = self.credentials
request = mock.Mock()
# Mock metadata calls for token, universe domain, and service account info
mock_metadata_get.side_effect = [
# from _retrieve_info
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
# from get_service_account_token
{"access_token": "mock_token", "expires_in": 3600},
# from get_universe_domain
"",
]
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
with pytest.raises(exceptions.RefreshError, match="Lookup failed"):
creds.refresh(request)
assert creds._trust_boundary is None
assert mock_metadata_get.call_count == 3
mock_lookup_tb.assert_called_once()
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
def test_refresh_trust_boundary_lookup_fails_with_cached_data(
self, mock_lookup_tb, mock_metadata_get
):
# First refresh: Successfully fetch a valid trust boundary.
mock_lookup_tb.return_value = {
"locations": ["us-central1"],
"encodedLocations": "0xABC",
}
mock_metadata_get.side_effect = [
# from _retrieve_info
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
# from get_service_account_token
{"access_token": "mock_token_1", "expires_in": 3600},
# from get_universe_domain
"",
]
creds = self.credentials
request = mock.Mock()
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
creds.refresh(request)
assert creds._trust_boundary == {
"locations": ["us-central1"],
"encodedLocations": "0xABC",
}
mock_lookup_tb.assert_called_once()
# Second refresh: Mock lookup to fail, but expect cached data to be preserved.
mock_lookup_tb.reset_mock()
mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
# This refresh should not raise an error because a cached value exists.
mock_metadata_get.reset_mock()
mock_metadata_get.side_effect = [
# from _retrieve_info
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
# from get_service_account_token
{"access_token": "mock_token_2", "expires_in": 3600},
# from get_universe_domain
"",
]
creds.refresh(request)
assert creds._trust_boundary == {
"locations": ["us-central1"],
"encodedLocations": "0xABC",
}
mock_lookup_tb.assert_called_once()
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
def test_refresh_fetches_no_op_trust_boundary(
self, mock_lookup_tb, mock_metadata_get
):
mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"}
creds = self.credentials
request = mock.Mock()
mock_metadata_get.side_effect = [
# from _retrieve_info
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
# from get_service_account_token
{"access_token": "mock_token", "expires_in": 3600},
# from get_universe_domain
"",
]
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
creds.refresh(request)
assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"}
assert mock_metadata_get.call_count == 3
expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations"
mock_lookup_tb.assert_called_once_with(
request, expected_url, headers={"authorization": "Bearer mock_token"}
)
# Verify that an empty header was added.
headers_applied = {}
creds.apply(headers_applied)
assert headers_applied["x-allowed-locations"] == ""
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
self, mock_lookup_tb, mock_metadata_get
):
creds = self.credentials
# Use pre-cache universe domain to avoid an extra metadata call.
creds._universe_domain_cached = True
creds._trust_boundary = {"locations": [], "encodedLocations": "0x0"}
request = mock.Mock()
mock_metadata_get.side_effect = [
# from _retrieve_info
{"email": "resolved-email@example.com", "scopes": ["scope1"]},
# from get_service_account_token
{"access_token": "mock_token", "expires_in": 3600},
]
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
creds.refresh(request)
# Verify trust boundary remained NO_OP
assert creds._trust_boundary == {"locations": [], "encodedLocations": "0x0"}
# Lookup should be skipped
mock_lookup_tb.assert_not_called()
# Two metadata calls for token refresh should have happened.
assert mock_metadata_get.call_count == 2
# Verify that an empty header was added.
headers_applied = {}
creds.apply(headers_applied)
assert headers_applied["x-allowed-locations"] == ""
@mock.patch(
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
@mock.patch(
"google.auth.compute_engine._metadata.get_universe_domain", autospec=True
)
def test_build_trust_boundary_lookup_url_default_email(
self, mock_get_universe_domain, mock_get_service_account_info
):
# Test with default service account email, which needs resolution
creds = self.credentials
creds._service_account_email = "default"
mock_get_service_account_info.return_value = {
"email": "resolved-email@example.com"
}
mock_get_universe_domain.return_value = "googleapis.com"
url = creds._build_trust_boundary_lookup_url()
mock_get_service_account_info.assert_called_once_with(mock.ANY, "default")
mock_get_universe_domain.assert_called_once_with(mock.ANY)
assert url == (
"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/resolved-email@example.com/allowedLocations"
)
@mock.patch(
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
@mock.patch(
"google.auth.compute_engine._metadata.get_universe_domain", autospec=True
)
def test_build_trust_boundary_lookup_url_explicit_email(
self, mock_get_universe_domain, mock_get_service_account_info
):
# Test with an explicit service account email, no resolution needed
creds = self.credentials
creds._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL
mock_get_universe_domain.return_value = "googleapis.com"
url = creds._build_trust_boundary_lookup_url()
mock_get_service_account_info.assert_not_called()
mock_get_universe_domain.assert_called_once_with(mock.ANY)
assert url == (
"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations"
)
@mock.patch(
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
@mock.patch(
"google.auth.compute_engine._metadata.get_universe_domain", autospec=True
)
def test_build_trust_boundary_lookup_url_non_default_universe(
self, mock_get_universe_domain, mock_get_service_account_info
):
# Test with a non-default universe domain
creds = self.credentials_with_all_fields
url = creds._build_trust_boundary_lookup_url()
# Universe domain is cached and email is explicit, so no metadata calls needed.
mock_get_service_account_info.assert_not_called()
mock_get_universe_domain.assert_not_called()
assert url == (
"https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/foo@bar.com/allowedLocations"
)
@mock.patch(
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
def test_build_trust_boundary_lookup_url_get_service_account_info_fails(
self, mock_get_service_account_info
):
# Test scenario where get_service_account_info fails
mock_get_service_account_info.side_effect = exceptions.TransportError(
"Failed to get info"
)
creds = self.credentials
creds._service_account_email = "default"
with pytest.raises(
exceptions.RefreshError,
match=r"Failed to get service account email for trust boundary lookup: .*",
):
creds._build_trust_boundary_lookup_url()
@mock.patch(
"google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
def test_build_trust_boundary_lookup_url_no_email(
self, mock_get_service_account_info
):
# Test with default service account email, which needs resolution, but metadata
# returns no email.
creds = self.credentials
creds._service_account_email = "default"
mock_get_service_account_info.return_value = {"scopes": ["one", "two"]}
with pytest.raises(exceptions.RefreshError) as excinfo:
creds._build_trust_boundary_lookup_url()
assert excinfo.match(r"missing 'email' field")
class TestIDTokenCredentials(object):

@@ -477,3 +855,3 @@ credentials = None

def test_with_target_audience_integration(self):
""" Test that it is possible to refresh credentials
"""Test that it is possible to refresh credentials
generated from `with_target_audience`.

@@ -646,3 +1024,3 @@

def test_with_quota_project_integration(self):
""" Test that it is possible to refresh credentials
"""Test that it is possible to refresh credentials
generated from `with_quota_project`.

@@ -649,0 +1027,0 @@

@@ -327,3 +327,3 @@ # Copyright 2016 Google LLC

"fake_access_token",
"googleapis.com",
universe_domain="googleapis.com",
)

@@ -634,1 +634,176 @@

assert mock_request.call_count == 1
def test_lookup_trust_boundary():
response_data = {
"locations": ["us-central1", "us-east1"],
"encodedLocations": "0x80080000000000",
}
mock_response = mock.create_autospec(transport.Response, instance=True)
mock_response.status = http_client.OK
mock_response.data = json.dumps(response_data).encode("utf-8")
mock_request = mock.create_autospec(transport.Request)
mock_request.return_value = mock_response
url = "http://example.com"
headers = {"Authorization": "Bearer access_token"}
response = _client._lookup_trust_boundary(mock_request, url, headers=headers)
assert response["encodedLocations"] == "0x80080000000000"
assert response["locations"] == ["us-central1", "us-east1"]
mock_request.assert_called_once_with(method="GET", url=url, headers=headers)
def test_lookup_trust_boundary_no_op_response_without_locations():
response_data = {"encodedLocations": "0x0"}
mock_response = mock.create_autospec(transport.Response, instance=True)
mock_response.status = http_client.OK
mock_response.data = json.dumps(response_data).encode("utf-8")
mock_request = mock.create_autospec(transport.Request)
mock_request.return_value = mock_response
url = "http://example.com"
headers = {"Authorization": "Bearer access_token"}
# for the response to be valid, we only need encodedLocations to be present.
response = _client._lookup_trust_boundary(mock_request, url, headers=headers)
assert response["encodedLocations"] == "0x0"
assert "locations" not in response
mock_request.assert_called_once_with(method="GET", url=url, headers=headers)
def test_lookup_trust_boundary_no_op_response():
response_data = {"locations": [], "encodedLocations": "0x0"}
mock_response = mock.create_autospec(transport.Response, instance=True)
mock_response.status = http_client.OK
mock_response.data = json.dumps(response_data).encode("utf-8")
mock_request = mock.create_autospec(transport.Request)
mock_request.return_value = mock_response
url = "http://example.com"
headers = {"Authorization": "Bearer access_token"}
response = _client._lookup_trust_boundary(mock_request, url, headers=headers)
assert response["encodedLocations"] == "0x0"
assert response["locations"] == []
mock_request.assert_called_once_with(method="GET", url=url, headers=headers)
def test_lookup_trust_boundary_error():
mock_response = mock.create_autospec(transport.Response, instance=True)
mock_response.status = http_client.INTERNAL_SERVER_ERROR
mock_response.data = "this is an error message"
mock_request = mock.create_autospec(transport.Request)
mock_request.return_value = mock_response
url = "http://example.com"
headers = {"Authorization": "Bearer access_token"}
with pytest.raises(exceptions.RefreshError) as excinfo:
_client._lookup_trust_boundary(mock_request, url, headers=headers)
assert excinfo.match("this is an error message")
mock_request.assert_called_with(method="GET", url=url, headers=headers)
def test_lookup_trust_boundary_missing_encoded_locations():
response_data = {"locations": [], "bad_field": "0x0"}
mock_response = mock.create_autospec(transport.Response, instance=True)
mock_response.status = http_client.OK
mock_response.data = json.dumps(response_data).encode("utf-8")
mock_request = mock.create_autospec(transport.Request)
mock_request.return_value = mock_response
url = "http://example.com"
headers = {"Authorization": "Bearer access_token"}
with pytest.raises(exceptions.MalformedError) as excinfo:
_client._lookup_trust_boundary(mock_request, url, headers=headers)
assert excinfo.match("Invalid trust boundary info")
mock_request.assert_called_once_with(method="GET", url=url, headers=headers)
def test_lookup_trust_boundary_internal_failure_and_retry_failure_error():
retryable_error = mock.create_autospec(transport.Response, instance=True)
retryable_error.status = http_client.BAD_REQUEST
retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode(
"utf-8"
)
unretryable_error = mock.create_autospec(transport.Response, instance=True)
unretryable_error.status = http_client.BAD_REQUEST
unretryable_error.data = json.dumps({"error_description": "invalid_scope"}).encode(
"utf-8"
)
request = mock.create_autospec(transport.Request)
request.side_effect = [retryable_error, retryable_error, unretryable_error]
headers = {"Authorization": "Bearer access_token"}
with pytest.raises(exceptions.RefreshError):
_client._lookup_trust_boundary_request(
request, "http://example.com", headers=headers
)
# request should be called three times. Two retryable errors and one
# unretryable error to break the retry loop.
assert request.call_count == 3
for call in request.call_args_list:
assert call[1]["headers"] == headers
def test_lookup_trust_boundary_internal_failure_and_retry_succeeds():
retryable_error = mock.create_autospec(transport.Response, instance=True)
retryable_error.status = http_client.BAD_REQUEST
retryable_error.data = json.dumps({"error_description": "internal_failure"}).encode(
"utf-8"
)
response_data = {"locations": [], "encodedLocations": "0x0"}
response = mock.create_autospec(transport.Response, instance=True)
response.status = http_client.OK
response.data = json.dumps(response_data).encode("utf-8")
request = mock.create_autospec(transport.Request)
headers = {"Authorization": "Bearer access_token"}
request.side_effect = [retryable_error, response]
_ = _client._lookup_trust_boundary_request(
request, "http://example.com", headers=headers
)
assert request.call_count == 2
for call in request.call_args_list:
assert call[1]["headers"] == headers
def test_lookup_trust_boundary_with_headers():
response_data = {
"locations": ["us-central1", "us-east1"],
"encodedLocations": "0x80080000000000",
}
mock_response = mock.create_autospec(transport.Response, instance=True)
mock_response.status = http_client.OK
mock_response.data = json.dumps(response_data).encode("utf-8")
mock_request = mock.create_autospec(transport.Request)
mock_request.return_value = mock_response
headers = {"Authorization": "Bearer access_token", "x-test-header": "test-value"}
_client._lookup_trust_boundary(mock_request, "http://example.com", headers=headers)
mock_request.assert_called_once_with(
method="GET", url="http://example.com", headers=headers
)

@@ -23,3 +23,5 @@ # Copyright 2016 Google LLC

from google.auth import _helpers
from google.auth import credentials
from google.auth import crypt
from google.auth import environment_vars
from google.auth import exceptions

@@ -62,5 +64,21 @@ from google.auth import iam

TOKEN_URI = "https://example.com/oauth2/token"
NO_OP_TRUST_BOUNDARY = {
"locations": credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS,
"encodedLocations": credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS,
}
VALID_TRUST_BOUNDARY = {
"locations": ["us-central1", "us-east1"],
"encodedLocations": "0xVALIDHEXSA",
}
EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = (
"https://iamcredentials.googleapis.com/v1/projects/-"
"/serviceAccounts/service-account@example.com/allowedLocations"
)
@classmethod
def make_credentials(cls, universe_domain=DEFAULT_UNIVERSE_DOMAIN):
def make_credentials(
cls,
universe_domain=DEFAULT_UNIVERSE_DOMAIN,
trust_boundary=None, # Align with Credentials class default
):
return service_account.Credentials(

@@ -71,2 +89,3 @@ SIGNER,

universe_domain=universe_domain,
trust_boundary=trust_boundary,
)

@@ -257,2 +276,14 @@

def test_with_trust_boundary(self):
credentials = self.make_credentials()
new_boundary = {"encodedLocations": "new_boundary"}
new_credentials = credentials.with_trust_boundary(new_boundary)
assert new_credentials is not credentials
assert new_credentials._trust_boundary == new_boundary
assert new_credentials._signer == credentials._signer
assert (
new_credentials.service_account_email == credentials.service_account_email
)
def test__make_authorization_grant_assertion(self):

@@ -502,7 +533,39 @@ credentials = self.make_credentials()

# Check that the credentials are valid (have a token and are not
# expired)
# Check that the credentials are valid (have a token and are not expired).
assert credentials.valid
# Trust boundary should be None since env var is not set and no initial
# boundary was provided.
assert credentials._trust_boundary is None
@mock.patch("google.oauth2._client._lookup_trust_boundary")
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_skips_trust_boundary_lookup_non_default_universe(
self, mock_jwt_grant, mock_lookup_trust_boundary
):
# Create credentials with a non-default universe domain
credentials = self.make_credentials(universe_domain=FAKE_UNIVERSE_DOMAIN)
token = "token"
mock_jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
{},
)
request = mock.create_autospec(transport.Request, instance=True)
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
credentials.refresh(request)
# Ensure jwt_grant was called (token refresh happened)
mock_jwt_grant.assert_called_once()
# Ensure trust boundary lookup was not called
mock_lookup_trust_boundary.assert_not_called()
# Verify that x-allowed-locations header is not set by apply()
headers_applied = {}
credentials.apply(headers_applied)
assert "x-allowed-locations" not in headers_applied
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_before_request_refreshes(self, jwt_grant):

@@ -615,3 +678,205 @@ credentials = self.make_credentials()

@mock.patch("google.oauth2._client._lookup_trust_boundary")
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_success_with_valid_trust_boundary(
self, mock_jwt_grant, mock_lookup_trust_boundary
):
# Start with no boundary.
credentials = self.make_credentials(trust_boundary=None)
token = "token"
mock_jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
{},
)
request = mock.create_autospec(transport.Request, instance=True)
# Mock the trust boundary lookup to return a valid boundary.
mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
credentials.refresh(request)
assert credentials.valid
assert credentials.token == token
# Verify trust boundary was set.
assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
# Verify the mock was called with the correct URL.
mock_lookup_trust_boundary.assert_called_once_with(
request,
self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
headers={"authorization": "Bearer token"},
)
# Verify x-allowed-locations header is set correctly by apply().
headers_applied = {}
credentials.apply(headers_applied)
assert (
headers_applied["x-allowed-locations"]
== self.VALID_TRUST_BOUNDARY["encodedLocations"]
)
@mock.patch("google.oauth2._client._lookup_trust_boundary")
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_fetches_no_op_trust_boundary(
self, mock_jwt_grant, mock_lookup_trust_boundary
):
# Start with no trust boundary
credentials = self.make_credentials(trust_boundary=None)
token = "token"
mock_jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
{},
)
request = mock.create_autospec(transport.Request, instance=True)
mock_lookup_trust_boundary.return_value = self.NO_OP_TRUST_BOUNDARY
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
credentials.refresh(request)
assert credentials.valid
assert credentials.token == token
assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
mock_lookup_trust_boundary.assert_called_once_with(
request,
self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
headers={"authorization": "Bearer token"},
)
headers_applied = {}
credentials.apply(headers_applied)
assert headers_applied["x-allowed-locations"] == ""
@mock.patch("google.oauth2._client._lookup_trust_boundary")
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
self, mock_jwt_grant, mock_lookup_trust_boundary
):
credentials = self.make_credentials(
trust_boundary=self.NO_OP_TRUST_BOUNDARY
) # Start with NO_OP
token = "token"
mock_jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
{},
)
request = mock.create_autospec(transport.Request, instance=True)
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
credentials.refresh(request)
assert credentials.valid
assert credentials.token == token
# Verify trust boundary remained NO_OP
assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
# Lookup should be skipped
mock_lookup_trust_boundary.assert_not_called()
# Verify that an empty header was added.
headers_applied = {}
credentials.apply(headers_applied)
assert headers_applied["x-allowed-locations"] == ""
@mock.patch("google.oauth2._client._lookup_trust_boundary")
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_trust_boundary_lookup_fails_no_cache(
self, mock_jwt_grant, mock_lookup_trust_boundary
):
# Start with no trust boundary
credentials = self.make_credentials(trust_boundary=None)
mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
"Lookup failed"
)
mock_jwt_grant.return_value = (
"mock_access_token",
_helpers.utcnow() + datetime.timedelta(seconds=3600),
{},
)
request = mock.create_autospec(transport.Request, instance=True)
# Mock the trust boundary lookup to raise an error
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
), pytest.raises(exceptions.RefreshError, match="Lookup failed"):
credentials.refresh(request)
assert credentials._trust_boundary is None
mock_lookup_trust_boundary.assert_called_once()
@mock.patch("google.oauth2._client._lookup_trust_boundary")
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_trust_boundary_lookup_fails_with_cached_data(
self, mock_jwt_grant, mock_lookup_trust_boundary
):
# Initial setup: Credentials with no trust boundary.
credentials = self.make_credentials(trust_boundary=None)
token = "token"
mock_jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
{},
)
request = mock.create_autospec(transport.Request, instance=True)
# First refresh: Successfully fetch a valid trust boundary.
mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
credentials.refresh(request)
assert credentials.valid
assert credentials.token == token
assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
mock_lookup_trust_boundary.assert_called_once_with(
request,
self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
headers={"authorization": "Bearer token"},
)
# Second refresh: Mock lookup to fail, but expect cached data to be preserved.
mock_lookup_trust_boundary.reset_mock()
mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
"Lookup failed"
)
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
credentials.refresh(request) # This should NOT raise an exception
assert credentials.valid # Credentials should still be valid
assert (
credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
) # Cached data should be preserved
mock_lookup_trust_boundary.assert_called_once_with(
request,
self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
headers={
"authorization": "Bearer token",
"x-allowed-locations": self.VALID_TRUST_BOUNDARY["encodedLocations"],
},
) # Lookup should have been attempted again
def test_build_trust_boundary_lookup_url_no_email(self):
credentials = self.make_credentials()
credentials._service_account_email = None
with pytest.raises(ValueError) as excinfo:
credentials._build_trust_boundary_lookup_url()
assert "Service account email is required" in str(excinfo.value)
class TestIDTokenCredentials(object):

@@ -798,5 +1063,10 @@ SERVICE_ACCOUNT_EMAIL = "service-account@example.com"

credentials.refresh(request)
req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[
0
]
(
req,
iam_endpoint,
signer_email,
target_audience,
access_token,
universe_domain,
) = call_iam_generate_id_token_endpoint.call_args[0]
assert req == request

@@ -821,5 +1091,10 @@ assert iam_endpoint == iam._IAM_IDTOKEN_ENDPOINT

credentials.refresh(request)
req, iam_endpoint, signer_email, target_audience, access_token, universe_domain = call_iam_generate_id_token_endpoint.call_args[
0
]
(
req,
iam_endpoint,
signer_email,
target_audience,
access_token,
universe_domain,
) = call_iam_generate_id_token_endpoint.call_args[0]
assert req == request

@@ -826,0 +1101,0 @@ assert (

@@ -23,3 +23,3 @@ # Copyright 2016 Google LLC

from google.auth import _helpers
from google.auth import _helpers, exceptions

@@ -238,2 +238,29 @@ # _MOCK_BASE_LOGGER_NAME is the base logger namespace used for testing.

def test_get_bool_from_env(monkeypatch):
# Test default value when environment variable is not set.
assert _helpers.get_bool_from_env("TEST_VAR") is False
assert _helpers.get_bool_from_env("TEST_VAR", default=True) is True
# Test true values (case-insensitive)
for true_value in ("true", "True", "TRUE", "1"):
monkeypatch.setenv("TEST_VAR", true_value)
assert _helpers.get_bool_from_env("TEST_VAR") is True
# Test false values (case-insensitive)
for false_value in ("false", "False", "FALSE", "0"):
monkeypatch.setenv("TEST_VAR", false_value)
assert _helpers.get_bool_from_env("TEST_VAR") is False
# Test invalid value
monkeypatch.setenv("TEST_VAR", "invalid_value")
with pytest.raises(exceptions.InvalidValue) as excinfo:
_helpers.get_bool_from_env("TEST_VAR")
assert 'must be one of "true", "false", "1", or "0"' in str(excinfo.value)
# Test empty string value
monkeypatch.setenv("TEST_VAR", "")
with pytest.raises(exceptions.InvalidValue):
_helpers.get_bool_from_env("TEST_VAR")
def test_hash_sensitive_info_basic():

@@ -240,0 +267,0 @@ test_data = {

@@ -16,2 +16,3 @@ # Copyright 2016 Google LLC

import datetime
import os

@@ -23,7 +24,10 @@ import mock

from google.auth import credentials
from google.auth import environment_vars
from google.auth import exceptions
from google.oauth2 import _client
class CredentialsImpl(credentials.Credentials):
def refresh(self, request):
self.token = request
class CredentialsImpl(credentials.CredentialsWithTrustBoundary):
def _refresh_token(self, request):
self.token = "refreshed-token"
self.expiry = (

@@ -38,3 +42,7 @@ datetime.datetime.utcnow()

def _build_trust_boundary_lookup_url(self):
# Using self.token here to make the URL dynamic for testing purposes
return "http://mock.url/lookup_for_{}".format(self.token)
class CredentialsImplWithMetrics(credentials.Credentials):

@@ -95,3 +103,3 @@ def refresh(self, request):

credentials = CredentialsImpl()
request = "token"
request = mock.Mock()
headers = {}

@@ -102,7 +110,7 @@

assert credentials.valid
assert credentials.token == "token"
assert headers["authorization"] == "Bearer token"
assert credentials.token == "refreshed-token"
assert headers["authorization"] == "Bearer refreshed-token"
assert "x-allowed-locations" not in headers
request = "token2"
request = mock.Mock()
headers = {}

@@ -113,4 +121,4 @@

assert credentials.valid
assert credentials.token == "token"
assert headers["authorization"] == "Bearer token"
assert credentials.token == "refreshed-token"
assert headers["authorization"] == "Bearer refreshed-token"
assert "x-allowed-locations" not in headers

@@ -122,4 +130,4 @@

credentials = CredentialsImpl()
credentials._trust_boundary = {"locations": [], "encoded_locations": DUMMY_BOUNDARY}
request = "token"
credentials._trust_boundary = {"locations": [], "encodedLocations": DUMMY_BOUNDARY}
request = mock.Mock()
headers = {}

@@ -130,7 +138,7 @@

assert credentials.valid
assert credentials.token == "token"
assert headers["authorization"] == "Bearer token"
assert credentials.token == "refreshed-token"
assert headers["authorization"] == "Bearer refreshed-token"
assert headers["x-allowed-locations"] == DUMMY_BOUNDARY
request = "token2"
request = mock.Mock()
headers = {}

@@ -141,4 +149,4 @@

assert credentials.valid
assert credentials.token == "token"
assert headers["authorization"] == "Bearer token"
assert credentials.token == "refreshed-token"
assert headers["authorization"] == "Bearer refreshed-token"
assert headers["x-allowed-locations"] == DUMMY_BOUNDARY

@@ -210,3 +218,15 @@

# Test with default scopes
credentials_with_default = ReadOnlyScopedCredentialsImpl()
credentials_with_default._default_scopes = ["one", "two"]
assert credentials_with_default.has_scopes(["one", "two"])
assert not credentials_with_default.has_scopes(["three"])
# Test with no scopes
credentials_no_scopes = ReadOnlyScopedCredentialsImpl()
assert not credentials_no_scopes.has_scopes(["one"])
assert credentials_no_scopes.has_scopes([])
def test_readonly_scoped_credentials_requires_scopes():

@@ -258,3 +278,3 @@ credentials = ReadOnlyScopedCredentialsImpl()

request = "token"
request = mock.Mock()

@@ -272,3 +292,3 @@ c.refresh(request)

request = "token"
request = mock.Mock()
headers = {}

@@ -281,4 +301,4 @@

assert c.valid
assert c.token == "token"
assert headers["authorization"] == "Bearer token"
assert c.token == "refreshed-token"
assert headers["authorization"] == "Bearer refreshed-token"
assert "x-identity-trust-boundary" not in headers

@@ -291,3 +311,3 @@

request = "token"
request = mock.Mock()
headers = {}

@@ -313,4 +333,4 @@

assert c.valid
assert c.token == "token"
assert headers["authorization"] == "Bearer token"
assert c.token == "refreshed-token"
assert headers["authorization"] == "Bearer refreshed-token"
assert "x-identity-trust-boundary" not in headers

@@ -323,3 +343,3 @@

request = "token"
request = mock.Mock()
headers = {}

@@ -347,4 +367,4 @@

assert c.valid
assert c.token == "token"
assert headers["authorization"] == "Bearer token"
assert c.token == "refreshed-token"
assert headers["authorization"] == "Bearer refreshed-token"
assert "x-identity-trust-boundary" not in headers

@@ -356,3 +376,3 @@

request = "token"
request = mock.Mock()
c.refresh(request)

@@ -364,1 +384,113 @@

c.before_request(request, "http://example.com", "GET", {})
class TestCredentialsWithTrustBoundary(object):
@mock.patch.object(_client, "_lookup_trust_boundary")
def test_lookup_trust_boundary_env_var_not_true(self, mock_lookup_tb):
creds = CredentialsImpl()
request = mock.Mock()
# Ensure env var is not "true"
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"}
):
result = creds._refresh_trust_boundary(request)
assert result is None
mock_lookup_tb.assert_not_called()
@mock.patch.object(_client, "_lookup_trust_boundary")
def test_lookup_trust_boundary_env_var_missing(self, mock_lookup_tb):
creds = CredentialsImpl()
request = mock.Mock()
# Ensure env var is missing
with mock.patch.dict(os.environ, clear=True):
result = creds._refresh_trust_boundary(request)
assert result is None
mock_lookup_tb.assert_not_called()
@mock.patch.object(_client, "_lookup_trust_boundary")
def test_lookup_trust_boundary_non_default_universe(self, mock_lookup_tb):
creds = CredentialsImpl()
creds._universe_domain = "my.universe.com" # Non-GDU
request = mock.Mock()
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
result = creds._refresh_trust_boundary(request)
assert result is None
mock_lookup_tb.assert_not_called()
@mock.patch.object(_client, "_lookup_trust_boundary")
def test_lookup_trust_boundary_calls_client_and_build_url(self, mock_lookup_tb):
creds = CredentialsImpl()
creds.token = "test_token" # For _build_trust_boundary_lookup_url
request = mock.Mock()
expected_url = "http://mock.url/lookup_for_test_token"
expected_boundary_info = {"encodedLocations": "0xABC"}
mock_lookup_tb.return_value = expected_boundary_info
# Mock _build_trust_boundary_lookup_url to ensure it's called.
mock_build_url = mock.Mock(return_value=expected_url)
creds._build_trust_boundary_lookup_url = mock_build_url
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
result = creds._lookup_trust_boundary(request)
assert result == expected_boundary_info
mock_build_url.assert_called_once()
expected_headers = {"authorization": "Bearer test_token"}
mock_lookup_tb.assert_called_once_with(
request, expected_url, headers=expected_headers
)
@mock.patch.object(_client, "_lookup_trust_boundary")
def test_lookup_trust_boundary_build_url_returns_none(self, mock_lookup_tb):
creds = CredentialsImpl()
request = mock.Mock()
# Mock _build_trust_boundary_lookup_url to return None
mock_build_url = mock.Mock(return_value=None)
creds._build_trust_boundary_lookup_url = mock_build_url
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
with pytest.raises(
exceptions.InvalidValue,
match="Failed to build trust boundary lookup URL.",
):
creds._lookup_trust_boundary(request)
mock_build_url.assert_called_once() # Ensure _build_trust_boundary_lookup_url was called
mock_lookup_tb.assert_not_called() # Ensure _client.lookup_trust_boundary was not called
@mock.patch("google.auth.credentials._LOGGER")
@mock.patch("google.auth._helpers.is_logging_enabled", return_value=True)
@mock.patch.object(_client, "_lookup_trust_boundary")
def test_refresh_trust_boundary_fails_with_cached_data_and_logging(
self, mock_lookup_tb, mock_is_logging_enabled, mock_logger
):
creds = CredentialsImpl()
creds._trust_boundary = {"encodedLocations": "0xABC"}
request = mock.Mock()
refresh_error = exceptions.RefreshError("Lookup failed")
mock_lookup_tb.side_effect = refresh_error
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
creds.refresh(request)
mock_lookup_tb.assert_called_once()
mock_is_logging_enabled.assert_called_once_with(mock_logger)
mock_logger.debug.assert_called_once_with(
"Using cached trust boundary due to refresh error: %s", refresh_error
)

@@ -25,3 +25,5 @@ # Copyright 2018 Google Inc.

from google.auth import _helpers
from google.auth import credentials as auth_credentials
from google.auth import crypt
from google.auth import environment_vars
from google.auth import exceptions

@@ -131,4 +133,17 @@ from google.auth import impersonated_credentials

LIFETIME = 3600
NO_OP_TRUST_BOUNDARY = {
"locations": auth_credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS,
"encodedLocations": auth_credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS,
}
VALID_TRUST_BOUNDARY = {
"locations": ["us-central1", "us-east1"],
"encodedLocations": "0xVALIDHEX",
}
EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = (
"https://iamcredentials.googleapis.com/v1/projects/-"
"/serviceAccounts/impersonated@project.iam.gserviceaccount.com/allowedLocations"
)
FAKE_UNIVERSE_DOMAIN = "universe.foo"
SOURCE_CREDENTIALS = service_account.Credentials(
SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI, trust_boundary=NO_OP_TRUST_BOUNDARY
)

@@ -148,2 +163,3 @@ USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE")

iam_endpoint_override=None,
trust_boundary=None, # Align with Credentials class default
):

@@ -159,2 +175,3 @@

iam_endpoint_override=iam_endpoint_override,
trust_boundary=trust_boundary,
)

@@ -169,3 +186,3 @@

def test_from_impersonated_service_account_info_with_invalid_source_credentials_type(
self
self,
):

@@ -185,3 +202,3 @@ info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO)

def test_from_impersonated_service_account_info_with_invalid_impersonation_url(
self
self,
):

@@ -271,4 +288,8 @@ info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO)

@pytest.mark.parametrize("use_data_bytes", [True, False])
def test_refresh_success(self, use_data_bytes, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
@mock.patch("google.oauth2._client._lookup_trust_boundary")
def test_refresh_success(
self, mock_lookup_trust_boundary, use_data_bytes, mock_donor_credentials
):
# Start with no boundary.
credentials = self.make_credentials(lifetime=None, trust_boundary=None)
token = "token"

@@ -287,3 +308,8 @@

with mock.patch(
# Mock the trust boundary lookup to return a valid value.
mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
), mock.patch(
"google.auth.metrics.token_request_access_token_impersonate",

@@ -301,2 +327,235 @@ return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,

# Verify that the x-allowed-locations header from the source credential
# was applied. The source credential has a NO_OP boundary, so the
# header should be an empty string.
request_kwargs = request.call_args[1]
assert "headers" in request_kwargs
assert "x-allowed-locations" in request_kwargs["headers"]
assert request_kwargs["headers"]["x-allowed-locations"] == ""
# Verify trust boundary was set.
assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
# Verify the mock was called with the correct URL.
mock_lookup_trust_boundary.assert_called_once_with(
request,
self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
headers={"authorization": "Bearer token"},
)
# Verify x-allowed-locations header is set correctly by apply().
headers_applied = {}
credentials.apply(headers_applied)
assert (
headers_applied["x-allowed-locations"]
== self.VALID_TRUST_BOUNDARY["encodedLocations"]
)
def test_refresh_source_creds_no_trust_boundary(self):
# Use a source credential that does not support trust boundaries.
source_credentials = credentials.Credentials(token="source_token")
creds = self.make_credentials(source_credentials=source_credentials)
token = "impersonated_token"
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
).isoformat("T") + "Z"
response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
data=json.dumps(response_body), status=http_client.OK
)
creds.refresh(request)
# Verify that the x-allowed-locations header was NOT applied because
# the source credential does not support trust boundaries.
request_kwargs = request.call_args[1]
assert "x-allowed-locations" not in request_kwargs["headers"]
@mock.patch("google.oauth2._client._lookup_trust_boundary")
def test_refresh_trust_boundary_lookup_fails_no_cache(
self, mock_lookup_trust_boundary, mock_donor_credentials
):
# Start with no trust boundary
credentials = self.make_credentials(lifetime=None, trust_boundary=None)
token = "token"
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
).isoformat("T") + "Z"
response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
data=json.dumps(response_body), status=http_client.OK
)
# Mock the trust boundary lookup to raise an error
mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
"Lookup failed"
)
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
), pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)
assert "Lookup failed" in str(excinfo.value)
assert credentials._trust_boundary is None # Still no trust boundary
mock_lookup_trust_boundary.assert_called_once()
@mock.patch("google.oauth2._client._lookup_trust_boundary")
def test_refresh_fetches_no_op_trust_boundary(
self, mock_lookup_trust_boundary, mock_donor_credentials
):
# Start with no trust boundary
credentials = self.make_credentials(lifetime=None, trust_boundary=None)
token = "token"
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
).isoformat("T") + "Z"
response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
data=json.dumps(response_body), status=http_client.OK
)
mock_lookup_trust_boundary.return_value = (
self.NO_OP_TRUST_BOUNDARY
) # Mock returns NO_OP
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
), mock.patch(
"google.auth.metrics.token_request_access_token_impersonate",
return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
):
credentials.refresh(request)
assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
mock_lookup_trust_boundary.assert_called_once_with(
request,
self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE,
headers={"authorization": "Bearer token"},
)
headers_applied = {}
credentials.apply(headers_applied)
assert headers_applied["x-allowed-locations"] == ""
@mock.patch("google.oauth2._client._lookup_trust_boundary")
def test_refresh_skips_trust_boundary_lookup_non_default_universe(
self, mock_lookup_trust_boundary
):
# Create source credentials with a non-default universe domain
source_credentials = service_account.Credentials(
SIGNER,
"some@email.com",
TOKEN_URI,
universe_domain=self.FAKE_UNIVERSE_DOMAIN,
)
# Create impersonated credentials using the non-default source credentials
credentials = self.make_credentials(source_credentials=source_credentials)
# Mock the IAM credentials API call for generateAccessToken
token = "token"
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
).isoformat("T") + "Z"
response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
data=json.dumps(response_body), status=http_client.OK
)
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
credentials.refresh(request)
# Ensure trust boundary lookup was not called
mock_lookup_trust_boundary.assert_not_called()
# Verify that x-allowed-locations header is not set by apply()
headers_applied = {}
credentials.apply(headers_applied)
assert "x-allowed-locations" not in headers_applied
@mock.patch("google.oauth2._client._lookup_trust_boundary")
def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
self, mock_lookup_trust_boundary, mock_donor_credentials
):
credentials = self.make_credentials(
lifetime=None, trust_boundary=self.NO_OP_TRUST_BOUNDARY
) # Start with NO_OP
token = "token"
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
).isoformat("T") + "Z"
response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
data=json.dumps(response_body), status=http_client.OK
)
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
), mock.patch(
"google.auth.metrics.token_request_access_token_impersonate",
return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
):
credentials.refresh(request)
# Verify trust boundary remained NO_OP
assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY
# Lookup should be skipped
mock_lookup_trust_boundary.assert_not_called()
# Verify that an empty header was added.
headers_applied = {}
credentials.apply(headers_applied)
assert headers_applied["x-allowed-locations"] == ""
@mock.patch("google.oauth2._client._lookup_trust_boundary")
def test_refresh_trust_boundary_lookup_fails_with_cached_data2(
self, mock_lookup_trust_boundary, mock_donor_credentials
):
# Start with no trust boundary
credentials = self.make_credentials(lifetime=None, trust_boundary=None)
token = "token"
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
).isoformat("T") + "Z"
response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
data=json.dumps(response_body), status=http_client.OK
)
# First refresh: Successfully fetch a valid trust boundary.
mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
), mock.patch(
"google.auth.metrics.token_request_access_token_impersonate",
return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
):
credentials.refresh(request)
assert credentials.valid
# Verify trust boundary was set.
assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
mock_lookup_trust_boundary.assert_called_once()
# Second refresh: Mock lookup to fail, but expect cached data to be preserved.
mock_lookup_trust_boundary.reset_mock()
mock_lookup_trust_boundary.side_effect = exceptions.RefreshError(
"Lookup failed"
)
with mock.patch.dict(
os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
):
credentials.refresh(request)
assert credentials.valid
assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY
mock_lookup_trust_boundary.assert_called_once()
@pytest.mark.parametrize("use_data_bytes", [True, False])

@@ -684,2 +943,33 @@ def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials):

def test_with_trust_boundary(self):
credentials = self.make_credentials()
new_boundary = {"encodedLocations": "new_boundary"}
new_credentials = credentials.with_trust_boundary(new_boundary)
assert new_credentials is not credentials
assert new_credentials._trust_boundary == new_boundary
# The source credentials should be a copy, not the same object.
# But they should be functionally equivalent.
assert (
new_credentials._source_credentials is not credentials._source_credentials
)
assert (
new_credentials._source_credentials.service_account_email
== credentials._source_credentials.service_account_email
)
assert (
new_credentials._source_credentials._signer
== credentials._source_credentials._signer
)
assert new_credentials._target_principal == credentials._target_principal
def test_build_trust_boundary_lookup_url_no_email(self):
credentials = self.make_credentials(target_principal=None)
with pytest.raises(ValueError) as excinfo:
credentials._build_trust_boundary_lookup_url()
assert "Service account email is required" in str(excinfo.value)
def test_with_scopes_provide_default_scopes(self):

@@ -686,0 +976,0 @@ credentials = self.make_credentials()

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display