dev-create-rest-client #7
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,4 +1,5 @@
|
|||||||
*config.toml
|
*config.toml
|
||||||
*.json
|
*.json
|
||||||
__pycache__/
|
__pycache__/
|
||||||
*.log
|
*.log
|
||||||
|
*.token
|
48
inex.py
48
inex.py
@ -2,7 +2,7 @@ import pyodbc
|
|||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
import datetime
|
import datetime
|
||||||
from tomllib import load
|
import tomllib
|
||||||
from inexLogging import inexLog
|
from inexLogging import inexLog
|
||||||
import inexConnect
|
import inexConnect
|
||||||
from inexDataModel import dataTemplate
|
from inexDataModel import dataTemplate
|
||||||
@ -14,18 +14,21 @@ import requests
|
|||||||
class Inex:
|
class Inex:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
"""Initilize config, calls functions from inex-connect.py and inex-logging.py"""
|
"""Initilize config, calls functions from inex-connect.py and inex-logging.py"""
|
||||||
if os.path.exists('./config.toml'):
|
|
||||||
config_file_path = './config.toml'
|
|
||||||
with open(config_file_path, 'rb') as c:
|
|
||||||
self.config = load(c)
|
|
||||||
|
|
||||||
# assign libraries
|
# assign libraries
|
||||||
self.db = pyodbc
|
self.db = pyodbc
|
||||||
self.tm = datetime
|
self.tm = datetime
|
||||||
self.il = logging
|
self.il = logging
|
||||||
self.ic = inexConnect
|
self.ic = inexConnect
|
||||||
self.r = requests
|
self.r = requests
|
||||||
|
self.tl = tomllib
|
||||||
|
self.os = os
|
||||||
|
self.j = json
|
||||||
|
|
||||||
|
if self.os.path.exists('./config.toml'):
|
||||||
|
config_file_path = './config.toml'
|
||||||
|
with open(config_file_path, 'rb') as c:
|
||||||
|
self.config = self.tl.load(c)
|
||||||
|
|
||||||
# set config
|
# set config
|
||||||
self.dbDriver = self.config["database"]["driver"]
|
self.dbDriver = self.config["database"]["driver"]
|
||||||
self.dbServer = self.config["database"]["server"]
|
self.dbServer = self.config["database"]["server"]
|
||||||
@ -41,25 +44,34 @@ class Inex:
|
|||||||
self.productGUID = self.config["immutables"]["product_guid"]
|
self.productGUID = self.config["immutables"]["product_guid"]
|
||||||
self.productName = self.config["immutables"]["product_name"]
|
self.productName = self.config["immutables"]["product_name"]
|
||||||
self.productVersion = self.config["immutables"]["product_version"]
|
self.productVersion = self.config["immutables"]["product_version"]
|
||||||
|
self.tokenFilepath = self.config["output"]["token"]
|
||||||
|
self.selectedPlatform = self.config["fortraPlatform"]["selectedPlatform"]
|
||||||
|
|
||||||
|
if "dev" in self.selectedPlatform.lower():
|
||||||
|
self.platformConfig = self.config["fortraPlatform"]["dev"]
|
||||||
|
if "stag" in self.selectedPlatform.lower():
|
||||||
|
self.platformConfig = self.config["fortraPlatform"]["stage"]
|
||||||
|
if "prod" in self.selectedPlatform.lower():
|
||||||
|
self.platformConfig = self.config["fortraPlatform"]["prod"]
|
||||||
|
print(self.platformConfig)
|
||||||
|
|
||||||
#Setup logging
|
#Setup logging
|
||||||
inexLog(self)
|
inexLog(self)
|
||||||
|
|
||||||
# create the connection to the database
|
# create the connection to the database
|
||||||
self.cursor = self.ic.connectDatabase(self, self.db, self.dbDriver, self.dbServer, self.dbDatabase, self.dbUser, self.dbPassword)
|
# self.cursor = self.ic.connectDatabase(self, self.db, self.dbDriver, self.dbServer, self.dbDatabase, self.dbUser, self.dbPassword)
|
||||||
|
|
||||||
self.data = self.ic.databaseQuery(self, self.cursor, self.dbQuery)
|
# self.data = self.ic.databaseQuery(self, self.cursor, self.dbQuery)
|
||||||
|
|
||||||
self.modifiedData = processData(self.data, dataTemplate, prd_instance_id=self.prdInstanceID,\
|
# self.modifiedData = processData(self.data, dataTemplate, prd_instance_id=self.prdInstanceID,\
|
||||||
product_guid=self.productGUID,product_name=self.productName,product_version=self.productVersion)
|
# product_guid=self.productGUID,product_name=self.productName,product_version=self.productVersion)
|
||||||
|
|
||||||
|
# # TODO: move this to its own function
|
||||||
|
# if self.useLog:
|
||||||
|
# self.il.warning(f"Writing to '{self.outputFile}'.")
|
||||||
|
|
||||||
|
# with open(self.outputFile, "w") as f:
|
||||||
# TODO: move this to its own function
|
# json.dump(self.modifiedData, f, indent = 2, cls=Encoder)
|
||||||
if self.useLog:
|
|
||||||
self.il.warning(f"Writing to '{self.outputFile}'.")
|
|
||||||
|
|
||||||
with open(self.outputFile, "w") as f:
|
|
||||||
json.dump(self.modifiedData, f, indent = 2, cls=Encoder)
|
|
||||||
|
|
||||||
# TODO: Move this class to it's own file
|
# TODO: Move this class to it's own file
|
||||||
class Encoder(json.JSONEncoder):
|
class Encoder(json.JSONEncoder):
|
||||||
|
@ -39,15 +39,24 @@ def databaseQuery(self, cursor, query, args=()):
|
|||||||
cur.connection.close()
|
cur.connection.close()
|
||||||
if self.useLog:
|
if self.useLog:
|
||||||
self.il.debug(f"Database connection closed")
|
self.il.debug(f"Database connection closed")
|
||||||
# return (r[0] if r else None) if one else r
|
|
||||||
return r
|
return r
|
||||||
|
|
||||||
def getToken(reqObj,idpUrl, id, secret):
|
class fortraEFC:
|
||||||
getTokenResponse = reqObj.post(idpUrl, headers={"client_id": id,"client_secret": secret})
|
def __init__(self):
|
||||||
return getTokenResponse["access_token"]
|
if self.os.path.exists(self.tokenFilepath):
|
||||||
|
with open(self.tokenFilepath, 'rb') as t:
|
||||||
|
self.token = self.j.load(t)
|
||||||
|
print(self.token["access_token"])
|
||||||
|
|
||||||
def pushPayload(reqObj, host, token, tenant_id, payload):
|
def saveToken(self):
|
||||||
url = f'{host}/api/v1/unity/data/{tenant_id}/machine_event'
|
with open(self.tokenFilepath, "w") as f:
|
||||||
pushPayloadResponse = reqObj.post(url, headers={'Authorization': f'bearer {token}'},\
|
self.j.dump(self.tokenData, f, indent = 2)
|
||||||
payload=payload)
|
|
||||||
return pushPayloadResponse.status_code
|
def getToken(self):
|
||||||
|
self.tokenData = self.r.post(self.platformConfig["idp"], headers={"client_id": self.platformConfig["client_id"],"client_secret": self.platformConfig["secret"]})
|
||||||
|
|
||||||
|
def pushPayload(self):
|
||||||
|
url = f'{self.host}/api/v1/unity/data/{self.tenant_id}/machine_event'
|
||||||
|
pushPayloadResponse = self.r.post(self.platformConfig["efc_url"], headers={'Authorization': f'bearer {self.token["access_token"]}'},\
|
||||||
|
payload=self.modifiedData)
|
||||||
|
return pushPayloadResponse.status_code
|
Loading…
x
Reference in New Issue
Block a user