
    Pǻi!                        d dl mZ d dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZmZ ddlmZmZ dd	lmZ dd
lmZmZ ddlmZ  G d d      Zy)    )annotationsN)	lru_cache)
SSLContext)Any)	HTTPErrorURLError   )PyJWKPyJWKSet)decode_complete)PyJWKClientConnectionErrorPyJWKClientError)JWKSetCachec                      e Zd Z	 	 	 	 	 	 	 d		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 d
dZddZdddZdddZddZddZe	dd       Z
y)PyJWKClientNc	                    |i }|| _         d| _        || _        || _        || _        |r%|dk  rt        d| d      t        |      | _        nd| _        |r$ t        |      | j                        }	|	| _        yy)u  A client for retrieving signing keys from a JWKS endpoint.

        ``PyJWKClient`` uses a two-tier caching system to avoid unnecessary
        network requests:

        **Tier 1 — JWK Set cache** (enabled by default):
        Caches the entire JSON Web Key Set response from the endpoint.
        Controlled by:

        - ``cache_jwk_set``: Set to ``True`` (the default) to enable this
          cache. When enabled, the JWK Set is fetched from the network only
          when the cache is empty or expired.
        - ``lifespan``: Time in seconds before the cached JWK Set expires.
          Defaults to ``300`` (5 minutes). Must be greater than 0.

        **Tier 2 — Signing key cache** (disabled by default):
        Caches individual signing keys (looked up by ``kid``) using an LRU
        cache with **no time-based expiration**. Keys are evicted only when
        the cache reaches its maximum size. Controlled by:

        - ``cache_keys``: Set to ``True`` to enable this cache.
          Defaults to ``False``.
        - ``max_cached_keys``: Maximum number of signing keys to keep in
          the LRU cache. Defaults to ``16``.

        :param uri: The URL of the JWKS endpoint.
        :type uri: str
        :param cache_keys: Enable the per-key LRU cache (Tier 2).
        :type cache_keys: bool
        :param max_cached_keys: Max entries in the signing key LRU cache.
        :type max_cached_keys: int
        :param cache_jwk_set: Enable the JWK Set response cache (Tier 1).
        :type cache_jwk_set: bool
        :param lifespan: TTL in seconds for the JWK Set cache.
        :type lifespan: float
        :param headers: Optional HTTP headers to include in requests.
        :type headers: dict or None
        :param timeout: HTTP request timeout in seconds.
        :type timeout: float
        :param ssl_context: Optional SSL context for the request.
        :type ssl_context: ssl.SSLContext or None
        Nr   z/Lifespan must be greater than 0, the input is "")maxsize)	urijwk_set_cacheheaderstimeoutssl_contextr   r   r   get_signing_key)
selfr   
cache_keysmax_cached_keyscache_jwk_setlifespanr   r   r   r   s
             M/opt/lhia/marcimex/agent/venv/lib/python3.12/site-packages/jwt/jwks_client.py__init__zPyJWKClient.__init__   s    j ?G15& 1}&EhZqQ  "-X!6D!%D@i@AUAUVO#2D 	     c                n   d}	 t         j                  j                  | j                  | j                        }t         j                  j                  || j                  | j                        5 }t        j                  |      }ddd       || j                  | j                  j                  |       S S # 1 sw Y   3xY w# t        t        f$ r5}t        |t              r|j!                          t#        d| d      |d}~ww xY w# | j                  | j                  j                  |       w w xY w)ae  Fetch the JWK Set from the JWKS endpoint.

        Makes an HTTP request to the configured ``uri`` and returns the
        parsed JSON response. If the JWK Set cache is enabled, the
        response is stored in the cache.

        :returns: The parsed JWK Set as a dictionary.
        :raises PyJWKClientConnectionError: If the HTTP request fails.
        N)urlr   )r   contextz'Fail to fetch data from the url, err: "r   )urllibrequestRequestr   r   urlopenr   r   jsonloadr   putr   TimeoutError
isinstancer   closer   )r   jwk_setrresponsees        r    
fetch_datazPyJWKClient.fetch_data_   s    	0&&488T\\&JA''4<<1A1A (  .))H-. !!-""&&w/ .. . ,' 	!Y'	,9!A>	 !!-""&&w/ .sB   A+C /B7C D
 7C <C D0DDD
 
