Refactor conditions #14

Merged
jonbranan merged 3 commits from refactor-conditions into main 2022-08-31 11:03:39 -05:00
5 changed files with 174 additions and 29 deletions

1
.gitignore vendored
View File

@ -1,5 +1,6 @@
*.log *.log
*.json *.json
*.csv
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@ -9,6 +9,7 @@ import time
import datetime import datetime
import logging import logging
from collections import Counter from collections import Counter
import csv
class Qbt: class Qbt:
def __init__(self): def __init__(self):
@ -30,6 +31,7 @@ class Qbt:
self.tl = logging self.tl = logging
self.po = pushover self.po = pushover
self.ct = Counter self.ct = Counter
self.cv = csv
# Variables torlog uses from config.json # Variables torlog uses from config.json
self.use_pushover = self.config["use_pushover"] self.use_pushover = self.config["use_pushover"]
self.use_log = self.config["use_log"] self.use_log = self.config["use_log"]
@ -63,7 +65,6 @@ class Qbt:
except qbittorrentapi.APIError as e: except qbittorrentapi.APIError as e:
self.tl.exception(e) self.tl.exception(e)
self.po.send_message(e, title="qbit-maid API ERROR") self.po.send_message(e, title="qbit-maid API ERROR")
# self.torrentlist = {}
# Pulling all torrent data # Pulling all torrent data
self.torrentlist = self.qbt_client.torrents_info() self.torrentlist = self.qbt_client.torrents_info()
#Main process block #Main process block

View File

@ -9,27 +9,58 @@ def buildtorlist(self):
torrent = self.torrentlist.pop() torrent = self.torrentlist.pop()
if self.use_log: if self.use_log:
self.tl.debug(f'["{torrent["name"][0:20]}..."] {torrent["infohash_v1"]}') self.tl.debug(f'["{torrent["name"][0:20]}..."] {torrent["infohash_v1"]}')
if torrent['added_on'] + self.minimum_age >= self.t.time(): # if torrent['added_on'] + self.minimum_age >= self.t.time():
if ispreme(torrent['added_on'], self.minimum_age, self.t.time()):
self.preme_tor_counter += 1 self.preme_tor_counter += 1
continue continue
if torrent['category'] in self.cat_whitelist.values(): # if torrent['category'] in self.cat_whitelist.values():
if iscatignored(torrent['category'], self.cat_whitelist.values()):
self.tl.info(f'Ignored torrent:["{torrent["name"][0:20]}..."]') self.tl.info(f'Ignored torrent:["{torrent["name"][0:20]}..."]')
self.ignored_counter += 1 self.ignored_counter += 1
continue continue
if torrent['tracker'] == '': # if torrent['tracker'] == '':
if istrackerblank(torrent['tracker']):
if self.use_log: if self.use_log:
self.tl.warning(f'Torrent doesn\'t have a tracker ["{torrent["name"][0:20]}..."] [{torrent["tracker"]}]hash: {torrent["hash"]}') self.tl.warning(f'Torrent doesn\'t have a tracker ["{torrent["name"][0:20]}..."] [{torrent["tracker"]}]hash: {torrent["hash"]}')
self.ignored_counter += 1 self.ignored_counter += 1
continue continue
if torrent['tracker'].split('/')[2] in self.tracker_whitelist.values(): # if torrent['tracker'].split('/')[2] in self.tracker_whitelist.values():
if isprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log: if self.use_log:
self.tl.debug(f'Protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}') self.tl.debug(f'Protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
if torrent['tags'] == '': # if torrent['tags'] == '':
if istagblank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash']) self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash'])
self.tracker_list.append(torrent) self.tracker_list.append(torrent)
if torrent['tracker'].split('/')[2] not in self.tracker_whitelist.values(): if isnotprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log: if self.use_log:
self.tl.debug(f'Non-protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}') self.tl.debug(f'Non-protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
if torrent['tags'] == '': # if torrent['tags'] == '':
if istagblank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash']) self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash'])
self.tracker_list.append(torrent) self.tracker_list.append(torrent)
def ispreme(added, minage, time):
if added + minage >= time:
return True
def iscatignored(cat, catlist):
if cat in catlist:
return True
def istrackerblank(tracker):
if tracker == '':
return True
def isprotectedtracker(tracker, trackerlist):
if tracker.split('/')[2] in trackerlist:
return True
def isnotprotectedtracker(tracker, trackerlist):
if tracker.split('/')[2] not in trackerlist:
return True
def istagblank(tag):
if tag == '':
return True

