Watchers: Monitoring Kubernetes Activity

Reference doc

Intro

The K8s API includes a set of interfaces that allow you to receive events describing the activities that K8s is undertaking. The general term for these is watches, and there are a variety of ways you can tune watch activity.

Hikaru provides a simplifying interface on top of these base facilities, reducing the knowledge of watchable classes, group API objects, and watch interface semantics needed to successfully set up watches on a K8s infrastructure.

The watch module

You access these capabilities through the hikaru.watch module. Besides knowing the objects you want to watch, there are only three classes in the watch module that you need to interact with:

  • The Watcher class provides a way to set up a stream of events on a single kind of Hikaru model object, such as Pods or Nodes. Once created, instances can be used to stream updates from the watched resource kind.

  • The MultiplexingWatcher class provides a container for Watcher instances that produces a single stream of events for multiple kinds of resources.

  • The WatchEvent class is the wrapper object in which watch events are delivered; it contains an indication of the type of event (ADDED, MODIFIED, DELETED) and a Hikaru model instance of the type indicated during the Watcher’s creation.

A simple example

Here’s a minimal example that prints out some Pod metadata for Pod events across all namespaces:

from kubernetes import config
from hikaru.model.rel_1_22 import Pod
from hikaru.watch import Watcher

def pod_watcher():
    # config file location on my dev system:
    config.load_kube_config(config_file='/etc/rancher/k3s/k3s.yaml')
    watcher = Watcher(Pod)
    for we in watcher.stream():
        p: Pod = we.obj
        print(f"{we.etype}, {p.metadata.namespace}, {p.metadata.name}")

if __name__ == "__main__":
    pod_watcher()

The line watcher = Watcher(Pod) creates a watcher for Pod events across all namespaces, and watcher.stream() creates a generator that you can iterate over as it yields WatchEvent instances, here assigned to the we variable in the for loop. The obj attribute is a reference to the Hikaru model object that was sent from K8s, and the etype attribute contains one of the strings ADDED, MODIFIED, or DELETED to indicate what action was taken relative to the resource in obj.

When the above is run on my dev system, I get repeated sets of three lines for the currently existing Pods in the k3s system I use; these look like the following:

ADDED, kube-system, local-path-provisioner-5ff76fc89d-n5mr9
ADDED, kube-system, coredns-854c77959c-mdw6w
ADDED, kube-system, metrics-server-86cbb8457f-lgmgq
ADDED, kube-system, local-path-provisioner-5ff76fc89d-n5mr9
ADDED, kube-system, coredns-854c77959c-mdw6w
ADDED, kube-system, metrics-server-86cbb8457f-lgmgq

These same three lines appear repeatedly because of the default values of the keyword arguments to Watcher and to stream(). We’ll go into these details below, but we can change the code to stop this behaviour by providing the Watcher a timeout_seconds argument with a value of None:

watcher = Watcher(Pod, timeout_seconds=None)

This results in getting these initial lines only once, and then lines for any new events that occur.

Working with Watchers

In Hikaru, a Watcher provides a control abstraction on top of the K8s watch facilities in the underlying K8s Python client, and also hides the activities needed to dynamically determine the correct K8s interface classes and methods for the kind of object you want to watch. It not only wraps and exposes the underlying semantics, it also implements some common patterns on top of the underlying watches that allow your code to be a bit simpler.

A K8s watch also produces a generator which may block until an event arrives. Hikaru’s Watcher manages this generator, and may restart it on timeouts or tell it to pick up with the most recent version of a resource, depending on how you configure it.

Key arguments when creating a Watcher

The main documentation for the Watcher class goes into each optional creation argument in detail, but two are worth going into more detail here as their interpretation can have some subtleties.

  • The timeout_seconds parameter specifies the timeout to set on the underlying K8s watch object. The default value of 1 means that after a second of being idle, the underlying watch generator will terminate. What the Watcher instance does if the underlying watch times out depends on how you instructed the streaming operation to behave. If you supply a value of None for this argument, the underlying watch generator never times out. A timeout of 1 second can be useful because it gives the Watcher instance the opportunity to kill the underlying watch and/or exit the stream() method; otherwise you have to wait until an event is delivered before the Watcher can be stopped.

  • The resource_version parameter tells the underlying watch the newest version of the resource that you want to ignore. In other words, setting this to an integer or numeric string tells the watch that you don’t want any events for the resource whose version is the same as or less than the version provided. If you don’t set any resource_version, how the Watcher behaves while streaming depends on the parameters to the stream() call.

So, in the first example, when we created the Watcher with just the Pod argument, the timeout_seconds value was 1 and we didn’t specify any resource_version. This caused k3s to send events for the currently operating Pods. After a second of no further events, the underlying watch timed out and stopped, but because of the default arguments to stream() (more on these below), the watch was restarted and the same events were sent again. This is why there is the repeated listing of the same three Pods. When the value None is provided for timeout_seconds, the underlying watch never times out and hence we see the three Pod events only once.
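For example, a minimal sketch (the version string here is purely illustrative) that combines the two arguments: the underlying watch never times out, and only events newer than the supplied resource version are delivered:

# illustrative only: '123456' stands in for a resourceVersion you obtained earlier
watcher = Watcher(Pod, timeout_seconds=None, resource_version="123456")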

Streaming events

Once you have created a Watcher, you’re ready to start streaming events with the stream() method. This method has two arguments that govern its operation:

  • The manage_resource_version argument is a bool that tells the Watcher if you want it to manage the underlying watch in terms of what values to set for resource_version as the Watcher operates the watch. This defaults to False, so a Watcher normally does nothing about managing the resource_version of events, and just takes whatever is sent from K8s.

  • The quit_on_timeout argument is a bool that tells the Watcher how to behave if the underlying watch times out. The default, False, tells the Watcher to restart the watch if it times out. This is what contributed to the initial example from above repeatedly restarting the underlying watch: the watch had a default timeout of 1 second, and after a second of inactivity the watch exited. But since quit_on_timeout defaults to False, the Watcher instance restarts the underlying watch which runs again as if it was the first time.
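As a hedged sketch, combining a short timeout_seconds with quit_on_timeout=True and manage_resource_version=True gives a polling-style read that drains whatever events are available and then lets stream() return:

from kubernetes import config
from hikaru.model.rel_1_22 import Pod
from hikaru.watch import Watcher

def poll_pods():
    # adjust config_file (or use the defaults) for your own cluster
    config.load_kube_config()
    # a 1-second idle timeout plus quit_on_timeout=True means stream()
    # returns once the underlying watch goes quiet
    watcher = Watcher(Pod, timeout_seconds=1)
    for we in watcher.stream(manage_resource_version=True, quit_on_timeout=True):
        print(f"{we.etype}, {we.obj.metadata.namespace}, {we.obj.metadata.name}")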

The interaction of the resource_version argument to the Watcher constructor and the manage_resource_version argument to the stream() instance method can be subtle; you sometimes have to think about what’s happening underneath to be comfortable with the results you see, or to know what combination of argument values gives you the behaviour you want. The combinations below explain what happens in each case when streaming:

Resource Version Impacting Arguments

manage_resource_version is False (default) and resource_version is None (default):

May be K8s system dependent. On k3s, it results in the delivery of an ADDED event for each operating resource of the type specified for the Watcher, and then subsequent events of the resource type as they occur. If the underlying watch is restarted, then it is as if stream() was started for the first time: ADDED events for any existing resources of the Watcher’s type, and then any new events that occur while the Watcher is streaming. This means that if you set a timeout for the Watcher and the underlying watch does indeed time out, you may miss some events before the Watcher restarts the watch and the stream starts delivering events again. In general, this isn’t a very useful combination of options except when first trying out Watchers.

manage_resource_version is False (default) and resource_version is a resource version number (str or int):

Indicates that you only want to receive events for this resource that come after the specified resource_version number. If this number is too low (there are no events held by the K8s system that far back), an ApiException is raised with a 410 status code and a reason string that names the oldest resource_version held by K8s at the present time. The exception class is found in kubernetes.client. This is a useful combination of options if the consumer of Watcher events will be making a record of the resourceVersions delivered, so that in the case of a restart you can leap to the last observed resourceVersion and continue processing.

manage_resource_version is True and resource_version is None (default):

When Watcher.stream() starts execution, it begins by finding the lowest resourceVersion available for that resource, and then uses that value for resource_version in the underlying watch. This usually results in a series of initial events that may have already occurred in the past but should bring you up to date as to the current state of the K8s system. After this initial catch-up, the Watcher continues to process events as they arrive, but also captures each new resourceVersion value that is higher than the last kept version. If the underlying watch terminates for any reason (an exception or timeout), the Watcher uses this highest observed resourceVersion as the new value for resource_version when it restarts the underlying watch, or even if the stream() call itself is restarted. Additionally, if any gaps appear in the event stream (as can happen), the Watcher will in this case move to the proper resourceVersion and continue processing events. This combination is good if you can tolerate receiving events that may have occurred in the past, but otherwise don’t want to bother with tracking the highest-valued resourceVersion.

manage_resource_version is True and resource_version is a resource version number (str or int):

In this case, as an initial resource_version was specified when the Watcher was made, there is no attempt to look for the oldest available resourceVersion and ‘catch up’; the supplied resource_version is used on the underlying watch and events are processed from there. However, if it turns out that the supplied version is too old and results in a 410 error, the Watcher catches that, determines the lowest available version, and continues to process from there. It also records the highest observed resourceVersion as it processes events and restarts the watch at this value if the underlying watch needs to be restarted or if the Watcher itself gets restarted with a new call to stream(). This combination is good when you never want to receive events that you’ve already received and can persist the highest-valued resourceVersion that has come out of the Watcher.
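As an illustrative sketch of the second combination above (manage_resource_version left at its default with a resource_version that may be too old), assuming the resource version can be read from each delivered object’s metadata:

from kubernetes.client import ApiException
from hikaru.model.rel_1_22 import Pod
from hikaru.watch import Watcher

last_seen = "123456"   # illustrative: a resourceVersion persisted from an earlier run
watcher = Watcher(Pod, timeout_seconds=None, resource_version=last_seen)
try:
    for we in watcher.stream():
        # the resource version is assumed to live on the object's metadata
        last_seen = we.obj.metadata.resourceVersion
        print(f"{we.etype}, {we.obj.metadata.name}")
except ApiException as e:
    if e.status == 410:
        # the saved version has aged out; the reason string names the oldest
        # resourceVersion K8s still holds, so recover however suits your app
        print(e.reason)
    else:
        raise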

Stopping a Watcher

Once stream() is activated, it will continue to emit events subject to how its timeouts and resource_version management have been configured as discussed above. To stop the stream, you should invoke the Watcher’s stop() method. This method can be invoked while processing an event received from the stream() generator, or may be invoked from another thread.

Note

If invoked from another thread, the stop() won’t be acted upon until the underlying watch produces a new event and the Watcher can regain control.

If run in a for loop, a stream() can of course also be stopped by simply breaking out of the loop. However, if you can be bothered to add a break, it is just as easy to invoke stop().

A stopped Watcher can be started again with a new call to stream().
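For example, a minimal sketch that stops the stream from inside the event loop after an arbitrary number of events:

watcher = Watcher(Pod, timeout_seconds=None)
count = 0
for we in watcher.stream():
    print(f"{we.etype}, {we.obj.metadata.name}")
    count += 1
    if count >= 10:
        # stop() ends the stream cleanly; the same Watcher can be
        # started again later with another call to stream()
        watcher.stop()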

Managing the resource_version yourself

You can do the resource_version value management yourself if you don’t want the Watcher doing it for you. You can capture and persist the resource_version value from each Hikaru model object’s ObjectMeta object (the value of the metadata attribute on the top-level object in the WatchEvent instance), and then supply that value back when creating any subsequent Watcher.

If you wind up getting any unexpected exceptions out of the stream() method and would like to restart it, you will need to update the Watcher’s resource_version value, otherwise you can wind up with a replay of events you’ve already seen (although this might be something you don’t mind). If you want to update a Watcher’s resource_version, use the update_resource_version() method on the Watcher, supplying the new resource_version to use. This value will be used the next time that stream() is invoked on the Watcher instance; it won’t have any effect on any currently running stream.
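A minimal sketch of this pattern follows; handle() is a hypothetical event handler, and the metadata attribute name is assumed to follow the K8s field name resourceVersion:

last_seen = None
watcher = Watcher(Pod, timeout_seconds=None)
while True:
    try:
        for we in watcher.stream():
            # remember the latest version we've processed
            last_seen = we.obj.metadata.resourceVersion
            handle(we)   # hypothetical event handler
        break   # stream ended normally, e.g. stop() was called
    except Exception:
        # before restarting the stream, skip past events already seen
        if last_seen is not None:
            watcher.update_resource_version(last_seen)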

Namespaced and unnamespaced; what can be watched?

The underlying K8s APIs have different endpoints for narrowing a watch down to resources in a specified namespace. So for example, there are different endpoints to call if you want to watch Pod events across all of K8s vs Pod events from a specific namespace.

Additionally, there are some K8s resources that don’t have namespaces associated with them (such as Nodes), hence they only have a single API endpoint available for watches.

Hikaru provides some assistance in creating code that uses these features through a few different means:

  • First, if a Hikaru model class doesn’t support any watches, a TypeError is raised when you try to create a Watcher on that class.

  • Second, you can indicate you want a namespaced Watcher simply by supplying a value for the namespace keyword argument when creating a new Watcher. If the model class you supply doesn’t support namespaced watches, a TypeError is raised.

  • Third, you can get some help in remembering what classes support namespaced and unnamespaced watches by using the objects in the watchables module that accompanies each model version module in a version package.

  • Finally, from the perspective of creating a Watcher, both the singular item and item list version of Hikaru model objects can be used when building a Watcher. So for example, you can interchangeably use Pod and PodList to get a list of Pod WatchEvents from a Watcher.

Let’s look at these in turn.

Since only HikaruDocumentBase subclasses can potentially be watched, using anything else will result in a TypeError:

>>> from hikaru.watch import Watcher
>>> from hikaru.model.rel_1_22.v1 import ObjectMeta
>>> w = Watcher(ObjectMeta)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/haxsaw/hikaru/hikaru/watch.py", line 207, in __init__
    raise TypeError("cls must be a subclass of HikaruDocumentBase")
TypeError: cls must be a subclass of HikaruDocumentBase

Additionally, the class must support watches:

>>> from hikaru.model.rel_1_22.v1 import SelfSubjectRulesReview
>>> w = Watcher(SelfSubjectRulesReview)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/haxsaw/hikaru/hikaru/watch.py", line 220, in __init__
    raise TypeError(f"{cls.__name__} has no watcher support")
TypeError: SelfSubjectRulesReview has no watcher support

Hikaru won’t let you try to create a namespaced Watcher on classes that only support unnamespaced watches:

>>> from hikaru.model.rel_1_22.v1 import Node
>>> w = Watcher(Node, namespace='will-it-blend')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/haxsaw/hikaru/hikaru/watch.py", line 216, in __init__
    raise TypeError(f"{cls.__name__} has no namespaced watcher support")
TypeError: Node has no namespaced watcher support

So in general, you can check pretty quickly whether or not the class you want to watch supports the operations you have in mind.

Second, you can easily select namespace-bound Watchers simply by providing a value for the namespace argument:

>>> from hikaru.model.rel_1_22.v1 import Pod
>>> w = Watcher(Pod, namespace='some-business-unit')
>>>

All events streamed from such a Watcher will only be from the indicated namespace. The supplied class must support namespaced Watchers.

Third, you can get some hints as to which classes can be watched with/without namespaces by using the watchables module:

>>> from hikaru.watch import Watcher
>>> from hikaru.model.rel_1_22.v1 import watchables
>>> w = Watcher(watchables.Watchables.Pod)
>>> # or, for a namespaced Watcher
>>> w = Watcher(watchables.NamespacedWatchables.Pod,
                namespace='some-business-unit')
>>>

Each version package (v1, v1beta1, etc) will contain a watchables module if there are any model objects in that version that can be watched. This module contains two classes:

  • Watchables, which contains attributes that are model classes that can be watched without a namespace.

  • NamespacedWatchables, which contains attributes that are model classes that can be watched with a namespace.

The attributes on these classes are simply references to the actual model classes in the model class module. Watcher allows you to use either, as they refer to the same object. The watchables module exists solely to provide some handy documentation that you can use in your IDE to know which classes can be watched and which support namespaced watching.

Finally, Watcher allows you to use either the listing model class or the list’s item class when creating a watcher; either one will result in a stream of events of the list item’s class:

# these are the same:
w = Watcher(Pod)
w = Watcher(PodList)

When streaming such a Watcher, both will emit a series of events for Pod resources.

Streaming multiple event types: the MultiplexingWatcher

A Watcher yields events containing Hikaru instances of a single class. If you wish to monitor instances from multiple classes you need to make an additional Watcher for each class you wish to monitor. Managing multiple Watchers requires either configuring each for polling-style operations (setting timeout_seconds to 1, manage_resource_version to True, and quit_on_timeout to True), or using threading or multiprocessing to handle parallel streaming across all Watchers.

To take some of the burden of this type of use away from the user, Hikaru provides a convenience class, MultiplexingWatcher, that handles these issues for you and produces a single stream of K8s events containing different types of Hikaru model instances.

Note

If you are using the non-default model release in Hikaru, you must call set_global_default_release() with the name of the release you are using prior to streaming from a MultiplexingWatcher, otherwise the individual Watcher threads will wind up using the default release model instead of the one you intend. If code is written to a specific release, it’s good practice to always call set_global_default_release() when using a MultiplexingWatcher to ensure that the code won’t malfunction with a future release of Hikaru where the default release changes.

The MultiplexingWatcher is a container of Watchers that itself behaves like a Watcher. To use it, you create individual Watcher instances, each configured as you wish regarding timeout behaviour, namespaces, model class being watched, and other parameters, and then create a MultiplexingWatcher instance and call its add_watcher() method with each Watcher. You can then simply call stream() on the MultiplexingWatcher and receive a stream of WatchEvents containing model instances from all the different Watchers managed by the MultiplexingWatcher.

A simple multiplexing example

Below is some example code that looks for events on Namespaces and Pods using a MultiplexingWatcher:

from kubernetes import config
from hikaru import set_global_default_release
from hikaru.model.rel_1_22 import Pod, Namespace
from hikaru.watch import Watcher, MultiplexingWatcher

def muxing_watcher():
    # be sure to set the default release first!
    set_global_default_release("rel_1_22")
    # config file location on my dev system:
    config.load_kube_config(config_file='/etc/rancher/k3s/k3s.yaml')
    # make each Watcher:
    pod_watcher = Watcher(Pod)
    ns_watcher = Watcher(Namespace)
    # make the multiplexer and add the watchers:
    mux = MultiplexingWatcher()
    mux.add_watcher(pod_watcher)
    mux.add_watcher(ns_watcher)
    # and then stream:
    for we in mux.stream(manage_resource_version=True,
                         quit_on_timeout=False):
        if we.obj.kind == "Pod":
            # do stuff with the Pod event
            pass
        elif we.obj.kind == "Namespace":
            # do different stuff with the Namespace event
            pass

if __name__ == "__main__":
    muxing_watcher()

Note that the MultiplexingWatcher takes the same arguments to stream() that a plain Watcher does. The MultiplexingWatcher passes the values of manage_resource_version and quit_on_timeout to each Watcher so they can be managed consistently.

A few key details regarding MultiplexingWatcher:

  • A MultiplexingWatcher can only contain one Watcher per watched model class; you can’t add two Watchers that are both watching Pods, for example. The MultiplexingWatcher will only manage the last supplied Watcher for any given class.

  • You can call add_watcher() while a MultiplexingWatcher() is streaming events, and the new Watcher will be started and its events will be added to the stream.

  • Likewise, you can call del_watcher() on a MultiplexingWatcher while it is streaming; however, you may still get a few events for the deleted Watcher’s model class as they may have already been received and may be queued internally in the MultiplexingWatcher instance.
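For example, a hedged sketch of adjusting the set of Watchers while the multiplexer is streaming (del_watcher() is assumed here to take the Watcher instance to remove):

from hikaru.model.rel_1_22 import Pod, Node
from hikaru.watch import Watcher, MultiplexingWatcher

pod_watcher = Watcher(Pod)
mux = MultiplexingWatcher()
mux.add_watcher(pod_watcher)
node_watcher = None
for we in mux.stream(manage_resource_version=True, quit_on_timeout=False):
    print(f"{we.etype}, {we.obj.kind}")
    if node_watcher is None:
        # add a second Watcher while the mux is already streaming;
        # Node events will start appearing in this same loop
        node_watcher = Watcher(Node)
        mux.add_watcher(node_watcher)
    elif we.obj.kind == "Node" and pod_watcher is not None:
        # remove the Pod Watcher; a few already-queued Pod events may still arrive
        mux.del_watcher(pod_watcher)
        pod_watcher = None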

Dealing with individual Watcher exceptions

A MultiplexingWatcher normally consumes any exceptions that its contained Watchers raise, giving no indication to the user of the MultiplexingWatcher that anything has happened. While this isn’t a problem in many cases, especially when manage_resource_version is True and quit_on_timeout is False, there are still exceptions (such as HTTP status code 500) that no Watcher is able to automatically recover from. In these cases, the exception will interrupt the stream() call of a Watcher. With no other mechanisms in place, MultiplexingWatcher will catch the exception from the contained Watcher and simply cull that Watcher from the set it manages.

To give MultiplexingWatcher users an opportunity to handle and recover from such errors, there is an optional creation argument, exception_callback: a callable that will be invoked if a Watcher allows any exception to escape from stream() (beyond the ones that MultiplexingWatcher is prepared to handle). The callback has the following form:

def callback(mux: MultiplexingWatcher, w: Watcher, e: Exception):

…where mux is the MultiplexingWatcher that caught the Watcher exception, w is the Watcher that raised the exception, and e is the exception raised (these are normally instances of kubernetes.client.ApiException). The callback is free to perform any action it wishes on mux or w, and can even create a new Watcher and add it to mux. The return value of the callback will determine what the MultiplexingWatcher will do with the Watcher that raised the exception:

  • If True is returned, then the exception is considered handled by the callback and the MultiplexingWatcher will continue to monitor the Watcher for new events (but if none arrive, it won’t do anything about that). Note the value must be True, not just some value that logically evaluates to True.

  • If any other value is returned, then the MultiplexingWatcher will delete the Watcher that raised the exception.

Note

While in the callback, adding a new Watcher that watches the same model class as the one that just raised the exception won’t replace the old one with the new unless the handler returns True. Otherwise, the Watcher for that particular model class will simply be removed, whether it was a new Watcher or the one that raised the exception. So remember, if you wish to replace the Watcher within the exception handler, be sure to return True from the handler.
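For instance, here is a hedged sketch of a plain-function callback that replaces a failed Pod Watcher and returns True so the multiplexer keeps watching Pods:

from hikaru.model.rel_1_22 import Pod
from hikaru.watch import Watcher, MultiplexingWatcher

def replace_on_error(mux: MultiplexingWatcher, w: Watcher, e: Exception) -> bool:
    # log the failure however you like
    print(f"watcher failed: {e}")
    # add a fresh Watcher for the same model class (Pod is illustrative here)
    mux.add_watcher(Watcher(Pod))
    # returning True keeps a Watcher for this class in the mux; any other
    # return value causes the MultiplexingWatcher to delete it
    return True

mux = MultiplexingWatcher(exception_callback=replace_on_error)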

Callbacks can be any callable, such as a function or bound method on an instance. Below is an example of a callback that is an instance method:

from hikaru.watch import MultiplexingWatcher

class WatcherExceptionHandler(object):
    def __init__(self):
        # set up whatever state you want
        pass

    def callback(self, mux, watcher, exc):
        # handle the exception how you like; return True
        # to indicate you want to keep the Watcher going
        return True

exp_handler = WatcherExceptionHandler()
mux = MultiplexingWatcher(exception_callback=exp_handler.callback)

Stopping the MultiplexingWatcher

This works just like with the Watcher; simply invoke the stop() method on the instance. Since there is an internal queue within the MultiplexingWatcher, it is possible that it contains events that haven’t been delivered. Once stop() is invoked, these events won’t be delivered unless stream() is invoked on the instance again.
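For example, a minimal sketch that stops the multiplexer from inside the event loop after an arbitrary number of events:

count = 0
for we in mux.stream(manage_resource_version=True, quit_on_timeout=False):
    count += 1
    if count >= 100:
        # ends the stream; any events still queued inside the mux won't be
        # delivered unless stream() is called again
        mux.stop()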

Note

It is important to invoke the stop() method on a MultiplexingWatcher; not doing so can result in significant memory consumption. Although you can simply break out of the loop that is running the generator returned by stream(), doing so will allow the threads that are running the Watchers inside the MultiplexingWatcher to continue to receive and queue events; however, with no call to stream() these will never get yielded and subsequently deleted. So even if you do decide to use a break to exit an event-reading loop, be sure to invoke stop() on the MultiplexingWatcher first.