API¶
Functions¶
-
zproc.
ping
(server_address: str, *, timeout: Union[float, int, None] = None, sent_payload: Optional[bytes] = None, secret_key: str = None) → Optional[int][source]¶ Ping the zproc server.
This can be used to easily detect if a server is alive and running, with the aid of a suitable
timeout
.Parameters: - server_address –
The zproc server’s address.
Please read The server address spec for a detailed explanation.
- timeout –
The timeout in seconds.
If this is set to
None
, then it will block forever, until the zproc server replies.For all other values, it will wait for a reply, for that amount of time before returning with a
TimeoutError
.By default it is set to
None
. - sent_payload –
payload that will be sent to the server.
If it is set to None, then
os.urandom(56)
(56 random bytes) will be used.(No real reason for the
56
magic number.)
Returns: The zproc server’s pid if the ping was successful, else
None
If this returns
None
, then it probably means there is some fault in communication with the server.- server_address –
-
zproc.
atomic
(fn: Callable) → Callable[source]¶ Wraps a function, to create an atomic operation out of it.
No Process shall access the state while
fn
is running.Note
Please read Atomicity and race conditions for a detailed explanation.
Parameters: fn – The function
to be wrapped, as an atomic function.Returns: A wrapper function
.The “wrapper”
function
returns the value returned by the “wrapped”function
.>>> import zproc >>> >>> @zproc.atomic ... def increment(snapshot): ... return snapshot['count'] + 1 ... >>> >>> ctx = zproc.Context() >>> ctx.state['count'] = 0 >>> >>> increment(ctx.state) 1
-
zproc.
start_server
(server_address: str = None, *, backend: Callable = <class 'multiprocessing.context.Process'>, secret_key: str = None)[source]¶ Start a new zproc server.
Parameters: - server_address –
The zproc server’s address.
If it is set to
None
, then a random address will be generated.Please read The server address spec for a detailed explanation.
- backend –
The backend to use for launching the server process.
For example, you may use
threading.Thread
as the backend.Warning
Not guaranteed to work well with anything other than
multiprocessing.Process
.
Returns: tuple
, containing amultiprocessing.Process
object for server and the server address.- server_address –
Exceptions¶
Context¶
-
class
zproc.
Context
(server_address: str = None, *, wait: bool = False, cleanup: bool = True, server_backend: Callable = <class 'multiprocessing.context.Process'>, namespace: str = 'default', secret_key: str = None, **process_kwargs)[source]¶ Provides a high level interface to
State
andProcess
.Primarily used to manage and launch processes.
All processes launched using a Context, share the same state.
Don’t share a Context object between Processes / Threads. A Context object is not thread-safe.
Parameters: - server_address –
The address of the server.
If it is set to
None
, then a new server is started and a random address will be generated.Otherwise, it will connect to an existing server with the address provided.
Caution
If you provide a “server_address”, be sure to manually start the server, as described here - Starting the server manually.
Please read The server address spec for a detailed explanation.
- wait –
Wait for all running process to finish their work before exiting.
Alternative to manually calling
wait_all()
at exit. - cleanup –
Whether to cleanup the process tree before exiting.
Registers a signal handler for
SIGTERM
, and anatexit
handler. - server_backend – Passed on to
start_server()
asbackend
. - **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will be used while creating processes using this Context.
Variables: - state – A
State
instance. - process_list – A list of child
Process
(s) created under this Context. - worker_list – A list of worker
Process
(s) created under this Context. Used forContext.process_map()
. - server_process – A
multiprocessing.Process
object for the server, or None. - server_address – The server’s address as a 2 element
tuple
. - namespace – Passed on from the constructor. This is read-only.
-
process
(target: Optional[Callable] = None, **process_kwargs) → Union[zproc.process.Process, Callable][source]¶ Produce a child process bound to this context.
Can be used both as a function and decorator:
@zproc.process() # you may pass some arguments here def my_process1(state): print('hello') @zproc.process # or not... def my_process2(state): print('hello') def my_process3(state): print('hello') zproc.process(my_process3) # or just use as a good ol' function
Parameters: Returns: The
Process
instance produced.
-
process_factory
(*targets, count: int = 1, **process_kwargs)[source]¶ Produce multiple child process(s) bound to this context.
Parameters: - *targets – Passed on to the
Process
constructor, one at a time. - count – The number of processes to spawn for each item in
targets
. - **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will have a precedence over the one’s provided in
Context
’s constructor.
Returns: A
list
of theProcess
instance(s) produced.- *targets – Passed on to the
-
pull_results_for_task
(task_detail: zproc.context.TaskDetail) → Generator[[Any, None], None][source]¶ PULL “count” results from the process pool. Also arranges the results in-order.
-
process_map
(target: Callable, map_iter: Sequence[Any] = None, *, map_args: Sequence[Sequence[Any]] = None, args: Sequence = None, map_kwargs: Sequence[Mapping[str, Any]] = None, kwargs: Mapping = None, count: int = None, stateful: bool = False, new: bool = False, return_task: bool = False) → Union[zproc.context.TaskDetail, Generator[[Any, None], None]][source]¶ Functional equivalent of
map()
in-built function, but executed in a parallel fashion.Distributes the iterables provided in the
map_*
arguments tocount
no of workerProcess
(s).(Aforementioned worker processes are visible here:
Context.worker_list
)- The idea is to:
- Split the the iterables provided in the
map_*
arguments intocount
number of equally sized chunks. - Send these chunks to
count
number of workerProcess
(s). - Wait for all these worker
Process
(s) to finish their task(s). - Combine the acquired results in the same sequence as provided in the
map_*
arguments. - Return the combined results.
Steps 3-5 are done lazily, on the fly with the help of a
generator
- Split the the iterables provided in the
Note
This function won’t spawn new worker
Process
(s), each time it is called.Existing workers will be used if a sufficient amount is available. If the workers are busy, then this will wait for them to finish up their current work.
Use the
new=True
Keyword Argument to spawn new workers, irregardless of existing ones.You need not worry about shutting down workers. ZProc will take care of that automatically.
Note
This method doesn’t have a way to pass Keyword Arguments to
Process
.This was done, to prevent weird behavior due to the re-use of workers done by ZProc.
Use the
Context
’s constructor to workaround this problem.Parameters: - target –
The
Callable
to be invoked inside aProcess
.It is invoked with the following signature:
target(state, map_iter[i], *map_args[i], *args, **map_kwargs[i], **kwargs)
Where:
state
is aState
instance. (Disabled by default. Use thestateful
Keyword Argument to enable)i
is the index of nth element of the Iterable(s) provided in themap_*
arguments.args
andkwargs
are passed from the**process_kwargs
.
P.S. The
stateful
Keyword Argument ofProcess
allows you to omit thestate
arg. - map_iter – A sequence whose elements are supplied as the first positional argument (after
state
) to thetarget
. - map_args – A sequence whose elements are supplied as positional arguments (
*args
) to thetarget
. - map_kwargs – A sequence whose elements are supplied as keyword arguments (
**kwargs
) to thetarget
. - args –
The argument tuple for
target
, supplied aftermap_iter
andmap_args
.By default, it is an empty
tuple
. - kwargs –
A dictionary of keyword arguments for
target
.By default, it is an empty
dict
. - stateful –
Weather this process needs to access the state.
If this is set to
False
, then thestate
argument won’t be provided to thetarget
.If this is set to
True
, then aState
object is provided as the first Argument to thetarget
.Unlike
Process
it is set toFalse
by default. (To retain a similar API to in-builtmap()
) - new –
Weather to spawn new workers.
If it is set to
True
, then it will spawn new workers, irregardless of existing ones.If it is set to
False
, thensize - len(Context.worker_list)
will be spawned.Un-used workers are thrashed automatically.
- count –
The number of worker
Process
(s) to use.By default, it is set to
multiprocessing.cpu_count()
(The number of CPU cores on your system) - return_task –
Return a
TaskDetail
namedtuple object, instead of a Generator that yields the results of the computation.The
TaskDetail
returned can be passed toContext.pull_results_for_task()
, which will fetch the results for you.This is useful in situations where the results are required at a later time, and since a Generator object is not easily serializable, things get a little tricky. On the other hand, a namedtuple can be serialized to JSON, pretty easily.
Returns: The result is quite similar to
map()
in-built function.It returns a
generator
whose elements are the return value of thetarget
function, when applied to every item of the Iterables provided in themap_*
arguments.The actual “processing” starts as soon as you call this function.
The returned
generator
fetches the results from the worker processes, one-by-one.Warning
- If
len(map_iter) != len(maps_args) != len(map_kwargs)
, then the results will be cut-off at the shortest Sequence.
See Process Map for Examples.
-
call_when_change
(*keys, exclude: bool = False, live: bool = False, **process_kwargs)[source]¶ Decorator version of
get_when_change()
.Spawns a new
Process
, and then calls the wrapped function inside of that new process.The wrapped function is run with the following signature:
target(snapshot, state, *args, **kwargs)
Where:
target
is the wrapped function.snapshot
is adict
containing a copy of the state.Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.
state
is aState
instance.*args
and**kwargs
are passed on from**process_kwargs
.
Parameters: - *keys –
Watch for changes on these keys in the state
dict
.If this is not provided, then all state-changes are respected. (default)
- exclude –
Reverse the lookup logic i.e.,
Watch for all changes in the state except in
*keys
.If
*keys
is not provided, then this has no effect. (default) - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
- **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will have a precedence over the one’s provided in
Context
’s constructor.
Returns: A decorator function The decorator function will return the
Process
instance created.import zproc ctx = zproc.Context() @ctx.call_when_change('gold') def test(snapshot, state): print(snapshot['gold'], state)
-
call_when
(test_fn: Callable, *, live: bool = False, **process_kwargs)[source]¶ Decorator version of
get_when()
.Spawns a new
Process
, and then calls the wrapped function inside of that new process.The wrapped function is run with the following signature:
target(snapshot, state, *args, **kwargs)
Where:
target
is the wrapped function.snapshot
is adict
containing a copy of the state.Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.
state
is aState
instance.*args
and**kwargs
are passed on from**process_kwargs
.
Parameters: - test_fn – A
Callable
, which is called on each state-change. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
- **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will have a precedence over the one’s provided in
Context
’s constructor.
Returns: A decorator function The decorator function will return the
Process
instance created.import zproc ctx = zproc.Context() @ctx.get_state_when(lambda state: state['trees'] == 5) def test(snapshot, state): print(snapshot['trees'], state)
-
call_when_equal
(key: collections.abc.Hashable, value: Any, *, live: bool = False, **process_kwargs)[source]¶ Decorator version of
get_when_equal()
.Spawns a new
Process
, and then calls the wrapped function inside of that new process.The wrapped function is run with the following signature:
target(snapshot, state, *args, **kwargs)
Where:
target
is the wrapped function.snapshot
is adict
containing a copy of the state.Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.
state
is aState
instance.*args
and**kwargs
are passed on from**process_kwargs
.
Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
- **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will have a precedence over the one’s provided in
Context
’s constructor.
Returns: A decorator function The decorator function will return the
Process
instance created.import zproc ctx = zproc.Context() @ctx.call_when_equal('oranges', 5) def test(snapshot, state): print(snapshot['oranges'], state)
-
call_when_not_equal
(key: collections.abc.Hashable, value: Any, *, live: bool = False, **process_kwargs)[source]¶ Decorator version of
get_when_not_equal()
.Spawns a new
Process
, and then calls the wrapped function inside of that new process.The wrapped function is run with the following signature:
target(snapshot, state, *args, **kwargs)
Where:
target
is the wrapped function.snapshot
is adict
containing a copy of the state.Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.
state
is aState
instance.*args
and**kwargs
are passed on from**process_kwargs
.
Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
- **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will have a precedence over the one’s provided in
Context
’s constructor.
Returns: A decorator function The decorator function will return the
Process
instance created.import zproc ctx = zproc.Context() @ctx.call_when_not_equal('apples', 5) def test(snapshot, state): print(snapshot['apples'], state)
-
call_when_none
(key: collections.abc.Hashable, *, live: bool = False, **process_kwargs)[source]¶ Decorator version of
get_when_none()
.Spawns a new
Process
, and then calls the wrapped function inside of that new process.The wrapped function is run with the following signature:
target(snapshot, state, *args, **kwargs)
Where:
target
is the wrapped function.snapshot
is adict
containing a copy of the state.Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.
state
is aState
instance.*args
and**kwargs
are passed on from**process_kwargs
.
Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
- **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will have a precedence over the one’s provided in
Context
’s constructor.
Returns: A decorator function The decorator function will return the
Process
instance created.
-
call_when_not_none
(key: collections.abc.Hashable, *, live: bool = False, **process_kwargs)[source]¶ Decorator version of
get_when_not_none()
.Spawns a new
Process
, and then calls the wrapped function inside of that new process.The wrapped function is run with the following signature:
target(snapshot, state, *args, **kwargs)
Where:
target
is the wrapped function.snapshot
is adict
containing a copy of the state.Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.
state
is aState
instance.*args
and**kwargs
are passed on from**process_kwargs
.
Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
- **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will have a precedence over the one’s provided in
Context
’s constructor.
Returns: A decorator function The decorator function will return the
Process
instance created.
-
call_when_available
(key: collections.abc.Hashable, *, live: bool = False, **process_kwargs)[source]¶ Decorator version of
get_when_available()
.Spawns a new
Process
, and then calls the wrapped function inside of that new process.The wrapped function is run with the following signature:
target(snapshot, state, *args, **kwargs)
Where:
target
is the wrapped function.snapshot
is adict
containing a copy of the state.Its serves as a snapshot of the state, corresponding to the state-change for which the wrapped function is being called.
state
is aState
instance.*args
and**kwargs
are passed on from**process_kwargs
.
Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
- **process_kwargs –
Keyword arguments that
Process
takes, exceptserver_address
andtarget
.If provided, these will have a precedence over the one’s provided in
Context
’s constructor.
Returns: A decorator function The decorator function will return the
Process
instance created.
-
wait_all
(timeout: Union[float, int, None] = None, safe: bool = False) → List[Union[Any, Exception]][source]¶ Call
wait()
on all the child processes of this Context. (Excluding the worker processes)Retains the same order as
Context.process_list
.Parameters: Returns: A
list
containing the values returned by child Processes of this Context.
-
start_all
()[source]¶ Call
start()
on all the child processes of this ContextIgnores if a Process is already started, unlike
start()
, which throws anAssertionError
.
-
stop_all
()[source]¶ Call
stop()
on all the child processes of this ContextRetains the same order as
Context.process_list
.Returns: A list
containing the exitcodes of the child Processes of this Context.
- server_address –
Process¶
-
class
zproc.
Process
(target: Callable, server_address: str, *, stateful: bool = True, pass_context: bool = False, args: Sequence = None, kwargs: Mapping = None, retry_for: Sequence[Union[signal.Signals, Exception]] = (), retry_delay: Union[int, float] = 5, max_retries: Optional[bool] = None, retry_args: Optional[tuple] = None, retry_kwargs: Optional[dict] = None, start: bool = True, backend: Callable = <class 'multiprocessing.context.Process'>, namespace: str = 'default', secret_key: Optional[str] = None)[source]¶ Provides a higher level interface to
multiprocessing.Process
.Please don’t share a Process object between Processes / Threads. A Process object is not thread-safe.
Parameters: - server_address –
The address of zproc server.
If you are using a
Context
, then this is automatically provided.Please read The server address spec for a detailed explanation.
- target –
The Callable to be invoked inside a new process.
The ``target`` is invoked with the following signature:
target(state, *args, **kwargs)
Where:
state
is aState
instance.args
andkwargs
are passed from the constructor.
- pass_context –
Weather to pass a
Context
to this process.If this is set to
True
, then the first argument totarget
will be aContext
object in-place of the default -State
.In other words, The
target
is invoked with the following signature:target(ctx, *args, **kwargs)
Where:
ctx
is aContext
object.args
andkwargs
are passed from the constructor.
- stateful –
Weather this process needs to access the state.
If this is set to
False
, then thestate
argument won’t be provided to thetarget
.In other words, The
target
is invoked with the following signature:target(*args, **kwargs)
Where:
args
andkwargs
are passed from the constructor.
Has no effect if
pass_context
is set toTrue
. - start – Automatically call
start()
on the process. - retry_for –
Retry only when one of these
Exception
/signal.Signals
is raised.import signal # retry if a ConnectionError, ValueError or signal.SIGTERM is received. ctx.process( my_process, retry_for=(ConnectionError, ValueError, signal.SIGTERM) )
To retry for any Exception -
retry_for=(Exception, )
The items of this sequence MUST be a subclass of
BaseException
or of typesignal.Signals
. - retry_delay – The delay in seconds, before retrying.
- max_retries –
Give up after this many attempts.
A value of
None
will result in an infinite number of retries.After “max_tries”, any Exception / Signal will exhibit default behavior.
- args –
The argument tuple for
target
.By default, it is an empty
tuple
. - kwargs –
A dictionary of keyword arguments for
target
.By default, it is an empty
dict
. - retry_args –
Used in place of
args
when retrying.If set to
None
, then it has no effect. - retry_kwargs –
Used in place of
kwargs
when retrying.If set to
None
, then it has no effect. - backend –
The backend to use for launching the process(s).
For example, you may use
threading.Thread
as the backend.Warning
Not guaranteed to work well with anything other than
multiprocessing.Process
.
Variables: - child – A
multiprocessing.Process
instance for the child process. - server_address – Passed on from the constructor.
- target – Passed on from the constructor.
- namespace – Passed on from the constructor. This is read-only.
-
start
()[source]¶ Start this Process
If the child has already been started once, it will return with an
AssertionError
.Returns: the process PID
-
stop
()[source]¶ Stop this process.
Once closed, it should not, and cannot be used again.
Returns: exitcode
.
-
wait
(timeout: Union[float, int, None] = None)[source]¶ Wait until this process finishes execution, then return the value returned by the
target
.Parameters: timeout – The timeout in seconds.
If the value is
None
, it will block until the zproc server replies.For all other values, it will wait for a reply, for that amount of time before returning with a
TimeoutError
.Returns: The value returned by the target
function.If the child finishes with a non-zero exitcode, or there is some error in retrieving the value returned by the
target
, aProcessWaitError
is raised.
-
is_alive
¶ Whether the child process is alive.
Roughly, a process object is alive; from the moment the
start()
method returns, until the child process is stopped manually (usingstop()
) or naturally exits
-
pid
¶ The process ID.
Before the process is started, this will be None.
-
exitcode
¶ The child’s exit code.
This will be None if the process has not yet terminated. A negative value
-N
indicates that the child was terminated by signalN
.
- server_address –
State¶
-
class
zproc.
State
(server_address: str, *, namespace: str = 'default', secret_key: Optional[str] = None)[source]¶ Allows accessing state stored on the zproc server, through a dict-like API.
Communicates to the zproc server using the ZMQ sockets.
Please don’t share a State object between Processes/Threads. A State object is not thread-safe.
Boasts the following
dict
-like members, for accessing the state:- Magic methods:
__contains__()
,__delitem__()
,__eq__()
,__getitem__()
,__iter__()
,__len__()
,__ne__()
,__setitem__()
- Methods:
clear()
,copy()
,get()
,items()
,keys()
,pop()
,popitem()
,setdefault()
,update()
,values()
Parameters: server_address – The address of zproc server.
If you are using a
Context
, then this is automatically provided.Please read The server address spec for a detailed explanation.
Variables: server_address – Passed on from constructor. -
fork
(server_address: Optional[str] = None, *, namespace: Optional[str] = None, secret_key: Optional[str] = None) → zproc.state.State[source]¶ “Forks” this State object.
Takes the same args as the
State
constructor, except that they automatically default to the values provided during the creation of this State object.If no args are provided to this function, then it shall create a new
State
object that follows the exact same semantics as this one.This is preferred over copying a
State
object.Useful when one needs to access 2 or more namespaces on the same server.
-
go_live
()[source]¶ Clear the outstanding queue (or buffer), thus clearing any past events that were stored.
Internally, this re-opens a socket, which in-turn clears the queue.
Please read Live-ness of events for a detailed explanation.
-
get_raw_update
(live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → Tuple[dict, dict, bool][source]¶ A low-level hook that emits each and every state update.
Parameters: - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
- live –
-
get_when_change
(*keys, exclude: bool = False, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]¶ Block until a change is observed, and then return a copy of the state.
Parameters: - *keys –
Watch for changes on these keys in the state
dict
.If this is not provided, then all state-changes are respected. (default)
- exclude –
Reverse the lookup logic i.e.,
Watch for all changes in the state except in
*keys
.If
*keys
is not provided, then this has no effect. (default) - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
.
Returns: A dict
containing a copy of the state.This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.
- *keys –
-
get_when
(test_fn, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]¶ Block until
test_fn(snapshot)
returns a “truthy” value, and then return a copy of the state.Where-
snapshot
is adict
, containing a copy of the state.Parameters: - test_fn – A
Callable
, which is called on each state-change. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
Returns: A dict
containing a copy of the state.This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.
- test_fn – A
-
get_when_equal
(key: collections.abc.Hashable, value: Any, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]¶ Block until
state[key] == value
, and then return a copy of the state.Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
Returns: A dict
containing a copy of the state.This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.
- key – Some key in the state
-
get_when_not_equal
(key: collections.abc.Hashable, value: Any, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]¶ Block until
state[key] != value
, and then return a copy of the state.Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
Returns: A dict
containing a copy of the state.This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.
- key – Some key in the state
-
get_when_none
(key: collections.abc.Hashable, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]¶ Block until
state[key] is None
, and then return a copy of the state.Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
Returns: A dict
containing a copy of the state.This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.
- key – Some key in the state
-
get_when_not_none
(key: collections.abc.Hashable, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False) → dict[source]¶ Block until
state[key] is not None
, and then return a copy of the state.Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
Returns: A dict
containing a copy of the state.This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.
- key – Some key in the state
-
get_when_available
(key: collections.abc.Hashable, *, live: bool = False, timeout: Union[float, int, None] = None, duplicate_okay: bool = False)[source]¶ Block until
key in state
, and then return a copy of the state.Parameters: - key – Some key in the state
dict
. - value – The value corresponding to the
key
in statedict
. - live –
Whether to get live updates.
Please read Live-ness of events for a detailed explanation.
- timeout –
Sets the timeout in seconds.
If the value is
None
, it will block until an update is available.For all other values (
>=0
), it will wait for a state-change, for that amount of time before returning with aTimeoutError
. - duplicate_okay –
Whether it’s okay to process duplicate updates.
Please read Duplicate-ness of events for a detailed explanation.
Returns: A dict
containing a copy of the state.This copy serves as a snapshot of the state, corresponding to the state-change for which this state watcher was triggered.
- key – Some key in the state