
    vh~U                       d Z ddlmZ ddlmZmZ ddlZddlmZ ddl	Z	ddl
mZ ddlmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z" ddlm#Z# ddl$m%Z% ddlZ&d/dZ'd0d1dZ(d2dZ)d Z*d Z+d3d4dZ,d5d6dZ-d7dZ.ed        Z/ed        Z0ed         Z1	 	 	 	 	 	 d8d!Z2	 	 	 	 	 	 d9d"Z3 ejh                  d#      Z5e5jm                  e2       d$ Z7e5jq                  e7        ejr                  e5d%        d& Z: ee:d      ejv                  e5<   d' Z< ejz                  e5e<       	 	 	 	 	 	 d8d(Z>	 	 	 	 	 	 d:d)Z? ejh                  d*      Z@e@jm                  e>       d+ ZAe@jq                  eA        ejr                  e@d,         ej                  e@       d- ZC ejz                  e@eC       d;d.ZDy)<zDUtilities for synchronizing and communication across multiple hosts.    )annotations)partial	lru_cacheN)Any)tree_flattentree_unflatten)core)ad)batching)mlir)array)sharding_impls)pxla)xla)pjit)PartitionSpec)distributed)safe_zip)
xla_bridge)
xla_clientc                D    t         j                  j                  d |       S )Nc                F    t        j                  | | j                  d      S )Nr   )dtypeaxis)jnpsumr   xs    [/opt/face_recognition/venv/lib/python3.12/site-packages/jax/experimental/multihost_utils.py<lambda>z_psum.<locals>.<lambda>+   s    q A     jaxtreemap)xss    r   _psumr'   *   s    	A2	FFr!   c           	        t        j                         dk(  r.t         j                  j                  t        j
                  |       S t        j                         dk(  t	        j                  t        j                               j                  t        j                         t        j                               }t         j                  j                  |d      t        d      fd}d }t         j                  j                  ||       }  t        j                  t        t         j                  j!                  t                           |       }t         j                  j                  ||      S )a  Broadcast data from a source host (host 0 by default) to all other hosts.

  Args:
    in_tree: pytree of arrays - each array *must* have the same shape across the
      hosts.
    is_source: optional bool denoting whether the caller is the source. Only
      'source host' will contribute the data for the broadcast. If None, then
      host 0 is used.

  Returns:
    A pytree matching in_tree where the leaves now all contain the data from the
    first host.
     r   	processeslocal_devicesr+   c                    r| }nt        j                  |       }t        j                  |d      }t        |      S )Nr   r   )np
zeros_likeexpand_dims host_local_array_to_global_array)r   inpglobal_mesh	is_sourcepspecs     r   pre_jitz%broadcast_one_to_all.<locals>.pre_jitG   s8    cMM!c
..1
%C+CeDDr!   c                J    t        j                  | j                  d            S )Nr   )r#   
device_getaddressable_datar   s    r   post_jitz&broadcast_one_to_all.<locals>.post_jitO   s    >>!,,Q/00r!   out_shardings)r#   process_countr$   r%   r/   asarrayprocess_indexr   devicesreshapelocal_device_countshardingMeshPjitr'   NamedSharding)in_treer5   rA   r7   r;   out_treer4   r6   s    `    @@r   broadcast_one_to_allrK   .   s
    	A88<<

G,,!!#q(I	kkmWS..0#2H2H2JK 
!!'+IJ+
K.%E1 HHLL'*'SWWU#,,*D*D13+  "(	h	))r!   c                    t        j                  t        j                  | j	                                     }t        |d|  d       y)z+Creates a barrier across all hosts/devices.z$sync_global_devices name mismatch ('z')N)r/   uint32zlibcrc32encodeassert_equal)namehs     r   sync_global_devicesrT   X   s3    ii

