Merge pull request #20 from jonbranan/standardize_variables

Standardize variables
This commit is contained in:
jonbranan 2022-09-03 19:05:40 -05:00 committed by GitHub
commit 2e896aff4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 104 additions and 104 deletions

View File

@ -37,10 +37,10 @@ You will need a config.json in the root directory.
| port | number, port of admin gui(used for api aswell) |
| username | admin account for qbittorrent |
| password | password for admin account |
| loglevel | is what log messages are written to the log file. INFO or DEBUG are valid entries(case sensitive) |
| log_level | is what log messages are written to the log file. INFO or DEBUG are valid entries(case sensitive) |
| protected_tag | used to mark torrents to handle with care |
| non_protected_tag | we don't care about these torrents |
| logpath | will write a log in root directory if left as is other wise specify other path using forward slashes |
| log_path | will write a log in root directory if left as is other wise specify other path using forward slashes |
| age | number, seconds for how long we keep torrents from IPTORRENTS |
| minimum_age | age in seconds torrents should reached before they are removed |
| use_pushover | true or false to enable or disable pushover notification summary |

View File

@ -3,8 +3,8 @@
"port": 8080,
"username": "admin",
"password": "admin",
"loglevel": "INFO",
"logpath": "./qc.log",
"log_level": "INFO",
"log_path": "./qc.log",
"protected_tag": "ipt",
"non_protected_tag": "public",
"age": 2419200,

View File

@ -39,8 +39,8 @@ class Qbt:
self.use_log = self.config["use_log"]
self.po_key = self.config["po_key"]
self.po_token = self.config["po_token"]
self.logpath = self.config["logpath"]
self.loglevel = self.config["loglevel"]
self.log_path = self.config["log_path"]
self.log_level = self.config["log_level"]
self.tracker_protected_tag = self.config["protected_tag"]
self.tracker_non_protected_tag = self.config["non_protected_tag"]
self.minimum_age = self.config["minimum_age"]
@ -48,8 +48,8 @@ class Qbt:
self.enable_dragnet = self.config["enable_dragnet"]
self.dragnet_outfile = self.config["dragnet_outfile"]
# Calling log and notify functions
torlog(self)
tornotify(self)
tor_log(self)
tor_notify(self)
self.t = time
# Pulling domain names to treat carefully
f = open('./ignored_domains.json')
@ -70,24 +70,24 @@ class Qbt:
self.tl.exception(e)
self.po.send_message(e, title="qbit-maid API ERROR")
# Pulling all torrent data
self.torrentlist = self.qbt_client.torrents_info()
self.torrent_list = self.qbt_client.torrents_info()
#Main process block
if self.use_log:
listqbitapiinfo(self)
listfirsttor(self)
buildtorlist(self)
processcounts(self)
list_qbit_api_info(self)
list_first_tor(self)
build_tor_list(self)
process_counts(self)
if self.use_log:
torrentcount(self)
torprocessor(self)
torrent_count(self)
tor_processor(self)
if self.use_log:
printprocessor(self)
print_processor(self)
if self.config["delete_torrents"]:
tordelete(self)
tor_delete(self)
self.et = datetime.datetime.now()
getscriptruntime(self)
get_script_runtime(self)
if self.use_pushover:
tornotifysummary(self)
tor_notify_summary(self)
# Run
if __name__== "__main__":
Qbt()

View File

