
    1
i!                        d dl mZ d dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZmZ ddlmZmZ dd	lmZ dd
lmZmZ ddlmZ  G d d          ZdS )    )annotationsN)	lru_cache)
SSLContext)Any)	HTTPErrorURLError   )PyJWKPyJWKSet)decode_complete)PyJWKClientConnectionErrorPyJWKClientError)JWKSetCachec                  j    e Zd Z	 	 	 	 	 	 	 d'd(dZd)dZd*d+dZd*d,dZd-d Zd.d#Ze	d/d&            Z
dS )0PyJWKClientF   T,  N   uristr
cache_keysboolmax_cached_keysintcache_jwk_setlifespanfloatheadersdict[str, Any] | Nonetimeoutssl_contextSSLContext | Nonec	                   |i }|| _         d| _        || _        || _        || _        |r.|dk    rt          d| d          t          |          | _        nd| _        |r' t          |          | j                  }	|	| _        dS dS )u  A client for retrieving signing keys from a JWKS endpoint.

        ``PyJWKClient`` uses a two-tier caching system to avoid unnecessary
        network requests:

        **Tier 1 — JWK Set cache** (enabled by default):
        Caches the entire JSON Web Key Set response from the endpoint.
        Controlled by:

        - ``cache_jwk_set``: Set to ``True`` (the default) to enable this
          cache. When enabled, the JWK Set is fetched from the network only
          when the cache is empty or expired.
        - ``lifespan``: Time in seconds before the cached JWK Set expires.
          Defaults to ``300`` (5 minutes). Must be greater than 0.

        **Tier 2 — Signing key cache** (disabled by default):
        Caches individual signing keys (looked up by ``kid``) using an LRU
        cache with **no time-based expiration**. Keys are evicted only when
        the cache reaches its maximum size. Controlled by:

        - ``cache_keys``: Set to ``True`` to enable this cache.
          Defaults to ``False``.
        - ``max_cached_keys``: Maximum number of signing keys to keep in
          the LRU cache. Defaults to ``16``.

        :param uri: The URL of the JWKS endpoint.
        :type uri: str
        :param cache_keys: Enable the per-key LRU cache (Tier 2).
        :type cache_keys: bool
        :param max_cached_keys: Max entries in the signing key LRU cache.
        :type max_cached_keys: int
        :param cache_jwk_set: Enable the JWK Set response cache (Tier 1).
        :type cache_jwk_set: bool
        :param lifespan: TTL in seconds for the JWK Set cache.
        :type lifespan: float
        :param headers: Optional HTTP headers to include in requests.
        :type headers: dict or None
        :param timeout: HTTP request timeout in seconds.
        :type timeout: float
        :param ssl_context: Optional SSL context for the request.
        :type ssl_context: ssl.SSLContext or None
        Nr   z/Lifespan must be greater than 0, the input is "")maxsize)	r   jwk_set_cacher   r    r!   r   r   r   get_signing_key)
selfr   r   r   r   r   r   r    r!   r'   s
             C:\Users\Dell Inspiron 16\Desktop\tws\AgrotaPowerBi\back-agrota-powerbi\mcp-client-agrota\venv\Lib\site-packages\jwt/jwks_client.py__init__zPyJWKClient.__init__   s    j ?G15& 		& 1}}&QhQQQ   "-X!6!6D!%D 	3@i@@@AUVVO#2D   		3 	3    returnr   c                P   d}	 t           j                            | j        | j                  }t           j                            || j        | j                  5 }t          j	        |          }ddd           n# 1 swxY w Y   || j
        | j
                            |           S S # t          t          f$ rB}t          |t                    r|                                 t#          d| d          |d}~ww xY w# | j
        | j
                            |           w w xY w)ae  Fetch the JWK Set from the JWKS endpoint.

        Makes an HTTP request to the configured ``uri`` and returns the
        parsed JSON response. If the JWK Set cache is enabled, the
        response is stored in the cache.

        :returns: The parsed JWK Set as a dictionary.
        :raises PyJWKClientConnectionError: If the HTTP request fails.
        N)urlr   )r    contextz'Fail to fetch data from the url, err: "r$   )urllibrequestRequestr   r   urlopenr    r!   jsonloadr&   putr   TimeoutError
