
    OǻiT                        d dl Z d dlZd dlmZmZmZmZmZmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl$m&Z' d dl(m)Z)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1 d dl2m3Z3  ejh                  e5      Z6e3 G d de#e"             Z7de%fdZ8 G d de#e"      Z9 G d d      Z:y)    N)Any	AwaitableCallableListOptionalUnion)PubSubHandler)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)AsyncDatabaseDatabase	Databases)AsyncFailureDetector)HealthCheckHealthCheckPolicy)Retry)BackgroundScheduler)	NoBackoff)AsyncCoreCommandsAsyncRedisModuleCommands)CircuitBreaker)State)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)GeoFailoverReason)ChannelT
EncodableTKeyT)experimentalc                   J   e Zd ZdZdefdZd,dZd Zd Zde	fdZ
d	edd
fdZ	 d-dedefdZdedefdZd	efdZd	edefdZdefdZdefdZd Zd Zd
dd
ddedgeeee   f   f   dedee    d ed!ee   f
d"Z!d# Z"de#e$ef   fd$Z%d% Z&d	edefd&Z'd'e(d(e)d)e)fd*Z*d+ Z+y
).MultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc           
         |j                         | _        |j                  s|j                         n|j                  | _        |j
                  | _        |j                  j                  |j                  |j                        | _        |j                  s|j                         n|j                  | _        |j                  |j!                         n|j                  | _        | j"                  j%                  | j                         |j&                  | _        |j*                  | _        |j.                  | _        | j0                  j3                  t4        g       t7        | j                  | j                  | j0                  | j"                  |j8                  |j:                  | j,                  | j(                        | _        d| _        tA        jB                         | _"        tG               | _$        || _%        d | _&        g | _'        d | _(        y )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF))r)   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_delay_health_check_policyr(   default_failure_detectors_failure_detectorsr+   default_failover_strategy_failover_strategyset_databasesr/   _auto_fallback_intervalr.   _event_dispatcherr*   _command_retryupdate_supported_errorsConnectionRefusedErrorr
   r,   r-   command_executorinitializedasyncioLock_hc_lockr   _bg_scheduler_config_recurring_hc_task	_hc_tasks_half_open_state_task)selfr&   s     Z/opt/lhia/marcimex/agent/venv/lib/python3.12/site-packages/redis/asyncio/multidb/client.py__init__zMultiDBClient.__init__)   s    **, '' ((*%% 	 '-&B&B#7=7Q7Q7W7W&&(A(A8
!
 ++ ,,.)) 	 ''/ ,,.)) 	
 	--doo>'-'D'D$!'!8!8$22335K4LM 6"55oo--"55$66!00!33#'#?#?	!
 !02"&%)"    returnc                 Z   K   | j                   s| j                          d {    | S 7 wN)rF   
initializerO   s    rP   
__aenter__zMultiDBClient.__aenter__W   s)     //### $s    +)+c                    K   | j                   r| j                   j                          | j                  r| j                  j                          | j                  D ]  }|j                           y wrU   )rL   cancelrN   rM   )rO   exc_type	exc_value	tracebackhc_tasks        rP   	__aexit__zMultiDBClient.__aexit__\   sY     ""##**,%%&&--/~~ 	GNN	s   A/A1c                   K   | j                          d{    t        j                  | j                  j	                  | j
                  | j                              | _        d}| j                  D ]h  \  }}|j                  j                  | j                         |j                  j                  t        j                  k(  sS|rV|| j                  _        d}j |st#        d      d| _        y7 ڭw)zT
        Perform initialization of databases to define their initial state.
        NFTz4Initial connection failed - no active database found)_perform_initial_health_checkrG   create_taskrJ   run_recurring_asyncr5   _check_databases_healthrL   r0   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDrE   _active_databaser   rF   )rO   is_active_db_founddatabaseweights       rP   rV   zMultiDBClient.initialized   s      00222 #*"5"522++,,#
 # $ 		*Hf--d.T.TU %%7@R :B%%6%)"		* "*F   9 	3s   C3C1B,C3C3+C3c                     | j                   S )zE
        Returns a sorted (by weight) list of all databases.
        )r0   rW   s    rP   get_databaseszMultiDBClient.get_databases   s     rR   rm   Nc                   K   d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                  |       d{    |j                  j                  t
        j                  k(  rT| j                   j                  d      d   \  }}| j                  j                  |t        j                         d{    yt        d      7 7 w)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r0   
