standardized the variables #19

This commit is contained in:
jblu 2022-09-03 15:02:40 -05:00
parent 6b63bd9626
commit 4317a33647
7 changed files with 95 additions and 95 deletions

View File

@ -37,10 +37,10 @@ You will need a config.json in the root directory.
| port | number, port of admin gui(used for api aswell) |
| username | admin account for qbittorrent |
| password | password for admin account |
| loglevel | is what log messages are written to the log file. INFO or DEBUG are valid entries(case sensitive) |
| log_level | is what log messages are written to the log file. INFO or DEBUG are valid entries(case sensitive) |
| protected_tag | used to mark torrents to handle with care |
| non_protected_tag | we don't care about these torrents |
| logpath | will write a log in root directory if left as is other wise specify other path using forward slashes |
| log_path | will write a log in root directory if left as is other wise specify other path using forward slashes |
| age | number, seconds for how long we keep torrents from IPTORRENTS |
| minimum_age | age in seconds torrents should reached before they are removed |
| use_pushover | true or false to enable or disable pushover notification summary |

View File

@ -3,8 +3,8 @@
"port": 8080,
"username": "admin",
"password": "admin",
"loglevel": "INFO",
"logpath": "./qc.log",
"log_level": "INFO",
"log_path": "./qc.log",
"protected_tag": "ipt",
"non_protected_tag": "public",
"age": 2419200,

View File

@ -39,8 +39,8 @@ class Qbt:
self.use_log = self.config["use_log"]
self.po_key = self.config["po_key"]
self.po_token = self.config["po_token"]
self.logpath = self.config["logpath"]
self.loglevel = self.config["loglevel"]
self.log_path = self.config["log_path"]
self.log_level = self.config["log_level"]
self.tracker_protected_tag = self.config["protected_tag"]
self.tracker_non_protected_tag = self.config["non_protected_tag"]
self.minimum_age = self.config["minimum_age"]
@ -48,8 +48,8 @@ class Qbt:
self.enable_dragnet = self.config["enable_dragnet"]
self.dragnet_outfile = self.config["dragnet_outfile"]
# Calling log and notify functions
torlog(self)
tornotify(self)
tor_log(self)
tor_notify(self)
self.t = time
# Pulling domain names to treat carefully
f = open('./ignored_domains.json')
@ -70,24 +70,24 @@ class Qbt:
self.tl.exception(e)
self.po.send_message(e, title="qbit-maid API ERROR")
# Pulling all torrent data
self.torrentlist = self.qbt_client.torrents_info()
self.torrent_list = self.qbt_client.torrents_info()
#Main process block
if self.use_log:
listqbitapiinfo(self)
listfirsttor(self)
buildtorlist(self)
processcounts(self)
list_qbit_api_info(self)
list_first_tor(self)
build_tor_list(self)
process_counts(self)
if self.use_log:
torrentcount(self)
torprocessor(self)
torrent_count(self)
tor_processor(self)
if self.use_log:
printprocessor(self)
print_processor(self)
if self.config["delete_torrents"]:
tordelete(self)
tor_delete(self)
self.et = datetime.datetime.now()
getscriptruntime(self)
get_script_runtime(self)
if self.use_pushover:
tornotifysummary(self)
tor_notify_summary(self)
# Run
if __name__== "__main__":
Qbt()

View File

