
    OǻiU                        d dl Z d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZmZmZm Z  d dl!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d dl)m*Z* d dl+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3  e jh                  e5      Z6e3 G d dee             Z7defdZ8 G d dee      Z9 G d d      Z:y)    N)as_completed)ThreadPoolExecutor)AnyCallableListOptional)BackgroundScheduler)	NoBackoff)PubSubWorkerThread)CoreCommandsRedisModuleCommands)MaintNotificationsConfig)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)Database	DatabasesSyncDatabase)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)HealthCheckHealthCheckPolicy)GeoFailoverReason)Retry)experimentalc                      e Zd ZdZdefdZd ZdefdZde	ddfd	Z
	 d$ded
efdZde	de	fdZdefdZde	defdZdefdZdefdZd Zd Zdedgdf   fdZd Zde	defdZdeeef   fdZd Zde d e!d!e!fd"Z"d# Z#y)%MultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc           
         |j                         | _        |j                  s|j                         n|j                  | _        |j
                  | _        |j                  j                  |j                  |j                        | _        |j                  s|j                         n|j                  | _        |j                  |j!                         n|j                  | _        | j"                  j%                  | j                         |j&                  | _        |j*                  | _        |j.                  | _        | j0                  j3                  t4        f       t7        | j                  | j                  | j0                  | j"                  |j8                  |j:                  | j,                  | j(                        | _        d| _        tA        jB                         | _"        tG               | _$        || _%        y )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)&r'   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_probes_delay_health_check_policyr&   default_failure_detectors_failure_detectorsr)   default_failover_strategy_failover_strategyset_databasesr-   _auto_fallback_intervalr,   _event_dispatcherr(   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r*   r+   command_executorinitialized	threadingRLock_hc_lockr	   _bg_scheduler_config)selfr$   s     R/opt/lhia/marcimex/agent/venv/lib/python3.12/site-packages/redis/multidb/client.py__init__zMultiDBClient.__init__+   s    **, '' ((*%% 	
 '-&B&B#7=7Q7Q7W7W&&(H(H8
!
 ++ ,,.)) 	 ''/ ,,.)) 	
 	--doo>'-'D'D$!'!8!8$22335K4MN 6"55oo--"55$66!00!33#'#?#?	!
 !!)02    c                    | j                          | j                  j                  | j                  | j                         d}| j
                  D ]h  \  }}|j                  j                  | j                         |j                  j                  t        j                  k(  sS|rV|| j                  _        d}j |st        d      d| _        y)zT
        Perform initialization of databases to define their initial state.
        FTz4Initial connection failed - no active database foundN)_perform_initial_health_checkrH   run_recurringr3   _check_databases_healthr.   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDrC   _active_databaser   rD   )rJ   is_active_db_founddatabaseweights       rK   
initializezMultiDBClient.initializeU   s     	**, 	((''((	

 # $ 		*Hf--d.T.TU %%7@R :B%%6%)"		* "*F   rM   returnc                     | j                   S )zE
        Returns a sorted (by weight) list of all databases.
        )r.   rJ   s    rK   get_databaseszMultiDBClient.get_databasesw   s     rM   rZ   Nc                 f   d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                  |       |j                  j                  t
        j                  k(  rC| j                   j                  d      d   \  }}|t        j                  f| j                  _        yt        d      )zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r.   
ValueError_check_db_healthrR   rU   rV   rW   	get_top_nr   MANUALrC   active_databaser   )rJ   rZ   existsexisting_db_highest_weighted_dbs         rK   set_active_databasez!MultiDBClient.set_active_database}   s     "oo 	NKh&	
 NOOh'!!W^^3%)__%>%>q%A!%D"!((5D!!1 &?
 	
