Compare commits

...

3 Commits

Author SHA1 Message Date
eca3e3920f Merge pull request 'dev-update-testing' (#24) from dev-update-testing into main
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: https://git.jbranan.com/jblu/crane/pulls/24
2023-06-27 12:03:05 -05:00
bc325787a3 updated testing for ci
Some checks failed
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is failing
2023-06-27 11:59:19 -05:00
5d6a77c108 updated testing
All checks were successful
continuous-integration/drone/push Build is passing
2023-06-25 23:46:10 -05:00
4 changed files with 66 additions and 17 deletions

View File

@ -3,7 +3,6 @@ LICENSE
*.log
README.md
requirements.txt
test_*
Dockerfile
*docker-test*
*.log

View File

@ -1,6 +1,5 @@
kind: pipeline
name: default
steps:
- name: docker
image: plugins/docker
@ -30,3 +29,37 @@ steps:
when:
branch:
- dev*
- name: test-main
image: git.jbranan.com/jblu/crane:latest
environment:
CRANE_HOST:
from_secret: CRANE_HOST
CRANE_PORT:
from_secret: CRANE_PORT
CRANE_ENDPOINT:
from_secret: CRANE_ENDPOINT
commands:
- echo $CRANE_HOST
- echo $CRANE_PORT
- echo $CRANE_ENDPOINT
- python test_crane.py
when:
branch:
- main
- name: test-dev
image: git.jbranan.com/jblu/crane:dev
environment:
CRANE_HOST:
from_secret: CRANE_HOST
CRANE_PORT:
from_secret: CRANE_PORT
CRANE_ENDPOINT:
from_secret: CRANE_ENDPOINT
commands:
- echo $CRANE_HOST
- echo $CRANE_PORT
- echo $CRANE_ENDPOINT
- python test_crane.py
when:
branch:
- dev*

View File

@ -9,6 +9,15 @@ def c_get_filtered_containers(req_obj, j_obj, host, port, access_token, endpoint
c_get_containers_response = req_obj.get(url, headers={"X-API-Key": access_token}, verify=False)
return c_get_containers_response.json()
def c_get_container_id(req_obj, j_obj, host, port, access_token, endpoint, containers, statuses):
statuses.append("running")
filter_string = j_obj.dumps({"name":containers,"status":statuses})
url = f'https://{host}:{port}/api/endpoints/{endpoint}/docker/containers/json?filters={filter_string}'
c_get_container_id_response = req_obj.get(url, headers={"X-API-Key": access_token}, verify=False)
id = c_get_container_id_response.json()
id = id[0]["Id"]
return id
def c_start_container(req_obj, host, port, access_token, endpoint, cid):
url = f'https://{host}:{port}/api/endpoints/{endpoint}/docker/containers/{cid}/start'
c_start_container_response = req_obj.post(url, headers={"X-API-Key": access_token},verify=False)

View File

@ -2,45 +2,53 @@ import unittest
import requests
import json
from tomllib import load
from cclient import c_get_filtered_containers, c_start_container, c_stop_container
from cclient import c_get_filtered_containers, c_start_container, c_stop_container, c_get_container_id
from cprocess import process_cont_list
# unittest.TestLoader.sortTestMethodsUsing = None
import os
class TestCrane(unittest.TestCase):
def setUp(self):
with open('./config.toml', 'rb') as c:
self.config = load(c)
self.host = self.config["portainer"]["host"]
self.port = self.config["portainer"]["port"]
self.access_token = "ptr_ufS1nADXmrU3QSN3bvITLMQ7oOH9yo3ECb/QNwtIYJ4="
self.endpoint = self.config["portainer"]["endpoint"]
self.cid = 'ef8fee86e02b2b82acbddf6f0da1ff023f60bfe52c0b4087cac29c1686ccbac4'
self.req_obj = requests
self.j_obj = json
self.hypercare_containers = ['hello-world']
self.req_obj.packages.urllib3.disable_warnings()
config_file_path = './docker/config.toml'
if os.getenv("toml_path", os.path.exists(config_file_path)):
with open(config_file_path, 'rb') as c:
self.config = load(c)
self.host = self.config["portainer"]["host"]
self.port = self.config["portainer"]["port"]
self.endpoint = self.config["portainer"]["endpoint"]
else:
self.host = os.getenv("CRANE_HOST", "192.168.4.11")
self.port = os.getenv("CRANE_PORT", 9443)
self.endpoint = os.getenv("CRANE_ENDPOINT", 1)
self.access_token = "ptr_ufS1nADXmrU3QSN3bvITLMQ7oOH9yo3ECb/QNwtIYJ4="
self.hypercare_containers = ['hottub']
self.status_filters = ["paused","dead","created","exited","removing","restarting","created"]
self.cont_obj = c_get_filtered_containers(self.req_obj,self.j_obj, self.host, self.port, self.access_token, self.endpoint,self.hypercare_containers,self.status_filters)
def test_c_get_filtered_containers(self):
self.cid = c_get_container_id(self.req_obj,self.j_obj, self.host, self.port, self.access_token, self.endpoint, self.hypercare_containers,self.status_filters)
self.c_stop_container_response = c_stop_container(self.req_obj, self.host, self.port, self.access_token, self.endpoint, self.cid)
self.cont_obj = c_get_filtered_containers(self.req_obj,self.j_obj, self.host, self.port, self.access_token, self.endpoint,self.hypercare_containers,self.status_filters)
self.assertTrue(self.cont_obj, "No cont object returned by c_get_filtered_containers.")
def test_c_start_container(self):
self.cid = c_get_container_id(self.req_obj,self.j_obj, self.host, self.port, self.access_token, self.endpoint, self.hypercare_containers,self.status_filters)
c_stop_container(self.req_obj, self.host, self.port, self.access_token, self.endpoint, self.cid)
self.c_start_container_response = c_start_container(self.req_obj, self.host, self.port, self.access_token, self.endpoint, self.cid)
# print(self.c_start_container_response)
self.assertTrue(self.c_start_container_response, "No c_start_container_resonse returned by c_start_container.")
# 204 success 304 already on
def test_c_process_cont_list(self):
self.cont_obj = c_get_filtered_containers(self.req_obj,self.j_obj, self.host, self.port, self.access_token, self.endpoint,self.hypercare_containers,self.status_filters)
self.assertTrue(process_cont_list(self.cont_obj, c_start_container, self.req_obj, self.host, self.port, self.access_token, self.endpoint))
def test_c_stop_container(self):
self.cid = c_get_container_id(self.req_obj,self.j_obj, self.host, self.port, self.access_token, self.endpoint, self.hypercare_containers,self.status_filters)
self.c_stop_container_response = c_stop_container(self.req_obj, self.host, self.port, self.access_token, self.endpoint, self.cid)
# print(self.c_stop_container_response)
self.assertTrue(self.c_stop_container_response, "No c_start_container_resonse returned by c_start_container.")
def test_z_tear_down(self):
c_stop_container(self.req_obj, self.host, self.port, self.access_token, self.endpoint, self.cid)
if __name__ == '__main__':
unittest.main()