Compare commits

..

No commits in common. "main" and "v1.0.0" have entirely different histories.
main ... v1.0.0

21 changed files with 307 additions and 992 deletions

View File

@ -1,178 +0,0 @@
*.example
LICENSE
*.log
README.md
requirements.txt
Dockerfile
*docker-test*
*.log
*.json
*.csv
*.toml
*.git*
.dockerignore
.DS_Store
.vscode/*
thunder-tests/*
.drone.yml
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

View File

@ -1,55 +0,0 @@
kind: pipeline
name: default
steps:
- name: docker
image: plugins/docker
settings:
registry: git.jonb.io
dry_run: false
username: jblu
password:
from_secret: gittea_drone
repo: git.jonb.io/jblu/qbit-maid
tags:
- latest
when:
branch:
- main
event:
- push
- name: docker-test
image: plugins/docker
settings:
registry: git.jonb.io
dry_run: false
username: jblu
password:
from_secret: gittea_drone
repo: git.jonb.io/jblu/qbit-maid
tags:
- dev
when:
branch:
- dev*
event:
- push
- name: test-main
image: git.jonb.io/jblu/qbit-maid:latest
commands:
- python test_qbitmaid.py
- python test_write_csv.py
when:
branch:
- main
event:
- push
- name: test-dev
image: git.jonb.io/jblu/qbit-maid:dev
commands:
- python test_qbitmaid.py
- python test_write_csv.py
when:
branch:
- dev*
event:
- push

1
.gitignore vendored Executable file → Normal file
View File

@ -1,7 +1,6 @@
*.log *.log
*.json *.json
*.csv *.csv
*.toml
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@ -1,40 +0,0 @@
import requests as r
from tomllib import load
import os
def apprise_notify(req_obj, host, port, aurls, title, body):
payload = {'urls': aurls,'title': title,'body': body,}
url = f'http://{host}:{port}/notify/'
apprise_response = req_obj.post(url, json = payload ,verify=False)
return apprise_response
class AppriseClient:
def __init__(self):
self.config = ''
try:
if os.environ["DOCKER"]:
self.host = os.environ["host"]
self.port = os.environ["port"]
self.aurls = os.environ["aurls"]
self.title = os.environ["title"]
self.body = os.environ["body"]
if os.environ["toml_path"]:
config_file_path=os.environ["toml_path"]
with open(config_file_path, 'rb') as c:
self.config = load(c)
except:
KeyError
if os.path.exists('./config.toml'):
config_file_path = './config.toml'
with open(config_file_path, 'rb') as c:
self.config = load(c)
if self.config:
self.host = self.config["apprise"]["host"]
self.port = self.config["apprise"]["port"]
self.aurls = self.config["apprise"]["aurls"]
self.title = self.config["apprise"]["title"]
self.body = self.config["apprise"]["body"]
self.apprise_response = apprise_notify(r,self.host,self.port,self.aurls,self.title,self.body)
if __name__ == "__main__":
AppriseClient()

View File

@ -1,8 +0,0 @@
FROM python:alpine3.18
WORKDIR /
COPY . opt
RUN apk add --no-cache supercronic
RUN pip install requests
RUN pip install qbittorrent-api
RUN chmod +x /opt/entrypoint.sh
CMD ["/opt/entrypoint.sh"]

21
LICENSE
View File

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2022 Jonathan Logan Branan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

174
README.md Executable file → Normal file
View File

@ -1,8 +1,6 @@
# qbit-maid # qbit-maid
> Warning: This application removes torrents that are over the minimum age and that are not part of the ignored categories, domains or tags. Please use the delete_torrents feature set to false when first testing its functionality.
[![Build Status](https://drone.jonb.io/api/badges/jblu/qbit-maid/status.svg?ref=refs/heads/main)](https://drone.jonb.io/jblu/qbit-maid) ## Warning: This application removes torrents that are over the minimum age and that are not part of the ignored categories, domains or tags. Please use the delete_torrents feature set to false when first testing its functionality.
The objective is to remove torrents based on the following criteria: The objective is to remove torrents based on the following criteria:
- tracker domain name - tracker domain name
@ -10,92 +8,94 @@ The objective is to remove torrents based on the following criteria:
- ratio - ratio
- state - state
## Install ```mermaid
### Docker(Recommended) graph TD;
qbit-maid.py-->qlogging.py;
[package](https://git.jonb.io/jblu/-/packages/container/qbit-maid/latest) qbit-maid.py-->qlist.py;
qbit-maid.py-->qprocess.py;
docker pull git.jonb.io/jblu/qbit-maid:latest qlogging.py-->qbit-maid.py;
qlist.py-->qbit-maid.py;
#### Docker Run Command: qprocess.py-->qbit-maid.py;
> Please note it is best practice to escape spaces in variables. That is why there is backslashes in the cron schedule.
docker run --name qbit-maid -v /opt/qbit-maid:/config/ -e CRON=0\ 1\ *\ *\ * -e toml_path=/config/config.toml git.jonb.io/jblu/qbit-maid
#### Docker Compose
``` ```
version: '3.3'
services: | File | Purpose |
qbit-maid: | --- | --- |
image: git.jonb.io/jblu/qbit-maid | qbit-maid.py | Client to the qbit api and calls functions from the other files |
container_name: qbit-maid | qlist.py | Builds out torrent lists |
volumes: | qlogging.py | Logging and push notification communication |
- /opt/qbit-maid:/config | qprocess.py | Submits qualifying torrents for deletion |
environment: | test_qbitmaid.py | Unit tests |
- CRON="0 1 * * *" | ignored_categories.json | whitelist for categorys to ignore |
- toml_path=/config/config.toml | ignored_tags.json | whitelist for torrent tags to ignore |
| ignored_trackers.json | whitelist of fqdn names to ignore |
You will need a config.json in the root directory.
| Key | Value |
| --- | --- |
| host | string, ip or hostname of qbittorrent server |
| port | number, port of admin gui(used for api aswell) |
| username | admin account for qbittorrent |
| password | password for admin account |
| loglevel | is what log messages are written to the log file. INFO or DEBUG are valid entries(case sensitive) |
| protected_tag | used to mark torrents to handle with care |
| non_protected_tag | we don't care about these torrents |
| logpath | will write a log in root directory if left as is other wise specify other path using forward slashes |
| age | number, seconds for how long we keep torrents from IPTORRENTS |
| minimum_age | age in seconds torrents should reached before they are removed |
| use_pushover | true or false to enable or disable pushover notification summary |
| use_log | true or false to enable or disable writing to alog file |
| po_key | pushover key |
| po_token | pushover api token |
| delete_torrents | true or false to enable or disable deletion. Useful for dry-runs |
| enable_dragnet | true or false to enable dragnet functionality. Useful for debugging |
It should look something like this:
Config.json
``` ```
### Via Git {
git clone https://git.jonb.io/jblu/qbit-maid.git "host": "192.168.1.1",
"port": 8080,
Qbit-maid will look for an environment variable *toml_path* for its configuration.If it doesn't find it, it will look for a config.toml file in it's own directory. "username": "admin",
##### config.toml "password": "admin",
"loglevel": "INFO",
"logpath": "./qc.log",
"protected_tag": "ipt",
"non_protected_tag": "public",
"age": 2419200,
"minimum_age": 432000,
"use_pushover": false,
"use_log": true,
"po_key": "",
"po_token": "",
"delete_torrents": false
"enable_dragnet": false,
"dragnet_outfile": "./orphaned.csv"
}
``` ```
[qbittorrent]
host = "192.168.x.x"
port = 8080
username = "user"
password = "pass"
[logging] You will need a ignored_categories.json in the root directory. This will ignore any of the categories found in the values of the entries.
use_log = true ```
log_level = "INFO" {
log_path = "./qc.log" "example": "general",
"example2": "sonarr"
[app_tags] }
protected_tag = "ipt" ```
non_protected_tag = "public"
You will need a ignored_domains.json in the root directory. This will ignore any torrents from these trackers.
[torrent] ```
age = 2419200 {
minimum_age = 432000 "iptorrents-empirehost": "ssl.empirehost.me",
delete_torrents = false "iptorrents-stackoverflow": "localhost.stackoverflow.tech",
"iptorrents-bgp": "routing.bgp.technology"
[pushover] }
use_pushover = false ```
po_key = "<key>>"
po_token = "<token>>" You will need a ignored_tags.json in the root directory. This will ignore any torrents with these tags.
```
[apprise] {
use_apprise = false "first":"first",
host = "192.168.x.x" "second":"second",
port = 8088 "third":"third"
aurls = 'mailto://user:pass@gmail.com' }
[dragnet]
enable_dragnet = false
dragnet_outfile = "./orphaned.csv"
[telemetry]
enable_telemetry = false
telemetry_outfile = "./telemetry.csv"
[ignored_categories]
tech = "tech"
books = "books"
general = "general"
[ignored_domains]
iptorrents-empirehost = "ssl.empirehost.me"
iptorrents-stackoverflow = "localhost.stackoverflow.tech"
iptorrents-bgp = "routing.bgp.technology"
[ignored_tags]
save = "save"
[healthcheck]
use_healthcheck = false
healthcheck_url = "https://example.com/ping/<uuid>>"
``` ```

19
config.json.example Normal file
View File

@ -0,0 +1,19 @@
{
"host": "192.168.1.1",
"port": 8080,
"username": "admin",
"password": "admin",
"loglevel": "INFO",
"logpath": "./qc.log",
"protected_tag": "ipt",
"non_protected_tag": "public",
"age": 2419200,
"minimum_age": 432000,
"use_pushover": true,
"use_log": true,
"po_key": "",
"po_token": "",
"delete_torrents": false,
"enable_dragnet": true,
"dragnet_outfile": "./orphaned.csv"
}

View File

@ -1,51 +0,0 @@
[qbittorrent]
host = "192.168.x.x"
port = 8080
username = "user"
password = "pass"
[logging]
use_log = true
log_level = "INFO"
log_path = "./qc.log"
[app_tags]
protected_tag = "ipt"
non_protected_tag = "public"
[torrent]
max_age = 2419200
min_age = 432000
delete_torrents = false
[pushover]
use_pushover = false
po_key = "<key>>"
po_token = "<token>>"
[apprise]
use_apprise = false
host = "192.168.x.x"
port = 8088
aurls = 'mailto://user:pass@gmail.com'
[dragnet]
enable_dragnet = false
dragnet_outfile = "./orphaned.csv"
[ignored_categories]
tech = "tech"
books = "books"
general = "general"
[ignored_domains]
iptorrents-empirehost = "ssl.empirehost.me"
iptorrents-stackoverflow = "localhost.stackoverflow.tech"
iptorrents-bgp = "routing.bgp.technology"
[ignored_tags]
save = "save"
[healthcheck]
use_healthcheck = true
healthcheck_url = "https://example.com/ping/<uuid>>"

View File

@ -1,7 +0,0 @@
#!/bin/sh
CRON_CONFIG_FILE="/opt/crontab"
echo "${CRON} python /opt/qbit-maid.py" > $CRON_CONFIG_FILE
exec supercronic -passthrough-logs -quiet $CRON_CONFIG_FILE

View File

@ -0,0 +1,3 @@
{
"example": "general"
}

View File

@ -0,0 +1,5 @@
{
"iptorrents-empirehost": "ssl.empirehost.me",
"iptorrents-stackoverflow": "localhost.stackoverflow.tech",
"iptorrents-bgp": "routing.bgp.technology"
}

View File

@ -0,0 +1,5 @@
{
"first":"first",
"second":"second",
"third":"third"
}

View File

@ -1,234 +0,0 @@
# pushover 1.2
#
# Copyright (C) 2013-2018 Thibaut Horel <thibaut.horel@gmail.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import requests
BASE_URL = "https://api.pushover.net/1/"
MESSAGE_URL = BASE_URL + "messages.json"
USER_URL = BASE_URL + "users/validate.json"
SOUND_URL = BASE_URL + "sounds.json"
RECEIPT_URL = BASE_URL + "receipts/"
GLANCE_URL = BASE_URL + "glances.json"
class RequestError(Exception):
"""Exception which is raised when Pushover's API returns an error code.
The list of errors is stored in the :attr:`errors` attribute.
"""
def __init__(self, errors):
Exception.__init__(self)
self.errors = errors
def __str__(self):
return "\n==> " + "\n==> ".join(self.errors)
class Request(object):
"""Base class to send a request to the Pushover server and check the return
status code. The request is sent on instantiation and raises
a :class:`RequestError` exception when the request is rejected.
"""
def __init__(self, method, url, payload):
files = {}
if "attachment" in payload:
files["attachment"] = payload["attachment"]
del payload["attachment"]
self.payload = payload
self.files = files
request = getattr(requests, method)(url, params=payload, files=files)
self.answer = request.json()
if 400 <= request.status_code < 500:
raise RequestError(self.answer["errors"])
def __str__(self):
return str(self.answer)
class MessageRequest(Request):
"""This class represents a message request to the Pushover API. You do not
need to create it yourself, but the :func:`Pushover.message` function
returns :class:`MessageRequest` objects.
The :attr:`answer` attribute contains a JSON representation of the answer
made by the Pushover API. When sending a message with a priority of 2, you
can poll the status of the notification with the :func:`poll` function.
"""
params = {
"expired": "expires_at",
"called_back": "called_back_at",
"acknowledged": "acknowledged_at",
}
def __init__(self, payload):
Request.__init__(self, "post", MESSAGE_URL, payload)
self.status = {"done": True}
if payload.get("priority", 0) == 2:
self.url = RECEIPT_URL + self.answer["receipt"]
self.status["done"] = False
for param, when in MessageRequest.params.items():
self.status[param] = False
self.status[when] = 0
def poll(self):
"""If the message request has a priority of 2, Pushover keeps sending
the same notification until the client acknowledges it. Calling the
:func:`poll` function fetches the status of the :class:`MessageRequest`
object until the notifications either expires, is acknowledged by the
client, or the callback url is reached. The status is available in the
``status`` dictionary.
Returns ``True`` when the request has expired or been acknowledged and
``False`` otherwise so that a typical handling of a priority-2
notification can look like this::
request = p.message("Urgent!", priority=2, expire=120, retry=60)
while not request.poll():
# do something
time.sleep(5)
print request.status
"""
if not self.status["done"]:
r = Request("get", self.url + ".json", {"token": self.payload["token"]})
for param, when in MessageRequest.params.items():
self.status[param] = bool(r.answer[param])
self.status[when] = int(r.answer[when])
for param in ["acknowledged_by", "acknowledged_by_device"]:
self.status[param] = r.answer[param]
self.status["last_delivered_at"] = int(r.answer["last_delivered_at"])
if any(self.status[param] for param in MessageRequest.params):
self.status["done"] = True
return self.status["done"]
def cancel(self):
"""If the message request has a priority of 2, Pushover keeps sending
the same notification until it either reaches its ``expire`` value or
is aknowledged by the client. Calling the :func:`cancel` function
cancels the notification early.
"""
if not self.status["done"]:
return Request(
"post", self.url + "/cancel.json", {"token": self.payload["token"]}
)
else:
return None
class Pushover(object):
"""This is the main class of the module. It represents a Pushover app and
is tied to a unique API token.
* ``token``: Pushover API token
"""
_SOUNDS = None
message_keywords = [
"title",
"priority",
"sound",
"callback",
"timestamp",
"url",
"url_title",
"device",
"retry",
"expire",
"html",
"attachment",
]
glance_keywords = ["title", "text", "subtext", "count", "percent", "device"]
def __init__(self, token):
self.token = token
@property
def sounds(self):
"""Return a dictionary of sounds recognized by Pushover and that can be
used in a notification message.
"""
if not Pushover._SOUNDS:
request = Request("get", SOUND_URL, {"token": self.token})
Pushover._SOUNDS = request.answer["sounds"]
return Pushover._SOUNDS
def verify(self, user, device=None):
"""Verify that the `user` and optional `device` exist. Returns
`None` when the user/device does not exist or a list of the user's
devices otherwise.
"""
payload = {"user": user, "token": self.token}
if device:
payload["device"] = device
try:
request = Request("post", USER_URL, payload)
except RequestError:
return None
else:
return request.answer["devices"]
def message(self, user, message, **kwargs):
"""Send `message` to the user specified by `user`. It is possible
to specify additional properties of the message by passing keyword
arguments. The list of valid keywords is ``title, priority, sound,
callback, timestamp, url, url_title, device, retry, expire and html``
which are described in the Pushover API documentation.
For convenience, you can simply set ``timestamp=True`` to set the
timestamp to the current timestamp.
An image can be attached to a message by passing a file-like object
to the `attachment` keyword argument.
This method returns a :class:`MessageRequest` object.
"""
payload = {"message": message, "user": user, "token": self.token}
for key, value in kwargs.items():
if key not in Pushover.message_keywords:
raise ValueError("{0}: invalid message parameter".format(key))
elif key == "timestamp" and value is True:
payload[key] = int(time.time())
elif key == "sound" and value not in self.sounds:
raise ValueError("{0}: invalid sound".format(value))
else:
payload[key] = value
return MessageRequest(payload)
def glance(self, user, **kwargs):
"""Send a glance to the user. The default property is ``text``, as this
is used on most glances, however a valid glance does not need to
require text and can be constructed using any combination of valid
keyword properties. The list of valid keywords is ``title, text,
subtext, count, percent and device`` which are described in the
Pushover Glance API documentation.
This method returns a :class:`GlanceRequest` object.
"""
payload = {"user": user, "token": self.token}
for key, value in kwargs.items():
if key not in Pushover.glance_keywords:
raise ValueError("{0}: invalid glance parameter".format(key))
else:
payload[key] = value
return Request("post", GLANCE_URL, payload)

135
qbit-maid.py Executable file → Normal file
View File

@ -1,105 +1,59 @@
#The first file shall contain an client to the qbit api and the processing of the torrents.
import qbittorrentapi import qbittorrentapi
import pushover import pushover
from tomllib import load from json import load
from qlist import * from qlist import *
from qlogging import * from qlogging import *
from qprocess import * from qprocess import *
from AppriseClient import apprise_notify
import time import time
import datetime import datetime
import logging import logging
from collections import Counter from collections import Counter
import csv import csv
import requests as r
import os
r.packages.urllib3.disable_warnings()
class Qbt: class Qbt:
def __init__(self): def __init__(self):
"""Main object, should be calling functions from qlist.py, qlogging.py and qprocess.py""" """Main object, should be calling functions from qlist.py, qlogging.py and qprocess.py"""
# Open the config. Needs a json file with the data in config.json.example # Open the config. Needs a json file with the data in config.json.example
self.st = datetime.datetime.now() self.st = datetime.datetime.now()
c = open('./config.json')
if os.getenv("toml_path"): self.config = load(c)
config_file_path=os.getenv("toml_path") w = open('./ignored_categories.json')
with open(config_file_path, 'rb') as c: self.cat_whitelist = load(w)
self.config = load(c) tg = open('./ignored_tags.json')
if os.path.exists('./config.toml'): self.ignored_tags = load(tg)
config_file_path = './config.toml' # Create the api object
with open(config_file_path, 'rb') as c:
self.config = load(c)
# # Create the api object
self.qbt_client = qbittorrentapi.Client( self.qbt_client = qbittorrentapi.Client(
# qbittorrent host=self.config["host"],
host=self.config["qbittorrent"]["host"], port=self.config["port"],
port=self.config["qbittorrent"]["port"], username=self.config["username"],
username=self.config["qbittorrent"]["username"], password=self.config["password"],
password=self.config["qbittorrent"]["password"],
) )
# Create the logging and pushover objects # Create the logging and pushover objects
self.tl = logging self.tl = logging
self.po = pushover self.po = pushover
self.ct = Counter self.ct = Counter
self.cv = csv self.cv = csv
# Init config.toml # Variables torlog uses from config.json
self.use_pushover = self.config["use_pushover"]
# logging self.use_log = self.config["use_log"]
self.use_log = self.config["logging"]["use_log"] self.po_key = self.config["po_key"]
self.log_path = self.config["logging"]["log_path"] self.po_token = self.config["po_token"]
self.log_level = self.config["logging"]["log_level"] self.logpath = self.config["logpath"]
self.loglevel = self.config["loglevel"]
#app_tags self.tracker_protected_tag = self.config["protected_tag"]
self.tracker_protected_tag = self.config["app_tags"]["protected_tag"] self.tracker_non_protected_tag = self.config["non_protected_tag"]
self.tracker_non_protected_tag = self.config["app_tags"]["non_protected_tag"] self.minimum_age = self.config["minimum_age"]
self.age = self.config["age"]
#torrent self.enable_dragnet = self.config["enable_dragnet"]
self.delete_torrents = self.config["torrent"]["delete_torrents"] self.dragnet_outfile = self.config["dragnet_outfile"]
self.min_age = self.config["torrent"]["min_age"]
self.max_age = self.config["torrent"]["max_age"]
#pushover
self.use_pushover = self.config["pushover"]["use_pushover"]
self.po_key = self.config["pushover"]["po_key"]
self.po_token = self.config["pushover"]["po_token"]
#apprise
self.use_apprise = self.config["apprise"]["use_apprise"]
self.apprise_host = self.config["apprise"]["host"]
self.apprise_port = self.config["apprise"]["port"]
self.apprise_aurls = self.config["apprise"]["aurls"]
#dragnet
self.enable_dragnet = self.config["dragnet"]["enable_dragnet"]
self.dragnet_outfile = self.config["dragnet"]["dragnet_outfile"]
#telemetry
self.enable_telemetry = self.config["telemetry"]["enable_telemetry"]
self.telemetry_outfile = self.config["telemetry"]["telemetry_outfile"]
#ignored_categories
self.cat_whitelist = self.config["ignored_categories"]
#ignored_domains
self.tracker_whitelist = self.config["ignored_domains"]
#ignored_tags
self.ignored_tags = self.config["ignored_domains"]
#healthcheck
self.use_healthcheck = self.config["healthcheck"]["use_healthcheck"]
self.healthcheck_url = self.config["healthcheck"]["healthcheck_url"]
# Calling log and notify functions # Calling log and notify functions
tor_log(self) torlog(self)
tor_notify(self) tornotify(self)
self.t = time self.t = time
#start healthcheck job
if self.use_healthcheck:
send_ping(self, r, self.healthcheck_url.rstrip("/") + "/start" )
# Pulling domain names to treat carefully # Pulling domain names to treat carefully
f = open('./ignored_domains.json')
self.tracker_whitelist = load(f)
self.tracker_list = [] self.tracker_list = []
self.up_tor_counter = 0 self.up_tor_counter = 0
self.preme_tor_counter = 0 self.preme_tor_counter = 0
@ -116,29 +70,24 @@ class Qbt:
self.tl.exception(e) self.tl.exception(e)
self.po.send_message(e, title="qbit-maid API ERROR") self.po.send_message(e, title="qbit-maid API ERROR")
# Pulling all torrent data # Pulling all torrent data
self.torrent_list = self.qbt_client.torrents_info() self.torrentlist = self.qbt_client.torrents_info()
#Main process block #Main process block
if self.use_log: if self.use_log:
list_qbit_api_info(self) listqbitapiinfo(self)
list_first_tor(self) listfirsttor(self)
debug_torrent_list(self) buildtorlist(self)
build_tor_list(self) processcounts(self)
process_counts(self)
if self.use_log: if self.use_log:
torrent_count(self) torrentcount(self)
tor_processor(self) torprocessor(self)
if self.use_log: if self.use_log:
print_processor(self) printprocessor(self)
if self.delete_torrents: if self.config["delete_torrents"]:
tor_delete(self) tordelete(self)
self.et = datetime.datetime.now() self.et = datetime.datetime.now()
get_script_runtime(self) getscriptruntime(self)
if self.use_pushover: if self.use_pushover:
tor_notify_summary(self) tornotifysummary(self)
if self.use_apprise:
tor_notify_apprise(self, r, apprise_notify)
if self.use_healthcheck:
send_ping(self, r, self.healthcheck_url)
# Run # Run
if __name__== "__main__": if __name__== "__main__":
Qbt() Qbt()

87
qlist.py Executable file → Normal file
View File

@ -1,84 +1,73 @@
def build_tor_list(self): def buildtorlist(self):
"""Builds multiple lists of torrents to be sorted. Also adds tags to the torents. """Builds multiple lists of torrents to be sorted. Also adds tags to the torents.
There are more effecient ways of doing things but I did this rather quickly. There are more effecient ways of doing things but I did this rather quickly.
V2 will certainly be more performant. The reason two lists were used was so that torrents V2 will certainly be more performant. The reason two lists were used was so that torrents
that are in public trackers woudln't be around as long as torrents from a private tracker. that are in public trackers woudln't be around as long as torrents from a private tracker.
""" """
self.total_torrents = len(self.torrent_list) self.total_torrents = len(self.torrentlist)
while self.torrent_list: while self.torrentlist:
torrent = self.torrent_list.pop() torrent = self.torrentlist.pop()
if self.use_log: if self.use_log:
self.tl.debug(f'---Analyzing ["{torrent["name"][0:20]}..."] {torrent["infohash_v1"]}---') self.tl.debug(f'["{torrent["name"][0:20]}..."] {torrent["infohash_v1"]}')
# Need a way to tag when the tracker is blank if isignoredtag(self.ignored_tags.values(),torrent['tags']):
if is_tracker_blank(torrent['tracker']):
if self.use_log:
self.tl.warning(f'Torrent doesn\'t have a tracker ["{torrent["name"][0:20]}..."] hash: {torrent["hash"]}')
self.ignored_counter += 1 self.ignored_counter += 1
continue continue
elif is_cat_ignored(torrent['category'], self.cat_whitelist.values()): # if torrent['added_on'] + self.minimum_age >= self.t.time():
if self.use_log: if ispreme(torrent['added_on'], self.minimum_age, self.t.time()):
self.tl.info(f'Ignored category: ["{torrent["name"][0:20]}..."] category:[{torrent["category"]}] hash: {torrent["hash"]}')
self.ignored_counter += 1
continue
elif is_ignored_tag(self.ignored_tags.values(),torrent['tags']):
if self.use_log:
self.tl.info(f'Ignored tag: ["{torrent["name"][0:20]}..."] tags: {torrent["tags"]} hash: {torrent["hash"]}')
self.ignored_counter += 1
continue
if is_tag_blank(torrent['tags']):
if self.use_log:
self.tl.debug(f'Tagging new torrent: ["{torrent["name"][0:20]}..."] {torrent["tracker"]}hash: {torrent["hash"]}')
if is_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash'])
elif is_not_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash'])
if is_preme(torrent['seeding_time'], self.min_age):
self.preme_tor_counter += 1 self.preme_tor_counter += 1
self.tl.debug(f'Premature torrent: ["{torrent["name"][0:20]}..."] Seconds Seeded: [{torrent["seeding_time"]}] hash: {torrent["hash"]}')
continue continue
elif is_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()): # if torrent['category'] in self.cat_whitelist.values():
if is_tag_blank(torrent['tags']): if iscatignored(torrent['category'], self.cat_whitelist.values()):
self.tl.info(f'Ignored torrent:["{torrent["name"][0:20]}..."]')
self.ignored_counter += 1
continue
# if torrent['tracker'] == '':
if istrackerblank(torrent['tracker']):
if self.use_log:
self.tl.warning(f'Torrent doesn\'t have a tracker ["{torrent["name"][0:20]}..."] [{torrent["tracker"]}]hash: {torrent["hash"]}')
self.ignored_counter += 1
continue
# if torrent['tracker'].split('/')[2] in self.tracker_whitelist.values():
if isprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log:
self.tl.debug(f'Protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
# if torrent['tags'] == '':
if istagblank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash']) self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash'])
if self.use_log:
self.tl.debug(f'Tagging Protected torrent: ["{torrent["name"][0:20]}..."] {torrent["tracker"]}hash: {torrent["hash"]}')
self.tracker_list.append(torrent) self.tracker_list.append(torrent)
elif is_not_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()): if isnotprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if is_tag_blank(torrent['tags']): if self.use_log:
self.tl.debug(f'Non-protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
# if torrent['tags'] == '':
if istagblank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash']) self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash'])
if self.use_log:
self.tl.debug(f'Tagging Non-protected torrent: ["{torrent["name"][0:20]}..."] {torrent["tracker"]}hash: {torrent["hash"]}')
self.tracker_list.append(torrent) self.tracker_list.append(torrent)
def ispreme(added, minage, time):
def is_preme(seeding_time, minage): if added + minage >= time:
if seeding_time <= minage:
return True return True
def is_cat_ignored(cat, catlist): def iscatignored(cat, catlist):
if cat in catlist: if cat in catlist:
return True return True
def is_tracker_blank(tracker): def istrackerblank(tracker):
if tracker == '': if tracker == '':
return True return True
def is_protected_tracker(tracker, trackerlist): def isprotectedtracker(tracker, trackerlist):
if tracker == '':
return False
if tracker.split('/')[2] in trackerlist: if tracker.split('/')[2] in trackerlist:
return True return True
def is_not_protected_tracker(tracker, trackerlist): def isnotprotectedtracker(tracker, trackerlist):
if tracker == '':
return False
if tracker.split('/')[2] not in trackerlist: if tracker.split('/')[2] not in trackerlist:
return True return True
def is_tag_blank(tag): def istagblank(tag):
if tag == '': if tag == '':
return True return True
def is_ignored_tag(igtags, tortags): def isignoredtag(igtags, tortags):
for igt in igtags: for igt in igtags:
if igt in tortags: if igt in tortags:
return True return True

87
qlogging.py Executable file → Normal file
View File

@ -1,44 +1,27 @@
def tor_log(self): def torlog(self):
"""Setting up the log file, if self.use_log is set to true and self.loglevel is DEBUG OR INFO""" """Setting up the log file, if self.use_log is set to true and self.loglevel is DEBUG OR INFO"""
if self.use_log: if self.use_log:
if self.log_level.lower() == 'debug': if self.loglevel == 'DEBUG':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.DEBUG) self.tl.basicConfig(filename=self.logpath, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.DEBUG)
elif self.log_level.lower() == 'info': elif self.loglevel == 'INFO':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.INFO) self.tl.basicConfig(filename=self.logpath, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.INFO)
elif self.log_level.lower() == 'warn':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.WARN)
elif self.log_level.lower() == 'error':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.ERROR)
def tor_notify(self): def tornotify(self):
"""Seting up to use pushover, if self.use_pushover is set to true and """Seting up to use pushover, if self.use_pushover is set to true and
if valid self.po_key and self.po_token is provided in the config file""" if valid self.po_key and self.po_token is provided in the config file"""
if self.use_pushover: if self.use_pushover:
self.poc = self.po.Pushover(self.po_token) self.poc = self.po.Client(self.po_key, api_token=self.po_token)
def tor_notify_apprise(self, req_obj, app_obj):
"""Use apprise"""
body = f" Total: {self.total_torrents}\n\
Premature: {self.preme_tor_counter}\n\
Ignored: {self.ignored_counter}\n\
Protected: {self.c[self.tracker_protected_tag]}\n\
Non-protected: {self.c[self.tracker_non_protected_tag]}\n\
Orphaned: {self.up_tor_counter}\n\
Marked for deletion: {len(self.torrent_hash_delete_list)}\n\
{self.extm}"
title = "--- qbit-maid summary ---"
app_obj(req_obj, self.apprise_host, self.apprise_port, self.apprise_aurls, title, body)
def tornotifytest(self): def tornotifytest(self):
"""Used to make sure tornotify is working and messages are getting to the client""" """Used to make sure tornotify is working and messages are getting to the client"""
self.poc.message(self.po_key, "Test Message", title="qbit-maid") self.poc.send_message("Test Message", title="qbit-maid")
def process_counts(self): def processcounts(self):
self.c = self.ct() self.c = self.ct()
for item in self.tracker_list: for item in self.tracker_list:
self.c[item["tags"]] += 1 self.c[item["tags"]] += 1
def print_processor(self): def printprocessor(self):
"""Print summary of torrents""" """Print summary of torrents"""
self.tl.info(f'Total: {self.total_torrents}') self.tl.info(f'Total: {self.total_torrents}')
self.tl.info(f'Premature: {self.preme_tor_counter}') self.tl.info(f'Premature: {self.preme_tor_counter}')
@ -48,9 +31,9 @@ def print_processor(self):
self.tl.info(f'Orphaned: {self.up_tor_counter}') self.tl.info(f'Orphaned: {self.up_tor_counter}')
self.tl.info(f'Marked for deletion: {len(self.torrent_hash_delete_list)}') self.tl.info(f'Marked for deletion: {len(self.torrent_hash_delete_list)}')
def tor_notify_summary(self): def tornotifysummary(self):
"""Main notification method when the app is used in an automated fashion""" """Main notification method when the app is used in an automated fashion"""
self.poc.message(self.po_key, f" Total: {self.total_torrents}\n\ self.poc.send_message(f" Total: {self.total_torrents}\n\
Premature: {self.preme_tor_counter}\n\ Premature: {self.preme_tor_counter}\n\
Ignored: {self.ignored_counter}\n\ Ignored: {self.ignored_counter}\n\
Protected: {self.c[self.tracker_protected_tag]}\n\ Protected: {self.c[self.tracker_protected_tag]}\n\
@ -59,38 +42,50 @@ def tor_notify_summary(self):
Marked for deletion: {len(self.torrent_hash_delete_list)}\n\ Marked for deletion: {len(self.torrent_hash_delete_list)}\n\
{self.extm}", title="--- qbit-maid summary ---") {self.extm}", title="--- qbit-maid summary ---")
def list_first_tor(self, index=0): def getunixtimestamp(self):
"""Used for debuging and development related to unixtimestamps, not used in main script but useful"""
self.uts = self.t.time()
self.tl.info(self.uts)
def writetor(self, filepath='./torrentinfo.json'):
"""Write all torrent data to a file.
Useful for development of new features.
"""
pass
def listfirsttor(self, index=0):
"""Only lists the first torrent""" """Only lists the first torrent"""
self.tl.debug('First torrent in the list:') self.tl.debug('First torrent in the list:')
torrent = self.torrent_list[index] torrent = self.torrentlist[index]
for k,v in torrent.items(): for k,v in torrent.items():
self.tl.debug(f'{k}: {v}') self.tl.debug(f'{k}: {v}')
self.tl.debug('\n') self.tl.debug('\n')
def list_qbit_api_info(self): def listqbitapiinfo(self):
"""Writes torrent info to log file""" """Writes torrent info to log file"""
self.tl.debug(f'qBittorrent: {self.qbt_client.app.version}') self.tl.debug(f'qBittorrent: {self.qbt_client.app.version}')
self.tl.debug(f'qBittorrent Web API: {self.qbt_client.app.web_api_version}') self.tl.debug(f'qBittorrent Web API: {self.qbt_client.app.web_api_version}')
def torrent_count(self): def torrentcount(self):
"""write torrent counts to log file""" """write torrent counts to log file"""
self.tl.debug(f'*** Torrents with tag["{self.tracker_protected_tag}"] {self.c[self.tracker_protected_tag]} ***') self.tl.debug(f'torrents that are protected {self.tracker_list.count("ipt")}')
self.tl.debug(f'*** Torrents with tag["{self.tracker_non_protected_tag}"] {self.c[self.tracker_non_protected_tag]} ***') self.tl.debug(f'torrents that aren\'t protected {self.tracker_list.count("public")}')
def get_script_runtime(self): def torlisttags(self):
pass
def debugpremecal(self):
for torrent in self.torrentlist:
if torrent['infohash_v1'] == 'a89b484ea375094af53ce89ecbea14bf086d6284':
print(torrent["name"][0:20])
print(torrent['added_on'] + self.minimum_age >= self.t.time())
def getscriptruntime(self):
elapsed_time = self.et - self.st elapsed_time = self.et - self.st
if self.use_log: if self.use_log:
self.tl.info(f'Execution time: [{elapsed_time}]') self.tl.info(f'Execution time: [{elapsed_time}]')
if self.use_pushover: if self.use_pushover:
self.extm = f"Execution time: [{elapsed_time}]" self.extm = f"Execution time: [{elapsed_time}]"
if self.use_apprise:
self.extm = f"Execution time: [{elapsed_time}]"
def send_ping(self, req_obj, healthcheck_url): def getobjecttype(object):
try: print(type(object))
req_obj.get(healthcheck_url, timeout=5)
except req_obj.RequestException as e:
self.tl.info(f"Ping failed: {e}")
def debug_torrent_list(self):
self.tl.debug(self.torrent_list)

59
qprocess.py Executable file → Normal file
View File

@ -1,82 +1,83 @@
def tor_processor(self): from cgitb import enable
def torprocessor(self):
"""Main logic to sort through both self.tracker_nonprotected_list and self.tracker_protected_list """Main logic to sort through both self.tracker_nonprotected_list and self.tracker_protected_list
If torrent meets criteria for deletion, its infohash_v1 will be appended to self.torrent_hash_delete_list If torrent meets criteria for deletion, its infohash_v1 will be appended to self.torrent_hash_delete_list
""" """
for canidate in self.tracker_list: for canidate in self.tracker_list:
if self.enable_telemetry: # if canidate['state'] == 'downloading':
header = ['state','ratio','tags','added','hash','name','tracker'] if isdownloading(canidate['state']):
row = [canidate['state'],canidate['ratio'],canidate["tags"],canidate['added_on'],canidate['infohash_v1'],canidate["name"][0:20],canidate['tracker']]
write_csv(self.cv,self.telemetry_outfile,header,row)
if self.use_log:
self.tl.debug(f'---Reviewing canidate: ["{canidate["name"][0:20]}..."] {canidate["infohash_v1"]}---')
if is_downloading(canidate['state']):
if self.use_log: if self.use_log:
self.tl.info(f'["{canidate["name"][0:20]}..."] is still downloading and will be skipped.') self.tl.info(f'["{canidate["name"][0:20]}..."] is still downloading and will be skipped.')
continue continue
elif is_protected_under_ratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]): # elif canidate['ratio'] < float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedunderratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log: if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is below a 1.05 ratio({canidate["ratio"]})') self.tl.debug(f'["{canidate["name"][0:20]}..."] is below a 1.05 ratio({canidate["ratio"]})')
if is_old_tor(canidate['time_active'], self.max_age): # if canidate['added_on'] + self.age <= self.t.time():
if isoldtor(canidate['added_on'], self.age, self.t.time()):
if self.use_log: if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] Seconds old: {canidate["time_active"]} Delta: {canidate["time_active"] - self.max_age}') self.tl.debug(f'["{canidate["name"][0:20]}..."] Seconds old: {self.t.time() - self.age - canidate["added_on"]}')
self.torrent_hash_delete_list.append(canidate['infohash_v1']) self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log: if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.') self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
elif is_protected_over_ratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]): # elif canidate['ratio'] >= float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedoverratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log: if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is above a 1.05 ratio({canidate["ratio"]}).') self.tl.debug(f'["{canidate["name"][0:20]}..."] is above a 1.05 ratio({canidate["ratio"]}).')
self.torrent_hash_delete_list.append(canidate['infohash_v1']) self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log: if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.') self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
elif is_not_protected_tor(self.tracker_non_protected_tag, canidate["tags"]): # elif self.tracker_non_protected_tag in canidate["tags"]:
elif isnonprotectedtor(self.tracker_non_protected_tag, canidate["tags"]):
self.torrent_hash_delete_list.append(canidate['infohash_v1']) self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log: if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion.') self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion.')
else: else:
if self.enable_dragnet: if self.enable_dragnet:
header = ['state','ratio','tags','added','thash','tname','trname'] dragnet(self,canidate['state'],canidate['ratio'],canidate["tags"],canidate['added_on'],self.age,self.t.time(),canidate['infohash_v1'],canidate["name"][0:20])
row = [canidate['state'],canidate['ratio'],canidate["tags"],canidate['added_on'],canidate['infohash_v1'],canidate["name"][0:20],canidate['tracker']]
write_csv(self.cv,self.dragnet_outfile,header,row)
self.tl.info(f'["{canidate["name"][0:20]}..."] is orphaned.') self.tl.info(f'["{canidate["name"][0:20]}..."] is orphaned.')
self.up_tor_counter += 1 self.up_tor_counter += 1
continue continue
def tor_delete_tags(self): def tordeletetags(self):
tag_list = [self.tracker_protected_tag, self.tracker_non_protected_tag] tag_list = [self.tracker_protected_tag, self.tracker_non_protected_tag]
self.qbt_client.torrents_delete_tags(tag_list) self.qbt_client.torrents_delete_tags(tag_list)
def tor_delete(self): def tordelete(self):
"""Remove torrents, will also delete files, this keeps the filesystem clean. """Remove torrents, will also delete files, this keeps the filesystem clean.
Only pass self.torrent_hash_delete_list if you would like to keep the files.""" Only pass self.torrent_hash_delete_list if you would like to keep the files."""
if self.use_log: if self.use_log:
self.tl.debug('Hash list submitted for deletion:') self.tl.debug('Hash list submitted for deletion:')
self.tl.debug(self.torrent_hash_delete_list) self.tl.debug(self.torrent_hash_delete_list)
if self.torrent_hash_delete_list: self.qbt_client.torrents_delete(True, self.torrent_hash_delete_list)
self.qbt_client.torrents_delete(True, self.torrent_hash_delete_list)
def is_downloading(state): def isdownloading(state):
if state == 'downloading': if state == 'downloading':
return True return True
def is_protected_under_ratio(torratio, setratio, settag, tortag): def isprotectedunderratio(torratio, setratio, settag, tortag):
if torratio < float(setratio) and settag in tortag: if torratio < float(setratio) and settag in tortag:
return True return True
def is_old_tor(realage, maxage): def isoldtor(toradd, setage, currenttime):
if realage >= maxage: if toradd + setage <= currenttime:
return True return True
def is_protected_over_ratio(torratio, setratio, settag, tortag): def isprotectedoverratio(torratio, setratio, settag, tortag):
if torratio >= float(setratio) and settag in tortag: if torratio >= float(setratio) and settag in tortag:
return True return True
def is_not_protected_tor(setnonprotectedtag, tortags): def isnonprotectedtor(setnonprotectedtag, tortags):
if setnonprotectedtag in tortags: if setnonprotectedtag in tortags:
return True return True
def write_csv(csv_obj,outfile,header,row): def dragnet(self,state,ratio,tags,added,age,time,thash,tname):
with open(outfile, 'a+', encoding='UTF8', newline='') as f: header = ['state','ratio','tags','added','age','time','thash','tname']
writer = csv_obj.writer(f) row = [state,ratio,tags,added,age,time,thash,tname]
with open(self.dragnet_outfile, 'w', encoding='UTF8', newline='') as f:
writer = self.cv.writer(f)
if f.tell() == 0: if f.tell() == 0:
writer.writerow(header) writer.writerow(header)
writer.writerow(row) writer.writerow(row)

View File

@ -1 +0,0 @@
qbittorrent_api==2022.5.32

92
test_qbitmaid.py Executable file → Normal file
View File

@ -1,111 +1,97 @@
import unittest import unittest
import requests as r from qlist import ispreme,iscatignored,istrackerblank,isprotectedtracker,isnotprotectedtracker,istagblank,isignoredtag
from qlist import is_preme,is_cat_ignored,is_tracker_blank,is_protected_tracker,is_not_protected_tracker,is_tag_blank,is_ignored_tag from qprocess import isdownloading,isprotectedunderratio,isoldtor,isprotectedoverratio,isnonprotectedtor
from qprocess import is_downloading,is_protected_under_ratio,is_old_tor,is_protected_over_ratio,is_not_protected_tor
from qlogging import send_ping
class TestQbitmaid(unittest.TestCase): class TestQbitmaid(unittest.TestCase):
def test_ispreme_sanity(self): def test_ispreme_sanity(self):
self.assertTrue(is_preme(1,1)) self.assertTrue(ispreme(1,1,1))
self.assertTrue(is_preme(1,2)) self.assertFalse(ispreme(1,1,3))
self.assertFalse(is_preme(2,1))
def test_ispreme(self): def test_ispreme(self):
pass pass
def test_iscatignored_sanity(self): def test_iscatignored_sanity(self):
self.assertTrue(is_cat_ignored('a', ['a','b','c'])) self.assertTrue(iscatignored('a', ['a','b','c']))
self.assertTrue(is_cat_ignored('b', ['a','b','c'])) self.assertTrue(iscatignored('b', ['a','b','c']))
self.assertTrue(is_cat_ignored('c', ['a','b','c'])) self.assertTrue(iscatignored('c', ['a','b','c']))
self.assertFalse(is_cat_ignored('d', ['a','b','c'])) self.assertFalse(iscatignored('d', ['a','b','c']))
self.assertFalse(is_cat_ignored(1, ['a','b','d'])) self.assertFalse(iscatignored(1, ['a','b','c']))
self.assertFalse(is_cat_ignored(1.0000000, ['a','b','c'])) self.assertFalse(iscatignored(1.0000000, ['a','b','c']))
def test_iscatignored(self): def test_iscatignored(self):
pass pass
def test_istrackerblank_sanity(self): def test_istrackerblank_sanity(self):
self.assertTrue(is_tracker_blank('')) self.assertTrue(istrackerblank(''))
self.assertFalse(is_tracker_blank('a')) self.assertFalse(istrackerblank('a'))
self.assertFalse(is_tracker_blank(1)) self.assertFalse(istrackerblank(1))
self.assertFalse(is_tracker_blank(1.00000000)) self.assertFalse(istrackerblank(1.00000000))
def test_istrackerblank(self): def test_istrackerblank(self):
pass pass
def test_isprotectedtracker_sanity(self): def test_isprotectedtracker_sanity(self):
self.assertTrue(is_protected_tracker('https://a.com/',['a.com','b.me','c.io'])) self.assertTrue(isprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertFalse(is_protected_tracker('https://google.com/',['a.com','b.me','c.io'])) self.assertFalse(isprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertFalse(is_protected_tracker('https://d.com',['a.com','b.me','c.io'])) self.assertFalse(isprotectedtracker('https://d.com',['a.com','b.me','c.io']))
def test_isprotectedtracker(self): def test_isprotectedtracker(self):
pass pass
def test_isnotprotectedtracker_sanity(self): def test_isnotprotectedtracker_sanity(self):
self.assertFalse(is_not_protected_tracker('https://a.com/',['a.com','b.me','c.io'])) self.assertFalse(isnotprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertTrue(is_not_protected_tracker('https://google.com/',['a.com','b.me','c.io'])) self.assertTrue(isnotprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertTrue(is_not_protected_tracker('https://d.com',['a.com','b.me','c.io'])) self.assertTrue(isnotprotectedtracker('https://d.com',['a.com','b.me','c.io']))
def test_isnotprotectedtracker(self): def test_isnotprotectedtracker(self):
pass pass
def test_istagblank(self): def test_istagblank(self):
self.assertTrue(is_tag_blank('')) self.assertTrue(istagblank(''))
self.assertFalse(is_tag_blank('a')) self.assertFalse(istagblank('a'))
self.assertFalse(is_tag_blank(1)) self.assertFalse(istagblank(1))
self.assertFalse(is_tag_blank(1.0001)) self.assertFalse(istagblank(1.0001))
self.assertFalse(is_tag_blank(False)) self.assertFalse(istagblank(False))
self.assertFalse(is_tag_blank(True)) self.assertFalse(istagblank(True))
def test_isdownloading_sanity(self): def test_isdownloading_sanity(self):
self.assertTrue(is_downloading('downloading')) self.assertTrue(isdownloading('downloading'))
def test_isdownloading(self): def test_isdownloading(self):
self.assertFalse(is_downloading('DOWNLOADING')) self.assertFalse(isdownloading('DOWNLOADING'))
self.assertFalse(is_downloading('dOwNlOaDiNg')) self.assertFalse(isdownloading('dOwNlOaDiNg'))
def test_isprotectedunderratio_sanity(self): def test_isprotectedunderratio_sanity(self):
self.assertTrue(is_protected_under_ratio(0.5,1,'a','a,b,c')) self.assertTrue(isprotectedunderratio(0.5,1,'a','a,b,c'))
def test_isprotectedunderratio(self): def test_isprotectedunderratio(self):
pass pass
def test_isoldtor_sanity(self): def test_isoldtor_sanity(self):
self.assertFalse(is_old_tor(1,2)) self.assertTrue(isoldtor(1,2,4))
def test_isoldtor(self): def test_isoldtor(self):
self.assertTrue(is_old_tor(1,1)) self.assertFalse(isoldtor(1661150664,2419200,1662049004.2101078))
self.assertTrue(is_old_tor(2,1)) self.assertFalse(isoldtor(1661150664,2419200,1662049004))
self.assertFalse(is_old_tor(1,2)) self.assertFalse(isoldtor(1661150664.000000,2419200.0000000,1662049004.2101078))
def test_isprotectedoverratio_sanity(self): def test_isprotectedoverratio_sanity(self):
self.assertTrue(is_protected_over_ratio(2,1,'a','a,b,c')) self.assertTrue(isprotectedoverratio(2,1,'a','a,b,c'))
def test_isprotectedoverratio(self): def test_isprotectedoverratio(self):
pass pass
def test_isnonprotectedtor_sanity(self): def test_isnonprotectedtor_sanity(self):
self.assertTrue(is_not_protected_tor('a','a,b,c')) self.assertTrue(isnonprotectedtor('a','a,b,c'))
def test_isnonprotectedtor(self): def test_isnonprotectedtor(self):
pass pass
def test_isignoredtag_sanity(self): def test_isignoredtag_sanity(self):
self.assertTrue(is_ignored_tag(['a','b','c'], 'first,second,third,a')) self.assertTrue(isignoredtag(['a','b','c'], 'first,second,third,a'))
def test_isignoredtag(self):
self.assertTrue(is_ignored_tag(['save'], 'save,public,ipt'))
def test_sendpingstart_sanity(self):
send_ping(self, r, "https://thunder.jonb.io/ping/921625e5-5b76-4f45-a0c3-56145e16f3bb" + "/start")
url = "https://thunder.jonb.io/ping/921625e5-5b76-4f45-a0c3-56145e16f3bb"
send_ping(self, r, url)
def test_sendping_start(self):
url = "https://thunder.jonb.io/ping/921625e5-5b76-4f45-a0c3-56145e16f3bb/"
send_ping(self, r, url.strip("/") + "/start")
send_ping(self, r, "https://thunder.jonb.io/ping/921625e5-5b76-4f45-a0c3-56145e16f3bb")
def test_isignoredtag_sanity(self):
self.assertTrue(isignoredtag(['a','b','c'], 'first,second,third,a'))
# def test__sanity(self): # def test__sanity(self):
# pass # pass

View File

@ -1,41 +0,0 @@
from qprocess import write_csv
import csv
import unittest
import os
# Torrent Items needed for testing
# canidate['state'] {canidate["ratio"]} {torrent["tags"]} torrent["seeding_time"] {torrent["category"]} {torrent["name"][0:20]} {canidate["time_active"]}
class TestWriteCSV(unittest.TestCase):
def test_write_csv_dragnet(self):
self.cv = csv
outfile = './test_dragnet_outfile.csv'
state = 'downloading'
ratio = 1.05
tags = 'ipt'
added = 1
age = 240000
time = 123456
thash = 'asfasdf23412adfqwer'
tname = 'thisismynamehahahah'
trname = 'https://localhost.stackoverflow.tech/317332f1c125bc9c1b9b14fb8e054908/announce'
header = ['state','ratio','tags','added','age','time','thash','tname','trname']
row = [state,ratio,tags,added,age,time,thash,tname,trname]
write_csv(self.cv,outfile,header,row)
self.assertTrue(os.path.exists(outfile))
def test_write_csv_telemetry(self):
self.cv = csv
outfile = './test_telemetry_outfile.csv'
state = 'downloading'
ratio = 1.05
tags = 'ipt'
added = 1
thash = 'asfasdf23412adfqwer'
tname = 'thisismynamehahahah'
trname = 'https://localhost.stackoverflow.tech/317332f1c125bc9c1b9b14fb8e054908/announce'
header = ['state','ratio','tags','added','hash','name','tracker']
row = [state,ratio,tags,added,thash,tname,trname]
write_csv(self.cv,outfile,header,row)
self.assertTrue(os.path.exists(outfile))
if __name__ == '__main__':
unittest.main()