rM   skip_initial_health_checkc                    t        dt                     |j                  d<   d|j                  vrt        d      |j                  d<   |j                  r< | j
                  j                  j                  |j                  fi |j                  }n|j                  r_|j                  j                  t        dt                            | j
                  j                  j                  |j                        }n& | j
                  j                  di |j                  }|j                  |j                         n|j                  }t        |||j                  |j                  	      }	 | j                  |       | j                   j#                  d
      d   \  }}| j                   j%                  ||j                         | j'                  ||       y# t        $ r |s Y hw xY w)z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        r   )retriesbackoffretrymaint_notifications_configF)enabled)connection_poolN)clientrR   r[   health_check_urlrc    )r    r
   client_kwargsr   from_urlrI   client_class	from_pool	set_retryrR   default_circuit_breakerr   r[   rw   re   r   r.   rf   add_change_active_database)rJ   r$   rn   rv   rR   rZ   rl   highest_weights           rK   add_databasezMultiDBClient.add_database   s    ).a(MW% (v/C/CC(7   !=> ??7T\\..77#)#7#7F &&uQ	'LM\\..88 & 0 0 9 F /T\\..F1E1EFF ~~% **, 	 ==#44	
	!!(+
 /3oo.G.G.J1.M+^Hhoo6$$X/BC * 	, -	s   /G G)(G)new_databasehighest_weight_databasec                     |j                   |j                   kD  rJ|j                  j                  t        j                  k(  r"|t
        j                  f| j                  _        y y y N)	r[   rR   rU   rV   rW   r   	AUTOMATICrC   rh   )rJ   r   r   s      rK   r   z%MultiDBClient._change_active_database   sZ     "9"@"@@$$**gnn< !++5D!!1 = ArM   c                    | j                   j                  |      }| j                   j                  d      d   \  }}||k  rJ|j                  j                  t
        j                  k(  r"|t        j                  f| j                  _
        yyy)z<
        Removes a database from the database list.
        rc   r   N)r.   removerf   rR   rU   rV   rW   r   rg   rC   rh   )rJ   rZ   r[   rl   r   s        rK   remove_databasezMultiDBClient.remove_database   s     ''1.2oo.G.G.J1.M+^ f$#++11W^^C $!((5D!!1 D %rM   r[   c                    d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                   j                  d      d   \  }}| j                   j                  ||       ||_        | j                  ||       y)z<
        Updates a database from the database list.
        NTrb   rc   r   )r.   rd   rf   update_weightr[   r   )rJ   rZ   r[   ri   rj   rk   rl   r   s           rK   update_database_weightz$MultiDBClient.update_database_weight   s     "oo 	NKh&	
 NOO.2oo.G.G.J1.M+^%%h7 $$X/BCrM   failure_detectorc                 :    | j                   j                  |       y)z>
        Adds a new failure detector to the database.
        N)r:   append)rJ   r   s     rK   add_failure_detectorz"MultiDBClient.add_failure_detector   s     	&&'78rM   healthcheckc                 |    | j                   5  | j                  j                  |       ddd       y# 1 sw Y   yxY w)z:
        Adds a new health check to the database.
        N)rG   r1   r   )rJ   r   s     rK   add_health_checkzMultiDBClient.add_health_check  s4     ]] 	4&&{3	4 	4 	4s   2;c                 r    | j                   s| j                           | j                  j                  |i |S )zB
        Executes a single command and return its result.
        )rD   r\   rC   execute_commandrJ   argsoptionss      rK   r   zMultiDBClient.execute_command  s5     OO4t$$44dFgFFrM   c                     t        |       S )z:
        Enters into pipeline mode of the client.
        )Pipeliner_   s    rK   pipelinezMultiDBClient.pipeline  s     ~rM   funcr   c                 x    | j                   s| j                           | j                  j                  |g|| S )z3
        Executes callable as transaction.
        )rD   r\   rC   execute_transaction)rJ   r   watchesr   s       rK   transactionzMultiDBClient.transaction  s:     OO8t$$88RR'RRrM   c                 R    | j                   s| j                          t        | fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        )rD   r\   PubSub)rJ   kwargss     rK   pubsubzMultiDBClient.pubsub%  s'     OOd%f%%rM   c                 v   | j                   j                  | j                  |      }|sH|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S |rF|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S )zO
        Runs health checks on the given database until first failure.
        )r8   executer1   rR   rU   rV   OPENrW   )rJ   rZ   
is_healthys      rK   re   zMultiDBClient._check_db_health0  s    
 ..66t7J7JHU