4;;=)*!q8bABr!   c                    | S N r   s    r   _identity_fnrX   `   s    	
(r!   c           	     ,   t        | t        j                        r| j                  st        | j                  t
        j                        r%| j                  j                  t                     }nIt
        j                  j                  | j                  j                  | j                  j                        } t        j                  t        |      |       }nt        j                          dk(  r0t#        j$                  |       }|st#        j&                  |d      S |S t#        j                  t        j(                               j+                  t        j                          t        j,                               }t        j                  j/                  |d      }t        d      }t        j                  j                  ||      }t#        j$                  |       }|j0                  dk(  s|st#        j&                  |d      }t3        j4                  |j6                  |j8                        }	t;        j<                  |t;        j>                  |      |	      }
t        j@                         D cg c]  }t        jB                  ||       }}t        jD                  |
j6                  ||      } t        j                  t        t        j                  |t                           |      }t#        j$                  |jG                  d            S c c}w )	N)spec)memory_kindr<   r)   r   r.   r*   r+   )$
isinstancer   	ArrayImplis_fully_addressablerD   r   rH   updaterF   GSPMDShardingget_replicated_device_assignmentr[   r#   rG   rX   r>   r/   r?   r1   rA   rB   rC   rE   ndimr	   ShapedArrayshaper   r   mesh_local_to_globalget_array_mappingr,   
device_put$make_array_from_single_device_arraysr:   )r3   tiledrepsoutrA   r4   r6   shost_np_arravalglobal_avaldbufs
global_arrs                 r   _handle_array_process_allgatherrt   d   s<   U__%c.F.F#,, < <=\\  ac *d))88
,,
)
)s||7O7O 9 Qd
3#'',d
3C
8C aJJsOc05R^^Ca(>3>hhs{{}%--c.?.?.A.1.D.D.FHG,,##G-KLKkNE"";6A**S/K1ENN;Q7kK--{/@/@AD++T++E2D:K 584E4E4GHqCNN;*HDH;;1d$JE#'', # 1 1+qs CEEOQC 
C((+	,, Is   0LFc                N    fd}t         j                  j                  ||       S )a  Gather data from across processes.

  Args:
    in_tree: pytree of arrays - each array _must_ have the same shape across the
      hosts.
    tiled: Whether to stack or concat the output. Defaults to False i.e. stack
      into a new positional axis at index 0.

  Returns:
    Pytrees of numpy arrays.
      * If the input is a non-fully addressable jax.Array, then the data is
        fully replicated.
      * If the input is numpy array or fully addressable jax.Array, then the
        output shape is dependent on the `tiled` argument.
        If its False, then the output will be stacked else concatenated.
      * If the input is a scalar, then the output will be stacked.
  c                    t        |       S rV   )rt   )r3   rj   s    r   _pjitz process_allgather.<locals>._pjit   s    *366r!   r"   )rI   rj   rw   s    ` r   process_allgatherrx      s    &7	eW	%%r!   c                    t        |       }t        j                  j                  t        j                  j	                  d | |            st        | d| d|  d      y)z9Verifies that all the hosts have the same tree of values.c                 L    t        j                  t        j                  |        S rV   )r/   allequalr   s    r   r    zassert_equal.<locals>.<lambda>   s    rxx|(< r!   z Expected: z; got: .N)rK   r#   	tree_utiltree_alltree_mapAssertionError)rI   fail_messageexpecteds      r   rQ   rQ      sa    !'*(				mm<gxP
R
.H:WWIQ?A A
Rr!   c                    t         j                  j                  yt         j                  j                  }|t	        d      |j                  |       S )a  Determine whether all hosts have reached a preemption sync step.

  When any host receives a preemption notice, the notice is propagated to all
  hosts and triggers a synchronization protocol in the background. The
  synchronization protocol calculates the maximum step ids from all hosts, and
  uses the next step id (i.e., max + 1) as the safe step to save a checkpoint.
  All hosts should continue training more steps until this method returns True,
  indicating that the `step_id` is equal to the safe step and the hosts should
  start saving a checkpoint.

  To use this API, all hosts must start training from the same step and call it
  at every training step. Example usage:

  ```
  def should_save(step_id: int) -> bool:

    # Should save an on-demand checkpoint for preemption
    if multihost_utils.reached_preemption_sync_point(step_id):
      return True

    # Should save a regular checkpoint
    return step_id - last_saved_checkpoint_step >= save_interval_steps
  ```

  Preemption notice is provided by the cluster scheduler to notify the
  application in advance before it gets evicted. By default, we use SIGTERM as
  the signal for preemption notice.

  TODO(b/230630494): Add instructions for customized preemption notice.

  Returns:
    A boolean indicating whether all hosts have reached a synchronization step
    after some hosts are preempted.

  Raises:
    RuntimeError: if preemption sync manager has not been initialized.
  Fz1Preemption sync manager has not been initialized.)r   global_stateclientpreemption_sync_managerRuntimeErrorreached_sync_point)step_idsync_managers     r   reached_preemption_sync_pointr      sN    L $$,))AA,