@ -1,4 +1,4 @@
def buildtorlist(self):
def build_tor_list(self):
"""Builds multiple lists of torrents to be sorted. Also adds tags to the torents.
There are more effecient ways of doing things but I did this rather quickly.
V2 will certainly be more performant. The reason two lists were used was so that torrents
@ -9,65 +9,65 @@ def buildtorlist(self):
torrent = self.torrentlist.pop()
if self.use_log:
self.tl.debug(f'["{torrent["name"][0:20]}..."] {torrent["infohash_v1"]}')
if isignoredtag(self.ignored_tags.values(),torrent['tags']):
if is_ignored_tag(self.ignored_tags.values(),torrent['tags']):
self.ignored_counter += 1
continue
# if torrent['added_on'] + self.minimum_age >= self.t.time():
if ispreme(torrent['added_on'], self.minimum_age, self.t.time()):
if is_preme(torrent['added_on'], self.minimum_age, self.t.time()):
self.preme_tor_counter += 1
continue
# if torrent['category'] in self.cat_whitelist.values():
if iscatignored(torrent['category'], self.cat_whitelist.values()):
if is_cat_ignored(torrent['category'], self.cat_whitelist.values()):
self.tl.info(f'Ignored torrent:["{torrent["name"][0:20]}..."]')
self.ignored_counter += 1
continue
# if torrent['tracker'] == '':
if istrackerblank(torrent['tracker']):
if is_tracker_blank(torrent['tracker']):
if self.use_log:
self.tl.warning(f'Torrent doesn\'t have a tracker ["{torrent["name"][0:20]}..."] [{torrent["tracker"]}]hash: {torrent["hash"]}')
self.ignored_counter += 1
continue
# if torrent['tracker'].split('/')[2] in self.tracker_whitelist.values():
if isprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if is_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log:
self.tl.debug(f'Protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
# if torrent['tags'] == '':
if istagblank(torrent['tags']):
if is_tag_blank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash'])
self.tracker_list.append(torrent)
if isnotprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if is_not_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log:
self.tl.debug(f'Non-protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
# if torrent['tags'] == '':
if istagblank(torrent['tags']):
if is_tag_blank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash'])
self.tracker_list.append(torrent)
def ispreme(added, minage, time):
def is_preme(added, minage, time):
if added + minage >= time:
return True
def iscatignored(cat, catlist):
def is_cat_ignored(cat, catlist):
if cat in catlist:
return True
def istrackerblank(tracker):
def is_tracker_blank(tracker):
if tracker == '':
return True
def isprotectedtracker(tracker, trackerlist):
def is_protected_tracker(tracker, trackerlist):
if tracker.split('/')[2] in trackerlist:
return True
def isnotprotectedtracker(tracker, trackerlist):
def is_not_protected_tracker(tracker, trackerlist):
if tracker.split('/')[2] not in trackerlist:
return True
def istagblank(tag):
def is_tag_blank(tag):
if tag == '':
return True
def isignoredtag(igtags, tortags):
def is_ignored_tag(igtags, tortags):
for igt in igtags:
if igt in tortags:
return True

View File

@ -1,4 +1,4 @@
def torlog(self):
def tor_log(self):
"""Setting up the log file, if self.use_log is set to true and self.loglevel is DEBUG OR INFO"""
if self.use_log:
if self.loglevel == 'DEBUG':
@ -6,7 +6,7 @@ def torlog(self):
elif self.loglevel == 'INFO':
self.tl.basicConfig(filename=self.logpath, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.INFO)
def tornotify(self):
def tor_notify(self):
"""Seting up to use pushover, if self.use_pushover is set to true and
if valid self.po_key and self.po_token is provided in the config file"""
if self.use_pushover:
@ -16,12 +16,12 @@ def tornotifytest(self):
"""Used to make sure tornotify is working and messages are getting to the client"""
self.poc.send_message("Test Message", title="qbit-maid")
def processcounts(self):
def process_counts(self):
self.c = self.ct()
for item in self.tracker_list:
self.c[item["tags"]] += 1
def printprocessor(self):
def print_processor(self):
"""Print summary of torrents"""
self.tl.info(f'Total: {self.total_torrents}')
self.tl.info(f'Premature: {self.preme_tor_counter}')
@ -31,7 +31,7 @@ def printprocessor(self):
self.tl.info(f'Orphaned: {self.up_tor_counter}')
self.tl.info(f'Marked for deletion: {len(self.torrent_hash_delete_list)}')
def tornotifysummary(self):
def tor_notify_summary(self):
"""Main notification method when the app is used in an automated fashion"""
self.poc.send_message(f" Total: {self.total_torrents}\n\
Premature: {self.preme_tor_counter}\n\
@ -53,7 +53,7 @@ def writetor(self, filepath='./torrentinfo.json'):
"""
pass
def listfirsttor(self, index=0):
def list_first_tor(self, index=0):
"""Only lists the first torrent"""
self.tl.debug('First torrent in the list:')
torrent = self.torrentlist[index]
@ -61,12 +61,12 @@ def listfirsttor(self, index=0):
self.tl.debug(f'{k}: {v}')
self.tl.debug('\n')
def listqbitapiinfo(self):
def list_qbit_api_info(self):
"""Writes torrent info to log file"""
self.tl.debug(f'qBittorrent: {self.qbt_client.app.version}')
self.tl.debug(f'qBittorrent Web API: {self.qbt_client.app.web_api_version}')
def torrentcount(self):
def torrent_count(self):
"""write torrent counts to log file"""
self.tl.debug(f'torrents that are protected {self.tracker_list.count("ipt")}')
self.tl.debug(f'torrents that aren\'t protected {self.tracker_list.count("public")}')
@ -80,7 +80,7 @@ def debugpremecal(self):
print(torrent["name"][0:20])
print(torrent['added_on'] + self.minimum_age >= self.t.time())
def getscriptruntime(self):
def get_script_runtime(self):
elapsed_time = self.et - self.st
if self.use_log:
self.tl.info(f'Execution time: [{elapsed_time}]')

View File