ValueError_check_db_healthre   rh   ri   rj   	get_top_nrE   set_active_databaser   MANUALr   )rO   rm   existsexisting_db_highest_weighted_dbs         rP   rw   z!MultiDBClient.set_active_database   s      "oo 	NKh&	
 NOO##H---!!W^^3%)__%>%>q%A!%D"'';;+22   &?
 	
 	.s)   C&CCA9C=C>CCskip_initial_health_checkc                   K   |j                   j                  dt        dt                     i       |j                  r< | j
                  j                  j                  |j                  fi |j                   }n|j                  r_|j                  j                  t        dt                            | j
                  j                  j                  |j                        }n& | j
                  j                  di |j                   }|j                  |j                         n|j                  }t        |||j                  |j                        }	 | j                  |       d{    | j                   j#                  d      d   \  }}| j                   j%                  ||j                         | j'                  ||       d{    y7 f# t        $ r |s Y rw xY w7 w)	z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        retryr   )retriesbackoff)connection_poolN)clientre   rn   health_check_urlrs    )client_kwargsupdater   r   from_urlrK   client_class	from_pool	set_retryre   default_circuit_breakerr   rn   r   ru   r   r0   rv   add_change_active_database)rO   r&   r}   r   re   rm   r|   highest_weights           rP   add_databasezMultiDBClient.add_database   s     	##WeAy{.S$TU??7T\\..77#)#7#7F &&uQ	'LM\\..88 & 0 0 9 F /T\\..F1E1EFF ~~% **, 	 ==#44	
	''111
 /3oo.G.G.J1.M+^Hhoo6**85HIII 2) 	, -	 	JsI   EG)G ,G-G 1AG)G'G)G G$!G)#G$$G)new_databasehighest_weight_databasec                    K   |j                   |j                   kD  r[|j                  j                  t        j                  k(  r3| j
                  j                  |t        j                         d {    y y y 7 wrU   )	rn   re   rh   ri   rj   rE   rw   r   	AUTOMATIC)rO   r   r   s      rP   r   z%MultiDBClient._change_active_database   so      "9"@"@@$$**gnn<'';;/99   = As   A.A:0A81A:c                 H  K   | j                   j                  |      }| j                   j                  d      d   \  }}||k  r[|j                  j                  t
        j                  k(  r3| j                  j                  |t        j                         d{    yyy7 w)z<
        Removes a database from the database list.
        rs   r   N)r0   removerv   re   rh   ri   rj   rE   rw   r   rx   )rO   rm   rn   r|   r   s        rP   remove_databasezMultiDBClient.remove_database   s      ''1.2oo.G.G.J1.M+^ f$#++11W^^C'';;#%6%=%=   D %s   BB"B B"rn   c                 $  K   d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                   j                  d      d   \  }}| j                   j                  ||       ||_        | j                  ||       d{    y7 w)z<
        Updates a database from the database list.
        NTrr   rs   r   )r0   rt   rv   update_weightrn   r   )rO   rm   rn   ry   rz   r{   r|   r   s           rP   update_database_weightz$MultiDBClient.update_database_weight   s      "oo 	NKh&	
 NOO.2oo.G.G.J1.M+^%%h7 **85HIIIs   BA+BB	Bfailure_detectorc                 :    | j                   j                  |       y)z>
        Adds a new failure detector to the database.
        N)r<   append)rO   r   s     rP   add_failure_detectorz"MultiDBClient.add_failure_detector  s     	&&'78rR   healthcheckc                    K   | j                   4 d{    | j                  j                  |       ddd      d{    y7 07 # 1 d{  7  sw Y   yxY ww)z:
        Adds a new health check to the database.
        N)rI   r3   r   )rO   r   s     rP   add_health_checkzMultiDBClient.add_health_check  sR      == 	4 	4&&{3	4 	4 	4 	4 	4 	4 	4sA   AAAAAAAAAAAAc                    K   | j                   s| j                          d{     | j                  j                  |i | d{   S 7 (7 w)zB
        Executes a single command and return its result.
        N)rF   rV   rE   execute_commandrO   argsoptionss      rP   r   zMultiDBClient.execute_command  sK      //###:T**::DLGLLL $Ls!    AA#AAAAc                     t        |       S )z:
        Enters into pipeline mode of the client.
        )PipelinerW   s    rP   pipelinezMultiDBClient.pipeline  s     ~rR   F
shard_hintvalue_from_callablewatch_delayfuncr   watchesr   r   r   c                   K   | j                   s| j                          d{     | j                  j                  |g||||d d{   S 7 .7 w)z3
        Executes callable as transaction.
        Nr   )rF   rV   rE   execute_transaction)rO   r   r   r   r   r   s         rP   transactionzMultiDBClient.transaction#  sg      //###>T**>>

 " 3#
 
 	
 $