@ -1,73 +1,73 @@
def buildtorlist(self):
def build_tor_list(self):
"""Builds multiple lists of torrents to be sorted. Also adds tags to the torents.
There are more effecient ways of doing things but I did this rather quickly.
V2 will certainly be more performant. The reason two lists were used was so that torrents
that are in public trackers woudln't be around as long as torrents from a private tracker.
"""
self.total_torrents = len(self.torrentlist)
while self.torrentlist:
torrent = self.torrentlist.pop()
self.total_torrents = len(self.torrent_list)
while self.torrent_list:
torrent = self.torrent_list.pop()
if self.use_log:
self.tl.debug(f'["{torrent["name"][0:20]}..."] {torrent["infohash_v1"]}')
if isignoredtag(self.ignored_tags.values(),torrent['tags']):
if is_ignored_tag(self.ignored_tags.values(),torrent['tags']):
self.ignored_counter += 1
continue
# if torrent['added_on'] + self.minimum_age >= self.t.time():
if ispreme(torrent['added_on'], self.minimum_age, self.t.time()):
if is_preme(torrent['added_on'], self.minimum_age, self.t.time()):
self.preme_tor_counter += 1
continue
# if torrent['category'] in self.cat_whitelist.values():
if iscatignored(torrent['category'], self.cat_whitelist.values()):
if is_cat_ignored(torrent['category'], self.cat_whitelist.values()):
self.tl.info(f'Ignored torrent:["{torrent["name"][0:20]}..."]')
self.ignored_counter += 1
continue
# if torrent['tracker'] == '':
if istrackerblank(torrent['tracker']):
if is_tracker_blank(torrent['tracker']):
if self.use_log:
self.tl.warning(f'Torrent doesn\'t have a tracker ["{torrent["name"][0:20]}..."] [{torrent["tracker"]}]hash: {torrent["hash"]}')
self.ignored_counter += 1
continue
# if torrent['tracker'].split('/')[2] in self.tracker_whitelist.values():
if isprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if is_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log:
self.tl.debug(f'Protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
# if torrent['tags'] == '':
if istagblank(torrent['tags']):
if is_tag_blank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash'])
self.tracker_list.append(torrent)
if isnotprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if is_not_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log:
self.tl.debug(f'Non-protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
# if torrent['tags'] == '':
if istagblank(torrent['tags']):
if is_tag_blank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash'])
self.tracker_list.append(torrent)
def ispreme(added, minage, time):
def is_preme(added, minage, time):
if added + minage >= time:
return True
def iscatignored(cat, catlist):
def is_cat_ignored(cat, catlist):
if cat in catlist:
return True
def istrackerblank(tracker):
def is_tracker_blank(tracker):
if tracker == '':
return True
def isprotectedtracker(tracker, trackerlist):
def is_protected_tracker(tracker, trackerlist):
if tracker.split('/')[2] in trackerlist:
return True
def isnotprotectedtracker(tracker, trackerlist):
def is_not_protected_tracker(tracker, trackerlist):
if tracker.split('/')[2] not in trackerlist:
return True
def istagblank(tag):
def is_tag_blank(tag):
if tag == '':
return True
def isignoredtag(igtags, tortags):
def is_ignored_tag(igtags, tortags):
for igt in igtags:
if igt in tortags:
return True

View File

@ -1,12 +1,12 @@
def torlog(self):
def tor_log(self):
"""Setting up the log file, if self.use_log is set to true and self.loglevel is DEBUG OR INFO"""
if self.use_log:
if self.loglevel == 'DEBUG':
self.tl.basicConfig(filename=self.logpath, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.DEBUG)
elif self.loglevel == 'INFO':
self.tl.basicConfig(filename=self.logpath, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.INFO)
if self.log_level == 'DEBUG':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.DEBUG)
elif self.log_level == 'INFO':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.INFO)
def tornotify(self):
def tor_notify(self):
"""Seting up to use pushover, if self.use_pushover is set to true and
if valid self.po_key and self.po_token is provided in the config file"""
if self.use_pushover:
@ -16,12 +16,12 @@ def tornotifytest(self):
"""Used to make sure tornotify is working and messages are getting to the client"""
self.poc.send_message("Test Message", title="qbit-maid")
def processcounts(self):
def process_counts(self):
self.c = self.ct()
for item in self.tracker_list:
self.c[item["tags"]] += 1
def printprocessor(self):
def print_processor(self):
"""Print summary of torrents"""
self.tl.info(f'Total: {self.total_torrents}')
self.tl.info(f'Premature: {self.preme_tor_counter}')
@ -31,7 +31,7 @@ def printprocessor(self):
self.tl.info(f'Orphaned: {self.up_tor_counter}')
self.tl.info(f'Marked for deletion: {len(self.torrent_hash_delete_list)}')
def tornotifysummary(self):
def tor_notify_summary(self):
"""Main notification method when the app is used in an automated fashion"""
self.poc.send_message(f" Total: {self.total_torrents}\n\
Premature: {self.preme_tor_counter}\n\
@ -53,20 +53,20 @@ def writetor(self, filepath='./torrentinfo.json'):
"""
pass
def listfirsttor(self, index=0):
def list_first_tor(self, index=0):
"""Only lists the first torrent"""
self.tl.debug('First torrent in the list:')
torrent = self.torrentlist[index]
torrent = self.torrent_list[index]
for k,v in torrent.items():
self.tl.debug(f'{k}: {v}')
self.tl.debug('\n')
def listqbitapiinfo(self):
def list_qbit_api_info(self):
"""Writes torrent info to log file"""
self.tl.debug(f'qBittorrent: {self.qbt_client.app.version}')
self.tl.debug(f'qBittorrent Web API: {self.qbt_client.app.web_api_version}')
def torrentcount(self):
def torrent_count(self):
"""write torrent counts to log file"""
self.tl.debug(f'torrents that are protected {self.tracker_list.count("ipt")}')
self.tl.debug(f'torrents that aren\'t protected {self.tracker_list.count("public")}')
@ -75,12 +75,12 @@ def torlisttags(self):
pass
def debugpremecal(self):
for torrent in self.torrentlist:
for torrent in self.torrent_list:
if torrent['infohash_v1'] == 'a89b484ea375094af53ce89ecbea14bf086d6284':
print(torrent["name"][0:20])
print(torrent['added_on'] + self.minimum_age >= self.t.time())
def getscriptruntime(self):
def get_script_runtime(self):
elapsed_time = self.et - self.st
if self.use_log:
self.tl.info(f'Execution time: [{elapsed_time}]')

View File

@ -1,36 +1,36 @@
from cgitb import enable
def torprocessor(self):
def tor_processor(self):
"""Main logic to sort through both self.tracker_nonprotected_list and self.tracker_protected_list
If torrent meets criteria for deletion, its infohash_v1 will be appended to self.torrent_hash_delete_list
"""
for canidate in self.tracker_list:
# if canidate['state'] == 'downloading':
if isdownloading(canidate['state']):
if is_downloading(canidate['state']):
if self.use_log:
self.tl.info(f'["{canidate["name"][0:20]}..."] is still downloading and will be skipped.')
continue
# elif canidate['ratio'] < float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedunderratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
elif is_protected_under_ratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is below a 1.05 ratio({canidate["ratio"]})')
# if canidate['added_on'] + self.age <= self.t.time():
if isoldtor(canidate['added_on'], self.age, self.t.time()):
if is_old_tor(canidate['added_on'], self.age, self.t.time()):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] Seconds old: {self.t.time() - self.age - canidate["added_on"]}')
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
# elif canidate['ratio'] >= float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedoverratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
elif is_protected_over_ratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is above a 1.05 ratio({canidate["ratio"]}).')
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
# elif self.tracker_non_protected_tag in canidate["tags"]:
elif isnonprotectedtor(self.tracker_non_protected_tag, canidate["tags"]):
elif is_not_protected_tor(self.tracker_non_protected_tag, canidate["tags"]):
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion.')
@ -41,11 +41,11 @@ def torprocessor(self):
self.up_tor_counter += 1
continue
def tordeletetags(self):
def tor_delete_tags(self):
tag_list = [self.tracker_protected_tag, self.tracker_non_protected_tag]
self.qbt_client.torrents_delete_tags(tag_list)
def tordelete(self):
def tor_delete(self):
"""Remove torrents, will also delete files, this keeps the filesystem clean.
Only pass self.torrent_hash_delete_list if you would like to keep the files."""
if self.use_log:
@ -53,23 +53,23 @@ def tordelete(self):
self.tl.debug(self.torrent_hash_delete_list)
self.qbt_client.torrents_delete(True, self.torrent_hash_delete_list)
def isdownloading(state):
def is_downloading(state):
if state == 'downloading':
return True
def isprotectedunderratio(torratio, setratio, settag, tortag):
def is_protected_under_ratio(torratio, setratio, settag, tortag):
if torratio < float(setratio) and settag in tortag:
return True
def isoldtor(toradd, setage, currenttime):
def is_old_tor(toradd, setage, currenttime):
if toradd + setage <= currenttime:
return True
def isprotectedoverratio(torratio, setratio, settag, tortag):
def is_protected_over_ratio(torratio, setratio, settag, tortag):
if torratio >= float(setratio) and settag in tortag:
return True
def isnonprotectedtor(setnonprotectedtag, tortags):
def is_not_protected_tor(setnonprotectedtag, tortags):
if setnonprotectedtag in tortags:
return True

View File

@ -1,97 +1,97 @@
import unittest
from qlist import ispreme,iscatignored,istrackerblank,isprotectedtracker,isnotprotectedtracker,istagblank,isignoredtag
from qprocess import isdownloading,isprotectedunderratio,isoldtor,isprotectedoverratio,isnonprotectedtor
from qlist import is_preme,is_cat_ignored,is_tracker_blank,is_protected_tracker,is_not_protected_tracker,is_tag_blank,is_ignored_tag
from qprocess import is_downloading,is_protected_under_ratio,is_old_tor,is_protected_over_ratio,is_not_protected_tor
class TestQbitmaid(unittest.TestCase):
def test_ispreme_sanity(self):
self.assertTrue(ispreme(1,1,1))
self.assertFalse(ispreme(1,1,3))
self.assertTrue(is_preme(1,1,1))
self.assertFalse(is_preme(1,1,3))
def test_ispreme(self):
pass
def test_iscatignored_sanity(self):
self.assertTrue(iscatignored('a', ['a','b','c']))
self.assertTrue(iscatignored('b', ['a','b','c']))
self.assertTrue(iscatignored('c', ['a','b','c']))
self.assertFalse(iscatignored('d', ['a','b','c']))
self.assertFalse(iscatignored(1, ['a','b','c']))
self.assertFalse(iscatignored(1.0000000, ['a','b','c']))
self.assertTrue(is_cat_ignored('a', ['a','b','c']))
self.assertTrue(is_cat_ignored('b', ['a','b','c']))
self.assertTrue(is_cat_ignored('c', ['a','b','c']))
self.assertFalse(is_cat_ignored('d', ['a','b','c']))
self.assertFalse(is_cat_ignored(1, ['a','b','c']))
self.assertFalse(is_cat_ignored(1.0000000, ['a','b','c']))
def test_iscatignored(self):
pass
def test_istrackerblank_sanity(self):
self.assertTrue(istrackerblank(''))
self.assertFalse(istrackerblank('a'))
self.assertFalse(istrackerblank(1))
self.assertFalse(istrackerblank(1.00000000))
self.assertTrue(is_tracker_blank(''))
self.assertFalse(is_tracker_blank('a'))
self.assertFalse(is_tracker_blank(1))
self.assertFalse(is_tracker_blank(1.00000000))
def test_istrackerblank(self):
pass
def test_isprotectedtracker_sanity(self):
self.assertTrue(isprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertFalse(isprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertFalse(isprotectedtracker('https://d.com',['a.com','b.me','c.io']))
self.assertTrue(is_protected_tracker('https://a.com/',['a.com','b.me','c.io']))
self.assertFalse(is_protected_tracker('https://google.com/',['a.com','b.me','c.io']))
self.assertFalse(is_protected_tracker('https://d.com',['a.com','b.me','c.io']))
def test_isprotectedtracker(self):
pass
def test_isnotprotectedtracker_sanity(self):
self.assertFalse(isnotprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertTrue(isnotprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertTrue(isnotprotectedtracker('https://d.com',['a.com','b.me','c.io']))
self.assertFalse(is_not_protected_tracker('https://a.com/',['a.com','b.me','c.io']))
self.assertTrue(is_not_protected_tracker('https://google.com/',['a.com','b.me','c.io']))
self.assertTrue(is_not_protected_tracker('https://d.com',['a.com','b.me','c.io']))
def test_isnotprotectedtracker(self):
pass
def test_istagblank(self):
self.assertTrue(istagblank(''))
self.assertFalse(istagblank('a'))
self.assertFalse(istagblank(1))
self.assertFalse(istagblank(1.0001))
self.assertFalse(istagblank(False))
self.assertFalse(istagblank(True))
self.assertTrue(is_tag_blank(''))
self.assertFalse(is_tag_blank('a'))
self.assertFalse(is_tag_blank(1))
self.assertFalse(is_tag_blank(1.0001))
self.assertFalse(is_tag_blank(False))
self.assertFalse(is_tag_blank(True))
def test_isdownloading_sanity(self):
self.assertTrue(isdownloading('downloading'))
self.assertTrue(is_downloading('downloading'))
def test_isdownloading(self):
self.assertFalse(isdownloading('DOWNLOADING'))
self.assertFalse(isdownloading('dOwNlOaDiNg'))
self.assertFalse(is_downloading('DOWNLOADING'))
self.assertFalse(is_downloading('dOwNlOaDiNg'))
def test_isprotectedunderratio_sanity(self):
self.assertTrue(isprotectedunderratio(0.5,1,'a','a,b,c'))
self.assertTrue(is_protected_under_ratio(0.5,1,'a','a,b,c'))
def test_isprotectedunderratio(self):
pass
def test_isoldtor_sanity(self):
self.assertTrue(isoldtor(1,2,4))
self.assertTrue(is_old_tor(1,2,4))
def test_isoldtor(self):
self.assertFalse(isoldtor(1661150664,2419200,1662049004.2101078))
self.assertFalse(isoldtor(1661150664,2419200,1662049004))
self.assertFalse(isoldtor(1661150664.000000,2419200.0000000,1662049004.2101078))
self.assertFalse(is_old_tor(1661150664,2419200,1662049004.2101078))
self.assertFalse(is_old_tor(1661150664,2419200,1662049004))
self.assertFalse(is_old_tor(1661150664.000000,2419200.0000000,1662049004.2101078))
def test_isprotectedoverratio_sanity(self):
self.assertTrue(isprotectedoverratio(2,1,'a','a,b,c'))
self.assertTrue(is_protected_over_ratio(2,1,'a','a,b,c'))
def test_isprotectedoverratio(self):
pass
def test_isnonprotectedtor_sanity(self):
self.assertTrue(isnonprotectedtor('a','a,b,c'))
self.assertTrue(is_not_protected_tor('a','a,b,c'))
def test_isnonprotectedtor(self):
pass
def test_isignoredtag_sanity(self):
self.assertTrue(isignoredtag(['a','b','c'], 'first,second,third,a'))
self.assertTrue(is_ignored_tag(['a','b','c'], 'first,second,third,a'))
def test_isignoredtag_sanity(self):
self.assertTrue(isignoredtag(['a','b','c'], 'first,second,third,a'))
self.assertTrue(is_ignored_tag(['a','b','c'], 'first,second,third,a'))
# def test__sanity(self):
# pass