View File

@ -3,30 +3,36 @@ def torprocessor(self):
If torrent meets criteria for deletion, its infohash_v1 will be appended to self.torrent_hash_delete_list If torrent meets criteria for deletion, its infohash_v1 will be appended to self.torrent_hash_delete_list
""" """
for canidate in self.tracker_list: for canidate in self.tracker_list:
if canidate['state'] == 'downloading': # if canidate['state'] == 'downloading':
if isdownloading(canidate['state']):
if self.use_log: if self.use_log:
self.tl.info(f'["{canidate["name"][0:20]}..."] is still downloading and will be skipped.') self.tl.info(f'["{canidate["name"][0:20]}..."] is still downloading and will be skipped.')
continue continue
elif canidate['ratio'] < float(1.05) and self.tracker_protected_tag in canidate["tags"]: # elif canidate['ratio'] < float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedunderratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log: if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is below a 1.05 ratio({canidate["ratio"]})') self.tl.debug(f'["{canidate["name"][0:20]}..."] is below a 1.05 ratio({canidate["ratio"]})')
if canidate['added_on'] + self.age <= self.t.time(): # if canidate['added_on'] + self.age <= self.t.time():
if isoldtor(canidate['added_on'], self.age, self.t.time()):
if self.use_log: if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] Seconds old: {self.t.time() - self.age - canidate["added_on"]}') self.tl.debug(f'["{canidate["name"][0:20]}..."] Seconds old: {self.t.time() - self.age - canidate["added_on"]}')
self.torrent_hash_delete_list.append(canidate['infohash_v1']) self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log: if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.') self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
elif canidate['ratio'] >= float(1.05) and self.tracker_protected_tag in canidate["tags"]: # elif canidate['ratio'] >= float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedoverratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log: if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is above a 1.05 ratio({canidate["ratio"]}).') self.tl.debug(f'["{canidate["name"][0:20]}..."] is above a 1.05 ratio({canidate["ratio"]}).')
self.torrent_hash_delete_list.append(canidate['infohash_v1']) self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log: if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.') self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
elif self.tracker_non_protected_tag in canidate["tags"]: # elif self.tracker_non_protected_tag in canidate["tags"]:
elif isnonprotectedtor(self.tracker_non_protected_tag, canidate["tags"]):
self.torrent_hash_delete_list.append(canidate['infohash_v1']) self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log: if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion.') self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion.')
else: else:
dragnet(self,canidate['state'],canidate['ratio'],canidate["tags"],canidate['added_on'],self.age,self.t.time(),canidate['infohash_v1'],canidate["name"][0:20])
self.tl.info(f'["{canidate["name"][0:20]}..."] is orphaned.') self.tl.info(f'["{canidate["name"][0:20]}..."] is orphaned.')
self.up_tor_counter += 1 self.up_tor_counter += 1
continue continue
@ -42,3 +48,32 @@ def tordelete(self):
self.tl.debug('Hash list submitted for deletion:') self.tl.debug('Hash list submitted for deletion:')
self.tl.debug(self.torrent_hash_delete_list) self.tl.debug(self.torrent_hash_delete_list)
self.qbt_client.torrents_delete(True, self.torrent_hash_delete_list) self.qbt_client.torrents_delete(True, self.torrent_hash_delete_list)
def isdownloading(state):
if state == 'downloading':
return True
def isprotectedunderratio(torratio, setratio, settag, tortag):
if torratio < float(setratio) and settag in tortag:
return True
def isoldtor(toradd, setage, currenttime):
if toradd + setage <= currenttime:
return True
def isprotectedoverratio(torratio, setratio, settag, tortag):
if torratio >= float(setratio) and settag in tortag:
return True
def isnonprotectedtor(setnonprotectedtag, tortags):
if setnonprotectedtag in tortags:
return True
def dragnet(self,state,ratio,tags,added,age,time,thash,tname):
header = ['state','ratio','tags','added','age','time','thash','tname']
row = [state,ratio,tags,added,age,time,thash,tname]
with open('./orphanedtorrents.csv', 'w', encoding='UTF8', newline='') as f:
writer = self.cv.writer(f)
if f.tell() == 0:
writer.writerow(header)
writer.writerow(row)

