Source code for kubey.kubey

import os
import re
import logging
import time

from . import timestamp
from .kubectl import KubeCtl
from .cache import Cache
from .pod import Pod
from .node import Node
from .event import Event


_logger = logging.getLogger(__name__)


[docs]class Kubey(object):
[docs] class UnknownNamespace(ValueError): pass
ANY = '.' def __init__(self, config): self._config = config self.kubectl = KubeCtl(config.context) self._split_match() self._namespaces = self._cache('namespaces') self._nodes_cache = self._cache('nodes') self._pods_cache = self._cache('pods', '--all-namespaces') self._set_namespace() self._pods = None self._nodes = None def __repr__(self): return "<Kubey: context=%s namespace=%s match=%s/%s/%s>" % ( self.kubectl.context, self._config.namespace, self._node_re.pattern, self._pod_re.pattern, self._container_re.pattern)
[docs] def each_pod(self, limit=None): if self._pods: for pod in self._pods: yield pod return self._pods = [] for info in self._pods_cache.obj()['items']: if not self._pod_matches(info): continue pod = Pod(self._config, info, self._container_re.search) self._pods.append(pod) yield pod if self._exceeded_max(len(self._pods), limit): break
[docs] def each_node(self, limit=None, include_top_info=False): if self._nodes: for node in self._nodes: yield node return top_info = self._get_top_node_info() if include_top_info else {} self._nodes = [] for info in self._nodes_cache.obj()['items']: if not self._node_matches(info): continue node = Node(self._config, info, self.each_pod(), top_info) if self._config.namespace != self.ANY and len(node.pods) == 0: continue # no matching pods found self._nodes.append(node) yield node if self._exceeded_max(len(self._nodes), limit): break
[docs] def each_event(self, limit=None, watch_seconds=10): count = 0 args = ['events', '--all-namespaces', '--sort-by=lastTimestamp'] last_ts = youngest_ts = timestamp.epoch while True: json = self.kubectl.call_json('get', *args) for info in json['items']: if not self._event_matches(info): continue last_ts = timestamp.parse(info['lastTimestamp']) if last_ts <= youngest_ts: continue event = Event(self._config, info) yield event count += 1 if self._exceeded_max(count, limit): break # update after processing all items at least once (avoids dropping items w/ same ts) youngest_ts = last_ts time.sleep(watch_seconds)
# Private @staticmethod def _exceeded_max(count, limit): if limit and limit <= count: _logger.debug('Prematurely stopping at match maximum of {0}'.format(limit)) return True return False def _set_namespace(self): ns = '' if self._config.namespace == self.ANY else self._config.namespace self._namespace_re = re.compile(ns) if self._config.namespace == self.ANY: return # FIXME: namespace validation! # validation_query = 'items[?contains(metadata.name,\'%s\')].status.phase' % ( # self._config.namespace) # if not jmespath.search(validation_query, self._namespaces.obj()): # raise self.UnknownNamespace(self._config.namespace) def _split_match(self): match_items = self._config.match.split('/', 2) node = match_items.pop(0) if len(match_items) > 2 else '' pod = match_items.pop(0) container = match_items.pop(0) if len(match_items) > 0 else '' if node == self.ANY: node = '' if pod == self.ANY: pod = '' if container == self.ANY: container = '' self._node_re = re.compile(node, re.IGNORECASE) self._pod_re = re.compile(pod, re.IGNORECASE) self._container_re = re.compile(container, re.IGNORECASE) def _cache(self, name, *args): cache_fn = os.path.join( self._config.cache_path, '.%s_%s_%s' % (__name__, self.kubectl.context, name) ) return Cache( cache_fn, self._config.cache_seconds, self.kubectl.call_json, 'get', name, *args ) def _pod_matches(self, info): return (self._namespace_re.search(info['metadata']['namespace']) and self._node_re.search(info['spec'].get('nodeName', '')) and self._pod_re.search(info['metadata']['name'])) def _node_matches(self, info): return self._node_re.search(info['metadata']['name']) def _event_matches(self, info): return (self._namespace_re.search(info['metadata']['namespace']) and self._node_re.search(info['source'].get('host', '')) and self._pod_re.search(info['metadata']['name'])) def _get_top_node_info(self): info = {} def add_info(i, row): if i == 1: return info[row[0]] = row[1:] self.kubectl.call_table_rows(add_info, 'top', 'node') self.kubectl.wait() return info