s!    AA)AAAAc                 n   K   | j                   s| j                          d{    t        | fi |S 7 w)z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        N)rF   rV   PubSub)rO   kwargss     rP   pubsubzMultiDBClient.pubsub9  s6      //###d%f%% $s    535c                 >  K   	 i }g | _         | j                  D ]I  \  }}t        j                  | j	                  |            }|||<   | j                   j                  |       K t        j                  t        j                  | j                   ddi| j                         d{   }t        | j                   |      D ci c]  \  }}||   | }}}|j                         D ]}  \  }}t        |t              rR|j                  }t        j                   |j"                  _        t&        j)                  d|j*                         d||<   ht        |t,              syd||<    |S 7 # t        j                  $ r t        j                  d      w xY wc c}}w w)	zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsT)timeoutNz4Health check execution exceeds health_check_intervalz%Health check failed, due to exception)exc_infoF)rM   r0   rG   rb   ru   r   wait_forgatherr5   TimeoutErrorzipitems
isinstancer   rm   ri   OPENre   rh   loggerdebugoriginal_exception	Exception)	rO   
task_to_dbrm   r{   taskresultsresult
db_resultsunhealthy_dbs	            rP   rd   z%MultiDBClient._check_databases_healthD  s    
	79JDN# ,!**4+@+@+JK#+
4 %%d+,
 $,,G$G33 G :=T^^W9U
)5vJtf$

 
 !+ 0 0 2 	-Hf&"<=%-4\\$$*;#66  
 ,1
<(FI.',
8$	- 9 ## 	&&F 	
sB   FB"E+ &E)'E+ +FFBF 	F)E+ +)F	Fc                   K   | j                          d{   }d}| j                  j                  t        j                  k(  rd|j                         v}n| j                  j                  t        j                  k(  r)t        |j                               t        |      dz  kD  }n9| j                  j                  t        j                  k(  rd|j                         v }|s"t        d| j                  j                         y7 w)zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        NTF   z:Initial health check failed. Initial health check policy: )rd   rK   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumlenONE_AVAILABLEr   )rO   r   
is_healthys      rP   ra   z+MultiDBClient._perform_initial_health_checkp  s      4466
<<337I7W7WWgnn&66JLL44!445 W^^-.W1AAJLL448J8X8XX!11J/LT\\MuMuLvw   7s   DDC/Dc                   K   | j                   j                  | j                  |       d{   }|sH|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S |rF|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S 7 w)zO
        Runs health checks on the given database until first failure.
        N)r:   executer3   re   rh   ri   r   rj   )rO   rm   r   s      rP   ru   zMultiDBClient._check_db_health  s     
  44<<
 

 %%5)0  &H,,22gnnD%,^^H"