View File

@ -1,18 +1,95 @@
import unittest import unittest
from qprocess import torprocessor from qlist import ispreme,iscatignored,istrackerblank,isprotectedtracker,isnotprotectedtracker,istagblank
import json from qprocess import isdownloading,isprotectedunderratio,isoldtor,isprotectedoverratio,isnonprotectedtor
import logging
import sys
class TestQprocess(unittest.TestCase):
def test_log_and_test_data(self):
self.log= logging.getLogger( "SomeTest.testSomething" )
self.tracker_list = open("./test/torrentinfo.txt", "r")
self.log.debug(self.tracker_list)
assert self.tracker_list
# torprocessor(self) class TestQbitmaid(unittest.TestCase):
def test_ispreme_sanity(self):
self.assertTrue(ispreme(1,1,1))
self.assertFalse(ispreme(1,1,3))
def test_ispreme(self):
pass
def test_iscatignored_sanity(self):
self.assertTrue(iscatignored('a', ['a','b','c']))
self.assertTrue(iscatignored('b', ['a','b','c']))
self.assertTrue(iscatignored('c', ['a','b','c']))
self.assertFalse(iscatignored('d', ['a','b','c']))
self.assertFalse(iscatignored(1, ['a','b','c']))
self.assertFalse(iscatignored(1.0000000, ['a','b','c']))
def test_iscatignored(self):
pass
def test_istrackerblank_sanity(self):
self.assertTrue(istrackerblank(''))
self.assertFalse(istrackerblank('a'))
self.assertFalse(istrackerblank(1))
self.assertFalse(istrackerblank(1.00000000))
def test_istrackerblank(self):
pass
def test_isprotectedtracker_sanity(self):
self.assertTrue(isprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertFalse(isprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertFalse(isprotectedtracker('https://d.com',['a.com','b.me','c.io']))
def test_isprotectedtracker(self):
pass
def test_isnotprotectedtracker_sanity(self):
self.assertFalse(isnotprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertTrue(isnotprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertTrue(isnotprotectedtracker('https://d.com',['a.com','b.me','c.io']))
def test_isnotprotectedtracker(self):
pass
def test_istagblank(self):
self.assertTrue(istagblank(''))
self.assertFalse(istagblank('a'))
self.assertFalse(istagblank(1))
self.assertFalse(istagblank(1.0001))
self.assertFalse(istagblank(False))
self.assertFalse(istagblank(True))
def test_isdownloading_sanity(self):
self.assertTrue(isdownloading('downloading'))
def test_isdownloading(self):
self.assertFalse(isdownloading('DOWNLOADING'))
self.assertFalse(isdownloading('dOwNlOaDiNg'))
def test_isprotectedunderratio_sanity(self):
self.assertTrue(isprotectedunderratio(0.5,1,'a','a,b,c'))
def test_isprotectedunderratio(self):
pass
def test_isoldtor_sanity(self):
self.assertTrue(isoldtor(1,2,4))
def test_isoldtor(self):
pass
def test_isprotectedoverratio_sanity(self):
self.assertTrue(isprotectedoverratio(2,1,'a','a,b,c'))
def test_isprotectedoverratio(self):
pass
def test_isnonprotectedtor_sanity(self):
self.assertTrue(isnonprotectedtor('a','a,b,c'))
def test_isnonprotectedtor(self):
pass
# def test__sanity(self):
# pass
# def test_(self):
# pass
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig( stream=sys.stderr )
logging.getLogger( "SomeTest.testSomething" ).setLevel( logging.DEBUG )
unittest.main() unittest.main()