%%5)0  &H,,22gnnD%,^^H"rM   c           	      ^   t        t        | j                              5 }| j                  D ci c]"  \  }}|j                  | j                  |      |$ }}}i }	 t        || j                        D ]  }	 ||   }|j                         ||<    	 ddd       |S c c}}w # t        $ r[}|j                  }t        j                  |j                  _        t        j                  d|j                          d||<   Y d}~d}~ww xY w# t"        $ r t#        d      w xY w# 1 sw Y   S xY w)zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        )max_workers)timeoutz%Health check failed, due to exception)exc_infoFNz4Health check execution exceeds health_check_interval)r   lenr.   submitre   r   r3   resultr   rZ   rV   r   rR   rU   loggerdebugoriginal_exceptionTimeoutError)	rJ   executorrZ   rk   futuresresultsfutureeunhealthy_dbs	            rK   rQ   z%MultiDBClient._check_databases_health@  s6   
  C,@A 	X $(??Ha  5 5x@(JG 
 G*T%@%@ 6F6#*6?,2MMO)6	< 9 6 	6'(zz5<\\,,2C%&%9%9 % 
 16-	6   "J 5	< sY   D"'BD"D
8B#D
D"#	D,AD=D
DD

DD""D,c                    | j                         }d}| j                  j                  t        j                  k(  rd|j                         v}n| j                  j                  t        j                  k(  r)t        |j                               t        |      dz  kD  }n9| j                  j                  t        j                  k(  rd|j                         v }|s"t        d| j                  j                         y)zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        TF   z:Initial health check failed. Initial health check policy: N)rQ   rI   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumr   ONE_AVAILABLEr   )rJ   r   r   s      rK   rO   z+MultiDBClient._perform_initial_health_checke  s     ..0
<<337I7W7WWgnn&66JLL44!445 W^^-.W1AAJLL448J8X8XX!11J/LT\\MuMuLvw  rM   rR   	old_state	new_statec                    |t         j                  k(  r| j                  |j                         y |t         j                  k(  r[|t         j
                  k(  rHt        j                  d|j                   d       | j                  j                  t        t        |       |t         j                  k7  r8|t         j                  k(  r$t        j                  d|j                   d       y y y )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rV   	HALF_OPENre   rZ   rW   r   r   warningrH   run_oncer   _half_open_circuitinfo)rJ   rR   r   r   s       rK   rT   z/MultiDBClient._on_circuit_state_change_callback}  s     )))!!'"2"23&9+DNNG,,--Z[ ''$&8' &9+FKK)G$4$4#55IJK ,G&rM   c                     | j                   r| j                   j                          | j                  j                  r/| j                  j                  j                  j                          yy)z:
        Closes the client and all its resources.
        N)rH   stoprC   rh   rv   closer_   s    rK   r   zMultiDBClient.close  sQ     ##%  00!!1188>>@ 1rM   )T)$__name__