J
KK		(	(	11r!   c                >    t        j                  | | |       d      S )NT)tupled_args)pjit_libflatten_axis_resources)rR   rI   pspecs_thunks      r   _flatten_pspecsr      s     		(	(
G\^
7 7r!   c                V    t        j                  |t        j                  |      |       S rV   )r   rf   rg   )
local_avalmeshr6   s      r   _local_to_global_avalr      s'    		"	"4)?)?)F#-
/ /r!   c                V    t        j                  |t        j                  |      |       S rV   )r   mesh_global_to_localrg   )rp   r   r6   s      r   _global_to_local_avalr      s'    		"	"4)?)?)F#.
0 0r!   c               "   |t        d      t        | t        j                        r| j                  s| S t        | t        j                        rCt        | j
                  t        j
                  j                        rt        j                  |       } t        j
                  j                  |j                  |      }t        | t        j                        rJ| j
                  j                  || j                        r$| j                  D cg c]  }|j                   }}nSt        j                   |       } |j#                  | j$                        j'                         D cg c]
  \  }}| |    }}}t)        t+        j,                  | j$                  | j.                        ||      }t1        j2                  |t        j
                  j                  ||      |t5        |j                  j6                  j8                              S c c}w c c}}w )N`None` is not a valid input to the pspecs argument. Please use jax.sharding.PartitionSpec() if you wanted to replicate your input.)
ValueErrorr\   r   r]   r^   rD   r#   PmapShardingr/   rH   
local_meshis_equivalent_torc   addressable_shardsdatar   canonicalize_dtypedevices_indices_mapre   itemsr   r	   rd   r   r   batched_device_putlistrA   flat)	arrr4   r6   local_shardingr   arraysrq   indexrp   s	            r   %host_local_array_to_global_array_implr      s   
]
	NO O U__%c.F.FJU__%*	llCLL--+/
((3-C<<--k.D.DeL.
 eoo&	ll##NCHH=!445aff5F5

 
 
%C '::399EKKMOAu 	E
OF O &
syy#)),k5B+ 
	 	 3<<--k5Ad;))11667
9 9 6Os   H"Hc           	         t        |       \  }}t        d|t        j                  |            }t	        ||      D cg c]  \  }}t
        j                  |||       }}}t        ||      S c c}}w )a  Converts a host local value to a globally sharded jax.Array.

  This function takes host-local data (which might be different
  across hosts), and populates a global array with this data, where each
  device on each host, get the appropriate slice of the data according to
  sharding defined by the global_mesh/pspects.

  For example:

  >>> global_mesh = jax.sharding.Mesh(jax.devices(), 'x')
  >>> pspecs = jax.sharding.PartitionSpec('x')
  >>> host_id = jax.process_index()
  >>> arr = host_local_array_to_global_array(np.arange(4) * host_id, mesh, pspecs)  # NB: assumes jax.local_device_count() divides 4.   # doctest: +SKIP

  The resulting array will have the shape (4 * num_processes) and will
  have distributed value of: (0, 1, 2, 3, 0, 2, 4, 6, 0, 3, 6, 9, ... ),
  where each slice np.arange(4) * host_id will be partitioned across the
  corresponding host's devices.

  Similarly:

  >>> mesh = jax.sharding.Mesh(np.array(jax.devices()).reshape(jax.process_count(), jax.local_device_count()), ['host', 'dev'])
  >>> pspecs = jax.sharding.PartitionSpec('host')
  >>> host_id = jax.process_index()
  >>> arr = host_local_array_to_global_array(np.arange(4) * host_id, mesh, pspecs)  # doctest: +SKIP

  will create the same distributed value (0, 1, 2, 3, 0, 2, 4, 6, ...),
  however each slice np.arange(4) * i will be *replicated* across corresponding
  host devices.

  On the other hand, if pspecs = PartitionSpec(), which means
  replication across all axes, then this snippet:

  >>> pspecs = jax.sharding.PartitionSpec()
  >>> arr = host_local_array_to_global_array(np.arange(4), mesh, pspecs)  # doctest: +SKIP

  will have the shape (4,) and the value (0, 1, 2, 3) will be replicated
  across all hosts and devices.

  It is an undefined behavior to have not identical local_inputs with pspec
  indicating data replication.

  You can use this function to transition to jax.Array. Using jax.Array with
  pjit has the same semantics of using GDA with pjit i.e. all jax.Array
  inputs to pjit should be globally shaped.

  If you are currently passing host local values to pjit, you can use this
  function to convert your host local values to global Arrays and then pass that
  to pjit.


  Example usage.

  >>> from jax.experimental import multihost_utils # doctest: +SKIP
  >>>
  >>> global_inputs = multihost_utils.host_local_array_to_global_array(host_local_inputs, global_mesh, in_pspecs) # doctest: +SKIP
  >>>
  >>> with mesh: # doctest: +SKIP
  >>>   global_out = pjitted_fun(global_inputs) # doctest: +SKIP
  >>>
  >>> host_local_output = multihost_utils.global_array_to_host_local_array(global_out, mesh, out_pspecs) # doctest: +SKIP

  Please note this function requires global mesh to be a continuous mesh, meaning
  that  devices that belong to each host should form a subcube in this mesh.
  To move local data to global array with non-continuous mesh use
  jax.make_array_from_callback or jax.make_array_from_single_device_arrays
  instead.

  Args:
    local_inputs: A Pytree of host local values.
    global_mesh: A jax.sharding.Mesh object. The mesh must be a contiguous mesh,
    that is all hosts' devices must form a subcube in this mesh.
    pspecs: A Pytree of jax.sharding.PartitionSpec's.

  Returns:
    A pytree of global arrays.
  zinput pspecsr4   r6   )r   r   r   hashable_pytreer   "host_local_array_to_global_array_pbindr   )	local_inputsr4   pspecs	flat_inpsrI   	in_pspecsr3   in_specout_flats	            r   r2   r2     s    ^ $L1)Wng&66v>@)
 #9i8 #w )--c{4; . =( 
 
	**   "A/r2   c               l    t        t        j                  | j                  | j                        ||      S rV   )r   r	   rd   re   r   r   r4   r6   s      r   ltg_abstract_evalr   g  -    	
