import sys
import logging
import subprocess
import json
from configstruct import OpenStruct
from .background_popen import BackgroundPopen
from .table_row_popen import TableRowPopen
_logger = logging.getLogger(__name__)
class KubeCtl(object):
    """Build and run ``kubectl`` command lines, tracking spawned processes.

    Processes started through :meth:`call_async`, :meth:`call_prefix` and
    :meth:`call_table_rows` are remembered until :meth:`wait` or :meth:`kill`
    is called. ``final_rc`` holds the most recent non-zero exit status seen
    by :meth:`wait` (0 if none so far); it is never reset by this class.
    """

    def __init__(self, context=None, config=None):
        """Locate the ``kubectl`` binary and initialise bookkeeping.

        :param context: kubectl context name to pass via ``--context``;
            when ``None`` no ``--context`` flag is added and the
            :attr:`context` property looks up the current one lazily.
        :param config: pre-loaded configuration object; when ``None`` the
            :attr:`config` property loads it lazily.
        :raises subprocess.CalledProcessError: if ``which kubectl`` fails.
        """
        # The command is a constant string, so shell=True carries no
        # injection risk here.
        val = subprocess.check_output('which kubectl', shell=True).strip()
        self._kubectl = val.decode('utf-8')  # check_output returns bytes
        self._context = context
        self._config = config
        self._processes = []  # (commandline, process) pairs awaiting wait()/kill()
        self._threads = []
        self.final_rc = 0

    @property
    def context(self):
        """Return the context name, querying ``kubectl`` on first use if unset."""
        if self._context is None:
            ctx = subprocess.check_output(
                self._commandline('config', 'current-context')).strip()
            self._context = ctx.decode('utf-8')  # returns a bytestring
        return self._context

    @property
    def config(self):
        """Return the configuration, loading ``kubectl config view`` on first use."""
        if self._config is None:
            self._config = OpenStruct(self.call_json('config', 'view'))
        return self._config

    def call(self, cmd, *args):
        """Run a command and block until all pending processes finish.

        :returns: ``final_rc`` as reported by :meth:`wait`.
        """
        self.call_async(cmd, *args)
        return self.wait()

    def call_capture(self, cmd, *args):
        """Run a command to completion and return its stdout decoded as UTF-8.

        :raises subprocess.CalledProcessError: on a non-zero exit status.
        """
        cl = self._commandline(cmd, *args)
        val = subprocess.check_output(cl)
        return val.decode('utf-8')

    def call_json(self, cmd, *args):
        """Run a command with ``--output=json`` and return the parsed result.

        The ``--output=json`` flag is inserted directly after ``cmd`` and
        before ``args``.
        """
        return json.loads(self.call_capture(cmd, '--output=json', *args))

    def call_async(self, cmd, *args):
        """Start a command without waiting for it; always returns 0.

        The process is queued so that :meth:`wait` or :meth:`kill` can
        handle it later.
        """
        cl = self._commandline(cmd, *args)
        proc = subprocess.Popen(cl)
        self._processes.append((cl, proc))
        return 0

    def call_prefix(self, prefix, cmd, *args):
        """Start a command through ``BackgroundPopen``; always returns 0.

        Output handlers are built with ``BackgroundPopen.prefix_handler``:
        one for ``prefix`` on ``sys.stdout`` and one for ``'[ERR] ' + prefix``
        on ``sys.stderr``.
        """
        # NOTE(review): presumably each output line is emitted with the
        # prefix prepended -- confirm against BackgroundPopen.
        out_handler = BackgroundPopen.prefix_handler(prefix, sys.stdout)
        err_handler = BackgroundPopen.prefix_handler('[ERR] ' + prefix, sys.stderr)
        cl = self._commandline(cmd, *args)
        proc = BackgroundPopen(out_handler, err_handler, cl)
        self._processes.append((cl, proc))
        return 0

    def call_table_rows(self, row_handler, cmd, *args):
        """Start a command through ``TableRowPopen``; always returns 0.

        ``row_handler`` is passed straight to ``TableRowPopen``.
        """
        # NOTE(review): presumably row_handler is invoked once per output
        # row -- confirm against TableRowPopen.
        cl = self._commandline(cmd, *args)
        proc = TableRowPopen(row_handler, cl)
        self._processes.append((cl, proc))
        return 0

    def wait(self):
        """Wait for every pending process and return ``final_rc``.

        The pending list is cleared first, so each process is waited on
        and checked exactly once even if ``wait`` is called again.
        """
        # Swap in a fresh list before waiting (the original assigned to a
        # misspelled attribute, so finished processes were never dropped).
        procs, self._processes = self._processes, []
        for cl, proc in procs:
            self._check(cl, proc.wait())
        return self.final_rc

    def kill(self, signal=None):
        """Terminate every pending process and wait for each to exit.

        :param signal: signal to deliver with ``send_signal``; any falsy
            value (``None``, ``0``) falls back to ``proc.kill()``.
        """
        # Swap in a fresh list so already-killed processes are forgotten.
        procs, self._processes = self._processes, []
        for cl, proc in procs:
            if signal:
                proc.send_signal(signal)
            else:
                proc.kill()
            proc.wait()

    def _commandline(self, command, *args):
        """Return the argv list for ``command``, logging it at debug level.

        ``--context`` is only added when a context is already known; the
        lazy :attr:`context` property is deliberately not triggered here.
        """
        commandline = [self._kubectl]
        if self._context:
            commandline.extend(['--context', self._context])
        commandline.append(command)
        commandline.extend(args)
        _logger.debug(' '.join(map(str, commandline)))
        return commandline

    def _check(self, cl, rc):
        """Record and log a non-zero exit status; return ``rc`` unchanged."""
        if rc != 0:
            self.final_rc = rc
            # map(str, ...) mirrors _commandline so non-string arguments
            # cannot break the log message.
            _logger.warning('%s => exit status: %d', ' '.join(map(str, cl)), rc)
        return rc