__module____qualname____doc__r   rL   r\   r   r`   r   rm   r   boolr   r   r   r   floatr   r   r   r   r   r   r   r   r   r   re   dictrQ   rO   r   rV   rT   r   rx   rM   rK   r#   r#   $   s'   
(} (T  Dy 
L 
T 
: IM6D$6DAE6Dp
(
CO
  D| DU D&9_ 94K 4GS*t); < S	& $  #hn)= #J0L%L29LFML&ArM   r#   rR   c                 .    t         j                  | _        y r   )rV   r   rU   )rR   s    rK   r   r     s    %%GMrM   c                   x    e Zd ZdZdefdZddZd Zd Zde	fdZ
defd	ZddZddZddZd Zdee   fdZy
)r   zG
    Pipeline implementation for multiple logical Redis databases.
    rv   c                      g | _         || _        y r   )_command_stack_client)rJ   rv   s     rK   rL   zPipeline.__init__  s     rM   r]   c                     | S r   rx   r_   s    rK   	__enter__zPipeline.__enter__      rM   c                 $    | j                          y r   reset)rJ   exc_type	exc_value	tracebacks       rK   __exit__zPipeline.__exit__      

rM   c                 D    	 | j                          y # t        $ r Y y w xY wr   r   	Exceptionr_   s    rK   __del__zPipeline.__del__  s"    	JJL 		    	c                 ,    t        | j                        S r   )r   r   r_   s    rK   __len__zPipeline.__len__  s    4&&''rM   c                      y)z1Pipeline instances should always evaluate to TrueTrx   r_   s    rK   __bool__zPipeline.__bool__  s    rM   Nc                     g | _         y r   )r   r_   s    rK   r   zPipeline.reset  s
     rM   c                 $    | j                          y)zClose the pipelineNr   r_   s    rK   r   zPipeline.close  s    

rM   c                 @    | j                   j                  ||f       | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   s      rK   pipeline_execute_commandz!Pipeline.pipeline_execute_command  s!     	""D'?3rM   c                 &     | j                   |i |S )zAdds a command to the stack)r   rJ   r   r   s      rK   r   zPipeline.execute_command  s    ,t,,d=f==rM   c                    | j                   j                  s| j                   j                          	 | j                   j                  j	                  t        | j                              | j                          S # | j                          w xY w)z0Execute all the commands in the current pipeline)r   rD   r\   rC   execute_pipelinetupler   r   r_   s    rK   r   zPipeline.execute  s_    ||''LL##%	<<00AAd))* JJLDJJLs   7A: :B)r]   r   r]   N)r   r   r   r   r#   rL   r   r   r   intr   r   r   r   r   r   r   r   r   r   rx   rM   rK   r   r     s^    } ( ($ !>
c 
rM   r   c                       e Zd ZdZdefdZddZddZddZdd	Z	e
defd
       Zd Zd Zd Zd Zd Zd Zd Z	 ddedefdZ	 ddedefdZ	 	 	 	 ddededee   deddf
dZy) r   z2
    PubSub object for multi database client.
    rv   c                 ^    || _          | j                   j                  j                  di | y)zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nrx   )r   rC   r   )rJ   rv   r   s      rK   rL   zPubSub.__init__  s(     ,%%,,6v6rM   r]   c                     | S r   rx   r_   s    rK   r   zPubSub.__enter__  r   rM   Nc                 D    	 | j                          y # t        $ r Y y w xY wr   r   r_   s    rK   r   zPubSub.__del__  s$    	 JJL 		r   c                 L    | j                   j                  j                  d      S )Nr   r   rC   execute_pubsub_methodr_   s    rK   r   zPubSub.reset  s    ||,,BB7KKrM   c                 $    | j                          y r   r   r_   s    rK   r   zPubSub.close   r   rM   c                 V    | j                   j                  j                  j                  S r   )r   rC   active_pubsub
subscribedr_   s    rK   r  zPubSub.subscribed  s    ||,,::EEErM   c                 P     | j                   j                  j                  dg| S )Nr   r   rJ   r   s     rK   r   zPubSub.execute_command  s,    Bt||,,BB
 $
 	
rM   c                 V     | j                   j                  j                  dg|i |S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscriber   r   s      rK   r  zPubSub.psubscribe  7     Ct||,,BB

#)
 	
rM   c                 P     | j                   j                  j                  dg| S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscriber   r  s     rK   r
  zPubSub.punsubscribe  /    
 Ct||,,BB
!
 	
rM   c                 V     | j                   j                  j                  dg|i |S )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscriber   r   s      rK   r  zPubSub.subscribe!  s7     Ct||,,BB

"(
 	
rM   c                 P     | j                   j                  j                  dg| S )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscriber   r  s     rK   r  zPubSub.unsubscribe-  s(    
 Ct||,,BB=XSWXXrM   c                 V     | j                   j                  j                  dg|i |S )az  
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        
ssubscriber   r   s      rK   r  zPubSub.ssubscribe4  r  rM   c                 P     | j                   j                  j                  dg| S )zu
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        sunsubscriber   r  s     rK   r  zPubSub.sunsubscribe@  r  rM   ignore_subscribe_messagesr   c                 R    | j                   j                  j                  d||      S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_messager  r   r   rJ   r  r   s      rK   r  zPubSub.get_messageI  s0     ||,,BB&? C 
 	
rM   c                 R    | j                   j                  j                  d||      S )a&  
        Get the next message if one is available in a sharded channel, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_sharded_messager  r   r  s      rK   r  zPubSub.get_sharded_messageY  s0     ||,,BB!&? C 
 	
rM   
sleep_timedaemonexception_handlersharded_pubsubr   c                 V    | j                   j                  j                  |||| |      S )N)r  r  r   r  )r   rC   execute_pubsub_run)rJ   r  r  r  r  s        rK   run_in_threadzPubSub.run_in_threadi  s6     ||,,??/) @ 
 	
rM   )r]   r   r   )F        )r"  FNF)r   r   r   r   r#   rL   r   r   r   r   propertyr   r  r   r  r
  r  r  r  r  r   r  r  r   r   r!  rx   rM   rK   r   r     s    	7} 	7L FD F F






Y


 IL
)-
@E
" IL
)-
@E
$  04$

 
 $H-	

 
 

rM   r   );loggingrE   concurrent.futuresr   concurrent.futures.threadr   typingr   r   r   r   redis.backgroundr	   redis.backoffr
   redis.clientr   redis.commandsr   r   redis.maint_notificationsr   redis.multidb.circuitr   r   rV   redis.multidb.command_executorr   redis.multidb.configr   r   r   r   redis.multidb.databaser   r   r   redis.multidb.exceptionr   r   r   redis.multidb.failure_detectorr   redis.multidb.healthcheckr   r   redis.observability.attributesr   redis.retryr    redis.utilsr!   	getLoggerr   r   r#   r   r   r   rx   rM   rK   <module>r8     s      + 8 0 0 0 # + < > 0 2 A  E D 
 ; D <  $			8	$ rA' rA rAj& &@"L @FU
 U
rM   