s   *CCBCre   	old_state	new_statec                    t        j                         }|t        j                  k(  r4t        j                  | j                  |j                              | _        y |t        j                  k(  rQ|t        j                  k(  r>t        j                  d|j                   d       |j                  t        t        |       |t        j                  k7  r8|t        j                  k(  r$t        j                  d|j                   d       y y y )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rG   get_running_loopri   	HALF_OPENrb   ru   rm   rN   rj   r   r   warning
call_laterr   _half_open_circuitinfo)rO   re   r   r   loops        rP   rg   z/MultiDBClient._on_circuit_state_change_callback  s     '')))))0)<)<%%g&6&67*D& &9+DNNG,,--Z[ OO02DgN&9+FKK)G$4$4#55IJK ,G&rR   c                    K   | j                   j                  r7| j                   j                  j                  j                          d {    y y 7 wrU   )rE   active_databaser   acloserW   s    rP   r   zMultiDBClient.aclose  sA       00''77>>EEGGG 1Gs   AA
AA)rO   r%   rS   r%   )T),__name__
__module____qualname____doc__r   rQ   rX   r_   rV   r   rp   r   rw   r   boolr   r   r   floatr   r   r   r   r   r   r   r   r   r   r   r"   r   strr   r   dictr   rd   ra   ru   r   ri   rg   r   r   rR   rP   r%   r%   "   sz   
,*} ,*\
" Hy 
- 
D 
8 IM/J$/JAE/Jb	)	DQ	m J] JE J&95I 94+ 4M %)$)'+

|U3	#+>%??@
 
 SM	

 "
 e_
,	&*tHdN/C *X0}  $L%L29LFML(HrR   r%   re   c                 .    t         j                  | _        y rU   )ri   r   rh   )re   s    rP   r   r     s    %%GMrR   c                   ~    e Zd ZdZdefdZddZd Zd Zd Z	de
fd	Zdefd
ZddZddZddZd Zdee   fdZy)r   zG
    Pipeline implementation for multiple logical Redis databases.
    r   c                      g | _         || _        y rU   )_command_stack_client)rO   r   s     rP   rQ   zPipeline.__init__  s     rR   rS   c                    K   | S wrU   r   rW   s    rP   rX   zPipeline.__aenter__          c                    K   | j                          d {    | j                  j                  |||       d {    y 7 *7 wrU   )resetr   r_   rO   r[   r\   r]   s       rP   r_   zPipeline.__aexit__  s:     jjlll$$Xy)DDD 	Ds   AA$AAAAc                 >    | j                         j                         S rU   )_async_self	__await__rW   s    rP   r   zPipeline.__await__  s    !++--rR   c                    K   | S wrU   r   rW   s    rP   r   zPipeline._async_self  r   r   c                 ,    t        | j                        S rU   )r   r   rW   s    rP   __len__zPipeline.__len__  s    4&&''rR   c                      y)z1Pipeline instances should always evaluate to TrueTr   rW   s    rP   __bool__zPipeline.__bool__  s    rR   Nc                    K   g | _         y wrU   )r   rW   s    rP   r   zPipeline.reset  s      s   	c                 @   K   | j                          d{    y7 w)zClose the pipelineN)r   rW   s    rP   r   zPipeline.aclose  s     jjl   c                 @    | j                   j                  ||f       | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   s      rP   pipeline_execute_commandz!Pipeline.pipeline_execute_command  s!     	""D'?3rR   c                 &     | j                   |i |S )zAdds a command to the stack)r  rO   r   r   s      rP   r   zPipeline.execute_command  s    ,t,,d=f==rR   c                 t  K   | j                   j                  s"| j                   j                          d{    	 | j                   j                  j	                  t        | j                               d{   | j                          d{    S 7 ]7 7 	# | j                          d{  7   w xY ww)z0Execute all the commands in the current pipelineN)r   rF   rV   rE   execute_pipelinetupler   r   rW   s    rP   r   zPipeline.execute  s     ||'',,))+++	66GGd))*  **, , $**,sV   4B8BB8;B 7B8B ;B8BB8B B8B5.B1/B55B8)rO   r   rS   r   rS   N)rS   r   )r   r   r   r   r%   rQ   rX   r_   r   r   intr   r   r   r   r   r  r   r   r   r   r   rR   rP   r   r     sd    } E.( ($ !>