syy#)),k5
B Br!   c                0    t        j                  | fi |fS rV   )r   r   ct_paramss      r   r    r    m      499"GG'J r!   c                    |\  }|\  }|j                   d n|j                   }t        |      }	|	j                  ||       t        |	 }	t        j                  |||	      }
|
|fS )Nr   )	spmd_namer   insertrF   r   r   )insert_axis	axis_datavals_indims_inr4   r6   r   rq   	new_parts	new_pspecys              r   ltg_batcherr   p  so    "!"!))1dy7J7J)5k)1i m)(--[	 . 3!	
A+r!   c                   |gS rV   rW   ctxr   r4   r6   s       r   _ltg_loweringr   }  	    
*r!   c                  |t        d      t        | t        j                        r| j                  r| S t
        j                  j                  ||      }t
        j                  j                  |j                  |      }t        t        j                  | j                  | j                        ||      }t        | t        j                        rn| j                  j                  || j                        r| j                   }n"t        j"                  | |      }|j                   }t        j                  |||d      S t%        j&                  |       } |j)                  | j                        j+                         D 	cg c]
  \  }}	| |	    }}}	t-        j.                  |||t1        |j                  j2                  j4                              S c c}	}w )Nr   T)	committed)r   r\   r   r]   r^   r#   rD   rH   r   r   r	   rd   re   r   r   rc   _arraysrh   r   r   r   r   r   r   r   rA   r   )
r   r4   r6   global_shardingr   r   r   resharded_arrayrq   r   s
             r   %global_array_to_host_local_array_implr     s   
]
	NO O U__%#*B*BJLL..{EB/<<--k.D.DeL.$
syy#)),k5B* U__%
||$$_chh?{{fsO<o&&f??:~vNN 
 
 
%C '::399EKKMOAu 	E
OF O ""NF[##++0013 3Os   >Gc           	         t        |       \  }}t        d|t        j                  |            }t	        ||      D cg c]  \  }}t
        j                  |||       }}}t        ||      S c c}}w )a  Converts a global `jax.Array` to a host local `jax.Array`.

  You can use this function to transition to `jax.Array`. Using `jax.Array` with
  pjit has the same semantics of using GDA with pjit i.e. all `jax.Array`
  inputs to pjit should be globally shaped and the output from pjit will also
  be globally shaped jax.Array's

  You can use this function to convert the globally shaped `jax.Array` output
  from pjit to host local values again so that the transition to jax.Array can
  be a mechanical change.

  Example usage:

  >>> from jax.experimental import multihost_utils # doctest: +SKIP
  >>>
  >>> global_inputs = multihost_utils.host_local_array_to_global_array(host_local_inputs, global_mesh, in_pspecs) # doctest: +SKIP
  >>>
  >>> with mesh: # doctest: +SKIP
  ...   global_out = pjitted_fun(global_inputs) # doctest: +SKIP
  >>>
  >>> host_local_output = multihost_utils.global_array_to_host_local_array(global_out, mesh, out_pspecs) # doctest: +SKIP

  Args:
    global_inputs: A Pytree of global jax.Array's.
    global_mesh: A :class:`jax.sharding.Mesh` object. The mesh must be contiguous
      meaning all local devices of the host must form a subcube.
    pspecs: A Pytree of :class:`jax.sharding.PartitionSpec` objects.

  Returns:
    A Pytree of host local arrays.
  zoutput pspecsr   )r   r   r   r   r   "global_array_to_host_local_array_pr   r   )	global_inputsr4   r   r   rJ   