*D4c                    d}| j                   |s| j                   j                         }|| j                         }t        |t              st        d      t        j                  |      S )aN  Return the JWK Set, using the cache when available.

        :param refresh: Force a fresh fetch from the endpoint, bypassing
            the cache.
        :type refresh: bool
        :returns: The JWK Set.
        :rtype: PyJWKSet
        :raises PyJWKClientError: If the endpoint does not return a JSON
            object.
        Nz.The JWKS endpoint did not return a JSON object)r   getr4   r.   dictr   r   	from_dict)r   refreshdatas      r    get_jwk_setzPyJWKClient.get_jwk_set|   sc     )'%%))+D<??$D$%"#STT!!$''r"   c                    | j                  |      }|j                  D cg c]  }|j                  dv r|j                  r|  }}|st	        d      |S c c}w )a  Return all signing keys from the JWK Set.

        Filters the JWK Set to keys whose ``use`` is ``"sig"`` (or
        unspecified) and that have a ``kid``.

        :param refresh: Force a fresh fetch from the endpoint, bypassing
            the cache.
        :type refresh: bool
        :returns: A list of signing keys.
        :rtype: list[PyJWK]
        :raises PyJWKClientError: If no signing keys are found.
        )sigNz2The JWKS endpoint did not contain any signing keys)r;   keyspublic_key_usekey_idr   )r   r9   r0   jwk_set_keysigning_keyss        r    get_signing_keyszPyJWKClient.get_signing_keys   si     ""7+  '||
))]:{?Q?Q 
 
 "#WXX
s   #Ac                    | j                         }| j                  ||      }|s5| j                  d      }| j                  ||      }|st        d| d      |S )a  Return the signing key matching the given ``kid``.

        If no match is found in the current JWK Set, the set is
        refreshed from the endpoint and the lookup is retried once.

        :param kid: The key ID to look up.
        :type kid: str
        :returns: The matching signing key.
        :rtype: PyJWK
        :raises PyJWKClientError: If no matching key is found after
            refreshing.
        T)r9   z,Unable to find a signing key that matches: "r   )rC   	match_kidr   )r   kidrB   signing_keys       r    r   zPyJWKClient.get_signing_key   sl     ,,.nn\37000>L..s;K&B3%qI  r"   c                j    t        |ddi      }|d   }| j                  |j                  d            S )aG  Return the signing key for a JWT by reading its ``kid`` header.

        Extracts the ``kid`` from the token's unverified header and
        delegates to :meth:`get_signing_key`.

        :param token: The encoded JWT.
        :type token: str or bytes
        :returns: The matching signing key.
        :rtype: PyJWK
        verify_signatureF)optionsheaderrF   )decode_tokenr   r6   )r   token
unverifiedrK   s       r    get_signing_key_from_jwtz$PyJWKClient.get_signing_key_from_jwt   s:     "%2De1LM
H%##FJJu$566r"   c                @    d}| D ]  }|j                   |k(  s|} |S  |S )a7  Find a key in *signing_keys* that matches *kid*.

        :param signing_keys: The list of keys to search.
        :type signing_keys: list[PyJWK]
        :param kid: The key ID to match.
        :type kid: str
        :returns: The matching key, or ``None`` if not found.
        :rtype: PyJWK or None
        N)r@   )rB   rF   rG   keys       r    rE   zPyJWKClient.match_kid   s:      	CzzS !	
 r"   )F   Ti,  N   N)r   strr   boolr   intr   rU   r   floatr   zdict[str, Any] | Noner   rW   r   zSSLContext | None)returnr   )F)r9   rU   rX   r   )r9   rU   rX   list[PyJWK])rF   rT   rX   r
   )rM   zstr | bytesrX   r
   )rB   rY   rF   rT   rX   zPyJWK | None)__name__
__module____qualname__r!   r4   r;   rC   r   rO   staticmethodrE    r"   r    r   r      s     !!")-)-L3L3 L3 	L3
 L3 L3 'L3 L3 'L3\0:(.287  r"   r   )
__future__r   r*   urllib.requestr&   	functoolsr   sslr   typingr   urllib.errorr   r   api_jwkr
   r   api_jwtr   rL   
exceptionsr   r   r   r   r   r^   r"   r    <module>rh      s2    "      , $ 4 D &Y Yr"   