@ -1,36 +1,36 @@
from cgitb import enable
def torprocessor(self):
def tor_processor(self):
"""Main logic to sort through both self.tracker_nonprotected_list and self.tracker_protected_list
If torrent meets criteria for deletion, its infohash_v1 will be appended to self.torrent_hash_delete_list
"""
for canidate in self.tracker_list:
# if canidate['state'] == 'downloading':
if isdownloading(canidate['state']):
if is_downloading(canidate['state']):
if self.use_log:
self.tl.info(f'["{canidate["name"][0:20]}..."] is still downloading and will be skipped.')
continue
# elif canidate['ratio'] < float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedunderratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
elif is_protected_under_ratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is below a 1.05 ratio({canidate["ratio"]})')
# if canidate['added_on'] + self.age <= self.t.time():
if isoldtor(canidate['added_on'], self.age, self.t.time()):
if is_old_tor(canidate['added_on'], self.age, self.t.time()):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] Seconds old: {self.t.time() - self.age - canidate["added_on"]}')
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
# elif canidate['ratio'] >= float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedoverratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
elif is_protected_over_ratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is above a 1.05 ratio({canidate["ratio"]}).')
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
# elif self.tracker_non_protected_tag in canidate["tags"]:
elif isnonprotectedtor(self.tracker_non_protected_tag, canidate["tags"]):
elif is_not_protected_tor(self.tracker_non_protected_tag, canidate["tags"]):
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion.')
@ -41,11 +41,11 @@ def torprocessor(self):
self.up_tor_counter += 1
continue
def tordeletetags(self):
def tor_delete_tags(self):
tag_list = [self.tracker_protected_tag, self.tracker_non_protected_tag]
self.qbt_client.torrents_delete_tags(tag_list)
def tordelete(self):
def tor_delete(self):
"""Remove torrents, will also delete files, this keeps the filesystem clean.
Only pass self.torrent_hash_delete_list if you would like to keep the files."""
if self.use_log:
@ -53,23 +53,23 @@ def tordelete(self):
self.tl.debug(self.torrent_hash_delete_list)
self.qbt_client.torrents_delete(True, self.torrent_hash_delete_list)
def isdownloading(state):
def is_downloading(state):
if state == 'downloading':
return True
def isprotectedunderratio(torratio, setratio, settag, tortag):
def is_protected_under_ratio(torratio, setratio, settag, tortag):
if torratio < float(setratio) and settag in tortag:
return True
def isoldtor(toradd, setage, currenttime):
def is_old_tor(toradd, setage, currenttime):
if toradd + setage <= currenttime:
return True
def isprotectedoverratio(torratio, setratio, settag, tortag):
def is_protected_over_ratio(torratio, setratio, settag, tortag):
if torratio >= float(setratio) and settag in tortag:
return True
def isnonprotectedtor(setnonprotectedtag, tortags):
def is_not_protected_tor(setnonprotectedtag, tortags):
if setnonprotectedtag in tortags:
return True

View File