tCy 
rR   r   c                       e Zd ZdZdefdZddZddZd Ze	de
fd	       Zd
efdZd
edefdZd
efdZd
edefdZd Z	 dde
dee   fdZddddeddfdZy)r   z2
    PubSub object for multi database client.
    r   c                 ^    || _          | j                   j                  j                  di | y)zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nr   )r   rE   r   )rO   r   r   s      rP   rQ   zPubSub.__init__   s(     ,%%,,6v6rR   rS   c                    K   | S wrU   r   rW   s    rP   rX   zPubSub.__aenter__  r   r   Nc                 @   K   | j                          d {    y 7 wrU   )r   r   s       rP   r_   zPubSub.__aexit__  s     kkmr  c                 h   K   | j                   j                  j                  d       d {   S 7 w)Nr   r   rE   execute_pubsub_methodrW   s    rP   r   zPubSub.aclose  s'     \\22HHRRRRs   )202c                 V    | j                   j                  j                  j                  S rU   )r   rE   active_pubsub
subscribedrW   s    rP   r  zPubSub.subscribed  s    ||,,::EEErR   r   c                 l   K    | j                   j                  j                  dg|  d {   S 7 w)Nr   r  rO   r   s     rP   r   zPubSub.execute_command  s:     HT\\22HH
 $
 
 	
 
   +424r   c                 r   K    | j                   j                  j                  dg|i | d{   S 7 w)aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscribeNr  r  s      rP   r  zPubSub.psubscribe  sE      IT\\22HH

#)
 
 	
 
   .757c                 l   K    | j                   j                  j                  dg|  d{   S 7 w)zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscribeNr  r  s     rP   r  zPubSub.punsubscribe)  s=     
 IT\\22HH
!
 
 	
 
r  c                 r   K    | j                   j                  j                  dg|i | d{   S 7 w)aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscribeNr  r  s      rP   r  zPubSub.subscribe2  sE      IT\\22HH

"(
 
 	
 
r  c                 l   K    | j                   j                  j                  dg|  d{   S 7 w)zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscribeNr  r  s     rP   r!  zPubSub.unsubscribe>  s=     
 IT\\22HH
 
 
 	
 
r  ignore_subscribe_messagesr   c                 n   K   | j                   j                  j                  d||       d{   S 7 w)a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number or None to wait indefinitely.
        get_message)r"  r   Nr  )rO   r"  r   s      rP   r$  zPubSub.get_messageG  s>      \\22HH&? I 
 
 	
 
   ,535g      ?)exception_handlerpoll_timeoutr'  c                n   K   | j                   j                  j                  |||        d{   S 7 w)a  Process pub/sub messages using registered callbacks.

        This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in
        redis-py, but it is a coroutine. To launch it as a separate task, use
        ``asyncio.create_task``:

            >>> task = asyncio.create_task(pubsub.run())

        To shut it down, use asyncio cancellation:

            >>> task.cancel()
            >>> await task
        )
sleep_timer&  r   N)r   rE   execute_pubsub_run)rO   r&  r'  s      rP   runz
PubSub.runW  s>     & \\22EE#7HQU F 
 
 	
 
r%  )rS   r   r
  )Fg        )r   r   r   r   r%   rQ   rX   r_   r   propertyr   r  r!   r   r    r	   r  r  r   r  r!  r   r   r$  r+  r   rR   rP   r   r     s    	7} 	7S FD F F
: 



h 

- 


 


X 

 


 SV
)-
@H
& !	
 	

 

rR   r   );rG   loggingtypingr   r   r   r   r   r   redis.asyncio.clientr	   &redis.asyncio.multidb.command_executorr
   redis.asyncio.multidb.configr   r   r   r   redis.asyncio.multidb.databaser   r   r   &redis.asyncio.multidb.failure_detectorr   !redis.asyncio.multidb.healthcheckr   r   redis.asyncio.retryr   redis.backgroundr   redis.backoffr   redis.commandsr   r   redis.multidb.circuitr   r   ri   redis.multidb.exceptionr   r   r   redis.observability.attributesr   redis.typingr    r!   r"   redis.utilsr#   	getLoggerr   r   r%   r   r   r   r   rR   rP   <module>r?     s      B B . I  N M G L % 0 # F 0 2 
 = 3 3 $			8	$ MH,.? MH MH`& &A'): AHq
 q
rR   