out_pspecsr3   or   s	            r    global_array_to_host_local_arrayr     s    B %]3)X'77?A*
 Y
3 #q )--c{45 . 7( 
 
(	++r   r   c               l    t        t        j                  | j                  | j                        ||      S rV   )r   r	   rd   re   r   r   s      r   gtl_abstract_evalr     r   r!   c                0    t        j                  | fi |fS rV   )r   r   r   s      r   r    r      r   r!   c                   |gS rV   rW   r   s       r   _gtl_loweringr     r   r!   c                   t         j                  j                  }|t        d      | st	        d      | D ch c]  }|j
                   }}t        j
                         |vrt	        d      t        |      dk(  r| S |j                  t        |            }| D cg c]  }|j
                  |v s| c}S c c}w c c}w )ay  Returns the subset of the provided devices that are live and healthy.

  This API is under active development and is not stable.

  `live_devices` is a low-level fault tolerance primitive that can be used to
  implement fault tolerant multi-process JAX programs.

  Barrier Semantics

  It's important that every process agrees on which devices are live to avoid
  the processes' behavior from diverging. For example, imagine a set of
  processes trying to run an AllGather, but they all disagree on which devices
  should be participating in the AllGather. This is buggy.

  To ensure that every process agrees on the set of live devices, the
  `live_devices` function has barrier-like semantics. Consider an invocation
  `live_devices(devices)` where `devices` includes devices across a set of
  processes P. The invocation acts as a barrier, waiting for every process in P
  to call `live_devices(devices)`. Afterwards, `live_devices` returns the same
  set of live devices `A` to all the processes in P. This ensures that every
  process agrees on the set of live devices.

  `live_devices` does not actually act as a barrier for *every* process in P
  because some processes in P might have failed. Instead, the `live_devices`
  function waits only for the processes with a device in the returned set of
  live devices A.

  An Example

  Imagine we have four processes, each with two devices:

    Process A: Devices 1 and 2
    Process B: Devices 3 and 4
    Process C: Devices 5 and 6
    Process D: Devices 7 and 8

  Further imagine that process D fails and that every process calls
  `live_devices(jax.devices())`. The invocation returns devices 1, 2, 3, 4, 5,
  and 6. Because these devices are hosted by processes A, B, and C, the call to
  `live_devices` acts as a barrier across processes A, B, and C. Process D,
  which failed, is ignored.

  Args:
    devices: A list of devices. The provided devices must include at least one
    local device.

  Returns:
    The subset of the provided devices that are live and healthy.

  Raises:
    RuntimeError: If the distributed runtime was not initialized.
    ValueError: If no local devices are provided.
  z Distributed JAX not initialized.zNo devices provided.z/Provided devices do not have any local devices.r)   )
r   r   r   r   r   r@   r   lenget_live_nodesr   )rA   r   rq   process_idslive_process_idss        r   live_devicesr     s    l ##**&^
9
::	 +
,,*12Q2+2{2 F
GG N**4+<=	D3C C!	DD 3 
Es   B8B=1B=)r&   r   returnr   rV   )rI   r   r5   zbool | Noner   r   )rR   str)F)rI   r   rj   boolr   r   ) )r   r   )r   intr   r   )r   r   r4   jax.sharding.Meshr6   r   )r   r   r4   r   r   r   )r   r   r4   r   r   r   )rA   list[xla_client.Device]r   r   )E__doc__
__future__r   	functoolsr   r   rN   typingr   r#   	jax.numpynumpyr   jax.tree_utilr   r   jax._srcr	   jax._src.interpretersr
   r   r   r   r   r   jax.interpretersr   r   r   jax.shardingr   rF   r   jax._src.utilr   r   jax._src.libr   r/   r'   rK   rT   rX   rt   rx   rQ   r   r   r   r   r   r2   	Primitiver   def_implr   def_abstract_eval
deflinear2r   fancy_primitive_batchersr   register_loweringr   r   r   r   defvectorizedr   r   rW   r!   r   <module>r     sF   K " (   
  6  $ * &  # &   % +   "  # G'*TC"-J&0A+2\ 7 7 / / 0 0
 9	 9/ 98; 9FW+W+$5W+?BW+r &4T^^4V%W " " + +,Q RB # 4 45F G 0JK	 IPI ! !"D E   9= I3	3/38;3B),),%6),@C),V &4T^^4V%W " " + +,Q RB # 4 45F G 0JK   9 :   9= ILEr!   