isinstancer   closer   )r(   jwk_setrresponsees        r)   
fetch_datazPyJWKClient.fetch_data_   sz    	0&&48T\&JJA''4<1A (   .)H--. . . . . . . . . . . . . . . !-"&&w//// . ,' 	 	 	!Y'' 			,>!>>> 	 !-"&&w//// .sN   AB+ A<0B+ <B  B+ B B+ D +C><=C99C>>D $D%refreshr   c                    d}| j         |s| j                                         }||                                 }t          |t                    st          d          t          j        |          S )aN  Return the JWK Set, using the cache when available.

        :param refresh: Force a fresh fetch from the endpoint, bypassing
            the cache.
        :type refresh: bool
        :returns: The JWK Set.
        :rtype: PyJWKSet
        :raises PyJWKClientError: If the endpoint does not return a JSON
            object.
        Nz.The JWKS endpoint did not return a JSON object)r&   getr>   r8   dictr   r   	from_dict)r(   r?   datas      r)   get_jwk_setzPyJWKClient.get_jwk_set|   sr     )')%))++D<??$$D$%% 	U"#STTT!$'''r+   list[PyJWK]c                t    |                      |          }d |j        D             }|st          d          |S )a  Return all signing keys from the JWK Set.

        Filters the JWK Set to keys whose ``use`` is ``"sig"`` (or
        unspecified) and that have a ``kid``.

        :param refresh: Force a fresh fetch from the endpoint, bypassing
            the cache.
        :type refresh: bool
        :returns: A list of signing keys.
        :rtype: list[PyJWK]
        :raises PyJWKClientError: If no signing keys are found.
        c                2    g | ]}|j         d v |j        |S ))sigN)public_key_usekey_id).0jwk_set_keys     r)   
<listcomp>z0PyJWKClient.get_signing_keys.<locals>.<listcomp>   s8     
 
 
)]::{?Q: :::r+   z2The JWKS endpoint did not contain any signing keys)rE   keysr   )r(   r?   r:   signing_keyss       r)   get_signing_keyszPyJWKClient.get_signing_keys   sW     ""7++
 
&|
 
 
  	Y"#WXXXr+   kidr
   c                    |                                  }|                     ||          }|sA|                      d          }|                     ||          }|st          d| d          |S )a  Return the signing key matching the given ``kid``.

        If no match is found in the current JWK Set, the set is
        refreshed from the endpoint and the lookup is retried once.

        :param kid: The key ID to look up.
        :type kid: str
        :returns: The matching signing key.
        :rtype: PyJWK
        :raises PyJWKClientError: If no matching key is found after
            refreshing.
        T)r?   z,Unable to find a signing key that matches: "r$   )rQ   	match_kidr   )r(   rR   rP   signing_keys       r)   r'   zPyJWKClient.get_signing_key   s     ,,..nn\377 	000>>L..s;;K &I3III   r+   tokenstr | bytesc                    t          |ddi          }|d         }|                     |                    d                    S )aG  Return the signing key for a JWT by reading its ``kid`` header.

        Extracts the ``kid`` from the token's unverified header and
        delegates to :meth:`get_signing_key`.

        :param token: The encoded JWT.
        :type token: str or bytes
        :returns: The matching signing key.
        :rtype: PyJWK
        verify_signatureF)optionsheaderrR   )decode_tokenr'   rA   )r(   rV   
unverifiedr[   s       r)   get_signing_key_from_jwtz$PyJWKClient.get_signing_key_from_jwt   sF     "%2De1LMMM
H%##FJJu$5$5666r+   rP   PyJWK | Nonec                2    d}| D ]}|j         |k    r|} n|S )a7  Find a key in *signing_keys* that matches *kid*.

        :param signing_keys: The list of keys to search.
        :type signing_keys: list[PyJWK]
        :param kid: The key ID to match.
        :type kid: str
        :returns: The matching key, or ``None`` if not found.
        :rtype: PyJWK or None
        N)rK   )rP   rR   rU   keys       r)   rT   zPyJWKClient.match_kid   s<      	 	CzS  ! ! r+   )Fr   Tr   Nr   N)r   r   r   r   r   r   r   r   r   r   r   r   r    r   r!   r"   )r,   r   )F)r?   r   r,   r   )r?   r   r,   rF   )rR   r   r,   r
   )rV   rW   r,   r
   )rP   rF   rR   r   r,   r_   )__name__
__module____qualname__r*   r>   rE   rQ   r'   r^   staticmethodrT    r+   r)   r   r      s         !!")-)-L3 L3 L3 L3 L3\0 0 0 0:( ( ( ( (.    2   87 7 7 7    \  r+   r   )
__future__r   r4   urllib.requestr0   	functoolsr   sslr   typingr   urllib.errorr   r   api_jwkr
   r   api_jwtr   r\   
exceptionsr   r   r&   r   r   rf   r+   r)   <module>rp      s
   " " " " " "                        , , , , , , , , $ $ $ $ $ $ $ $ 4 4 4 4 4 4 D D D D D D D D & & & & & &Y Y Y Y Y Y Y Y Y Yr+   