initial release #1

This commit is contained in:
jblu 2022-10-21 12:48:19 -05:00
parent d9ec7e5603
commit fa7f64a65b
3 changed files with 60 additions and 22 deletions

View File

@ -4,10 +4,30 @@ def build_cont_list(obj,hypercare_containers):
for i, c in enumerate(obj):
if c["State"].lower() != "running":
if c["Names"][0].lstrip("/") in hypercare_containers:
print(f'index: {i} container: {c["Names"][0].lstrip("/")} State: {c["State"]} ID: {c["Id"]}')
#print(f'index: {i} container: {c["Names"][0].lstrip("/")} State: {c["State"]} ID: {c["Id"]}')
cont_list.append(c)
print(len(cont_list))
return cont_list
def process_cont_list(full_cont_list, cont_fn):
pass
def build_full_cont_list(obj, hypercare_containers):
cont_full_list = []
for i, c in enumerate(obj):
if c["Names"][0].lstrip("/") in hypercare_containers:
#print(f'index: {i} container: {c["Names"][0].lstrip("/")} State: {c["State"]} ID: {c["Id"]}')
cont_full_list.append(c)
return cont_full_list
def process_cont_list(full_cont_list, start_cont_fn, req_obj, host, port, jwt, endpoint):
if full_cont_list:
for container in full_cont_list:
start_cont_fn(req_obj, host, port, jwt, endpoint, container["Id"])
return True
def process_cont_status(obj):
if not obj:
return -1 # Can't find the containter
for c in enumerate(obj):
# print(c[1]['Names'][0].lstrip("/"))
if 'running' in c[1]["State"].lower():
return 0
if c[1]["State"] != 'running':
return 1

View File

@ -1,7 +1,8 @@
import pushover
from json import load
from cclient import c_auth, c_get_containers, c_start_container
from cclient import *
from clogging import *
from cprocess import *
import time
import datetime
import logging
@ -45,8 +46,12 @@ class Crn:
self.tl.info('Authenticating.')
self.jwt = c_auth(self.cc, self.host, self.port, self.username, self.password)
self.tl.info('Authenticated successfully.')
self.cont_obj = c_get_containers(self.cc, self.host, self.port, self.jwt)
self.tl.info('Collected container list.')
self.cont_obj = c_get_containers(self.cc, self.host, self.port, self.jwt, self.endpoint)
self.tl.info('Collected container data.')
self.cont_list = build_cont_list(self.cont_obj, self.observed_containers)
self.tl.info('Building container list.')
self.process_cont_list_response = process_cont_list(self.cont_list, c_start_container, self.cc, self.host, self.port, self.jwt, self.endpoint)
except requests.exceptions.RequestException as e:
self.tl.exception(e)
self.po.send_message(e, title="crane API ERROR")

View File

@ -2,7 +2,8 @@ import unittest
import requests
from json import load
from cclient import c_auth, c_get_containers, c_start_container, c_stop_container
from cprocess import build_cont_list
from cprocess import build_cont_list, process_cont_list, build_full_cont_list, process_cont_status
unittest.TestLoader.sortTestMethodsUsing = None
class TestCrane(unittest.TestCase):
def setUp(self):
@ -13,35 +14,47 @@ class TestCrane(unittest.TestCase):
self.username = self.config["username"]
self.password = self.config["password"]
self.endpoint = self.config["endpoint"]
self.cid = '06ffaed8153495db19b35f5982e0f9d4cfea0da7b803f825a6686a4017af8d6e'
self.cid = 'aa5b217ca6217fd9d268396039da69ea9e4a5aff381b3dceb71edb5a1f4d429d'
self.req_obj = requests
self.hypercare_containers = ['hello-world']
def test_c_auth(self):
self.jwt = c_auth(self.req_obj, self.host, self.port, self.username, self.password)
self.cont_obj = c_get_containers(self.req_obj, self.host, self.port, self.jwt, self.endpoint)
def test_c_auth(self):
self.assertTrue(self.jwt, "No JWT returned by cauth.")
def test_c_get_containers(self):
self.jwt = c_auth(self.req_obj, self.host, self.port, self.username, self.password)
self.cont_obj = c_get_containers(self.req_obj, self.host, self.port, self.jwt, self.endpoint)
self.assertTrue(self.cont_obj, "No cont object returned by c_get_containers.")
def test_a_is_hypercare_container_status(self):
self.cont_full_list = build_full_cont_list(self.cont_obj, self.hypercare_containers)
self.process_cont_status_response = process_cont_status(self.cont_full_list)
if self.process_cont_status_response == 0:
c_stop_container(self.req_obj, self.host, self.port, self.jwt, self.endpoint, self.cid)
def test_build_cont_list(self):
self.jwt = c_auth(self.req_obj, self.host, self.port, self.username, self.password)
self.cont_obj = c_get_containers(self.req_obj, self.host, self.port, self.jwt, self.endpoint)
self.cont_list = build_cont_list(self.cont_obj, self.hypercare_containers)
self.assertTrue(self.cont_list, "No cont_list returned by build_cont_list.")
self.assertTrue(self.cont_list, "No cont_list returned by build_cont_list. Does the test container exist?")
def test_c_start_container(self):
self.jwt = c_auth(self.req_obj, self.host, self.port, self.username, self.password)
self.c_start_container_response = c_start_container(self.req_obj, self.host, self.port, self.jwt, self.endpoint, self.cid)
print(self.c_start_container_response)
# print(self.c_start_container_response)
self.assertTrue(self.c_start_container_response, "No c_start_container_resonse returned by c_start_container.")
# 204 success 304 already on
def test_c_stop_container(self):
self.jwt = c_auth(self.req_obj, self.host, self.port, self.username, self.password)
self.c_stop_container_response = c_stop_container(self.req_obj, self.host, self.port, self.jwt, self.endpoint, self.cid)
print(self.c_stop_container_response)
# print(self.c_stop_container_response)
self.assertTrue(self.c_stop_container_response, "No c_start_container_resonse returned by c_start_container.")
def test_process_cont_list(self):
self.cont_list = build_cont_list(self.cont_obj, self.hypercare_containers)
self.process_cont_list_response = process_cont_list(self.cont_list, c_start_container, self.req_obj, self.host, self.port, self.jwt, self.endpoint)
self.assertTrue(self.process_cont_list_response, "No c_start_container_resonse returned by c_start_container.")
def test_z_tear_down(self):
c_stop_container(self.req_obj, self.host, self.port, self.jwt, self.endpoint, self.cid)
if __name__ == '__main__':
unittest.main()