@ -1,97 +1,97 @@
import unittest
from qlist import ispreme,iscatignored,istrackerblank,isprotectedtracker,isnotprotectedtracker,istagblank,isignoredtag
from qprocess import isdownloading,isprotectedunderratio,isoldtor,isprotectedoverratio,isnonprotectedtor
from qlist import is_preme,is_cat_ignored,is_tracker_blank,is_protected_tracker,is_not_protected_tracker,is_tag_blank,is_ignored_tag
from qprocess import is_downloading,is_protected_under_ratio,is_old_tor,is_protected_over_ratio,is_not_protected_tor
class TestQbitmaid(unittest.TestCase):
def test_ispreme_sanity(self):
self.assertTrue(ispreme(1,1,1))
self.assertFalse(ispreme(1,1,3))
self.assertTrue(is_preme(1,1,1))
self.assertFalse(is_preme(1,1,3))
def test_ispreme(self):
pass
def test_iscatignored_sanity(self):
self.assertTrue(iscatignored('a', ['a','b','c']))
self.assertTrue(iscatignored('b', ['a','b','c']))
self.assertTrue(iscatignored('c', ['a','b','c']))
self.assertFalse(iscatignored('d', ['a','b','c']))
self.assertFalse(iscatignored(1, ['a','b','c']))
self.assertFalse(iscatignored(1.0000000, ['a','b','c']))
self.assertTrue(is_cat_ignored('a', ['a','b','c']))
self.assertTrue(is_cat_ignored('b', ['a','b','c']))
self.assertTrue(is_cat_ignored('c', ['a','b','c']))
self.assertFalse(is_cat_ignored('d', ['a','b','c']))
self.assertFalse(is_cat_ignored(1, ['a','b','c']))
self.assertFalse(is_cat_ignored(1.0000000, ['a','b','c']))
def test_iscatignored(self):
pass
def test_istrackerblank_sanity(self):
self.assertTrue(istrackerblank(''))
self.assertFalse(istrackerblank('a'))
self.assertFalse(istrackerblank(1))
self.assertFalse(istrackerblank(1.00000000))
self.assertTrue(is_tracker_blank(''))
self.assertFalse(is_tracker_blank('a'))
self.assertFalse(is_tracker_blank(1))
self.assertFalse(is_tracker_blank(1.00000000))
def test_istrackerblank(self):
pass
def test_isprotectedtracker_sanity(self):
self.assertTrue(isprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertFalse(isprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertFalse(isprotectedtracker('https://d.com',['a.com','b.me','c.io']))
self.assertTrue(is_protected_tracker('https://a.com/',['a.com','b.me','c.io']))
self.assertFalse(is_protected_tracker('https://google.com/',['a.com','b.me','c.io']))
self.assertFalse(is_protected_tracker('https://d.com',['a.com','b.me','c.io']))
def test_isprotectedtracker(self):
pass
def test_isnotprotectedtracker_sanity(self):
self.assertFalse(isnotprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertTrue(isnotprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertTrue(isnotprotectedtracker('https://d.com',['a.com','b.me','c.io']))
self.assertFalse(is_not_protected_tracker('https://a.com/',['a.com','b.me','c.io']))
self.assertTrue(is_not_protected_tracker('https://google.com/',['a.com','b.me','c.io']))
self.assertTrue(is_not_protected_tracker('https://d.com',['a.com','b.me','c.io']))
def test_isnotprotectedtracker(self):
pass
def test_istagblank(self):
self.assertTrue(istagblank(''))
self.assertFalse(istagblank('a'))
self.assertFalse(istagblank(1))
self.assertFalse(istagblank(1.0001))
self.assertFalse(istagblank(False))
self.assertFalse(istagblank(True))
self.assertTrue(is_tag_blank(''))
self.assertFalse(is_tag_blank('a'))
self.assertFalse(is_tag_blank(1))
self.assertFalse(is_tag_blank(1.0001))
self.assertFalse(is_tag_blank(False))
self.assertFalse(is_tag_blank(True))
def test_isdownloading_sanity(self):
self.assertTrue(isdownloading('downloading'))
self.assertTrue(is_downloading('downloading'))
def test_isdownloading(self):
self.assertFalse(isdownloading('DOWNLOADING'))
self.assertFalse(isdownloading('dOwNlOaDiNg'))
self.assertFalse(is_downloading('DOWNLOADING'))
self.assertFalse(is_downloading('dOwNlOaDiNg'))
def test_isprotectedunderratio_sanity(self):
self.assertTrue(isprotectedunderratio(0.5,1,'a','a,b,c'))
self.assertTrue(is_protected_under_ratio(0.5,1,'a','a,b,c'))
def test_isprotectedunderratio(self):
pass
def test_isoldtor_sanity(self):
self.assertTrue(isoldtor(1,2,4))
self.assertTrue(is_old_tor(1,2,4))
def test_isoldtor(self):
self.assertFalse(isoldtor(1661150664,2419200,1662049004.2101078))
self.assertFalse(isoldtor(1661150664,2419200,1662049004))
self.assertFalse(isoldtor(1661150664.000000,2419200.0000000,1662049004.2101078))
self.assertFalse(is_old_tor(1661150664,2419200,1662049004.2101078))
self.assertFalse(is_old_tor(1661150664,2419200,1662049004))
self.assertFalse(is_old_tor(1661150664.000000,2419200.0000000,1662049004.2101078))
def test_isprotectedoverratio_sanity(self):
self.assertTrue(isprotectedoverratio(2,1,'a','a,b,c'))
self.assertTrue(is_protected_over_ratio(2,1,'a','a,b,c'))
def test_isprotectedoverratio(self):
pass
def test_isnonprotectedtor_sanity(self):
self.assertTrue(isnonprotectedtor('a','a,b,c'))
self.assertTrue(is_not_protected_tor('a','a,b,c'))
def test_isnonprotectedtor(self):
pass
def test_isignoredtag_sanity(self):
self.assertTrue(isignoredtag(['a','b','c'], 'first,second,third,a'))
self.assertTrue(is_ignored_tag(['a','b','c'], 'first,second,third,a'))
def test_isignoredtag_sanity(self):
self.assertTrue(isignoredtag(['a','b','c'], 'first,second,third,a'))
self.assertTrue(is_ignored_tag(['a','b','c'], 'first,second,third,a'))
# def test__sanity(self):
# pass