Compare commits

..

No commits in common. "main" and "v1.0.0" have entirely different histories.
main ... v1.0.0

21 changed files with 307 additions and 992 deletions

View File

@ -1,178 +0,0 @@
*.example
LICENSE
*.log
README.md
requirements.txt
Dockerfile
*docker-test*
*.log
*.json
*.csv
*.toml
*.git*
.dockerignore
.DS_Store
.vscode/*
thunder-tests/*
.drone.yml
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

View File

@ -1,55 +0,0 @@
kind: pipeline
name: default
steps:
- name: docker
image: plugins/docker
settings:
registry: git.jonb.io
dry_run: false
username: jblu
password:
from_secret: gittea_drone
repo: git.jonb.io/jblu/qbit-maid
tags:
- latest
when:
branch:
- main
event:
- push
- name: docker-test
image: plugins/docker
settings:
registry: git.jonb.io
dry_run: false
username: jblu
password:
from_secret: gittea_drone
repo: git.jonb.io/jblu/qbit-maid
tags:
- dev
when:
branch:
- dev*
event:
- push
- name: test-main
image: git.jonb.io/jblu/qbit-maid:latest
commands:
- python test_qbitmaid.py
- python test_write_csv.py
when:
branch:
- main
event:
- push
- name: test-dev
image: git.jonb.io/jblu/qbit-maid:dev
commands:
- python test_qbitmaid.py
- python test_write_csv.py
when:
branch:
- dev*
event:
- push

1
.gitignore vendored Executable file → Normal file
View File

@ -1,7 +1,6 @@
*.log
*.json
*.csv
*.toml
# Byte-compiled / optimized / DLL files
__pycache__/

View File

@ -1,40 +0,0 @@
import requests as r
from tomllib import load
import os
def apprise_notify(req_obj, host, port, aurls, title, body):
payload = {'urls': aurls,'title': title,'body': body,}
url = f'http://{host}:{port}/notify/'
apprise_response = req_obj.post(url, json = payload ,verify=False)
return apprise_response
class AppriseClient:
def __init__(self):
self.config = ''
try:
if os.environ["DOCKER"]:
self.host = os.environ["host"]
self.port = os.environ["port"]
self.aurls = os.environ["aurls"]
self.title = os.environ["title"]
self.body = os.environ["body"]
if os.environ["toml_path"]:
config_file_path=os.environ["toml_path"]
with open(config_file_path, 'rb') as c:
self.config = load(c)
except:
KeyError
if os.path.exists('./config.toml'):
config_file_path = './config.toml'
with open(config_file_path, 'rb') as c:
self.config = load(c)
if self.config:
self.host = self.config["apprise"]["host"]
self.port = self.config["apprise"]["port"]
self.aurls = self.config["apprise"]["aurls"]
self.title = self.config["apprise"]["title"]
self.body = self.config["apprise"]["body"]
self.apprise_response = apprise_notify(r,self.host,self.port,self.aurls,self.title,self.body)
if __name__ == "__main__":
AppriseClient()

View File

@ -1,8 +0,0 @@
FROM python:alpine3.18
WORKDIR /
COPY . opt
RUN apk add --no-cache supercronic
RUN pip install requests
RUN pip install qbittorrent-api
RUN chmod +x /opt/entrypoint.sh
CMD ["/opt/entrypoint.sh"]

21
LICENSE
View File

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2022 Jonathan Logan Branan
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

170
README.md Executable file → Normal file
View File

@ -1,8 +1,6 @@
# qbit-maid
> Warning: This application removes torrents that are over the minimum age and that are not part of the ignored categories, domains or tags. Please use the delete_torrents feature set to false when first testing its functionality.
[![Build Status](https://drone.jonb.io/api/badges/jblu/qbit-maid/status.svg?ref=refs/heads/main)](https://drone.jonb.io/jblu/qbit-maid)
## Warning: This application removes torrents that are over the minimum age and that are not part of the ignored categories, domains or tags. Please use the delete_torrents feature set to false when first testing its functionality.
The objective is to remove torrents based on the following criteria:
- tracker domain name
@ -10,92 +8,94 @@ The objective is to remove torrents based on the following criteria:
- ratio
- state
## Install
### Docker(Recommended)
[package](https://git.jonb.io/jblu/-/packages/container/qbit-maid/latest)
docker pull git.jonb.io/jblu/qbit-maid:latest
#### Docker Run Command:
> Please note it is best practice to escape spaces in variables. That is why there is backslashes in the cron schedule.
docker run --name qbit-maid -v /opt/qbit-maid:/config/ -e CRON=0\ 1\ *\ *\ * -e toml_path=/config/config.toml git.jonb.io/jblu/qbit-maid
#### Docker Compose
```mermaid
graph TD;
qbit-maid.py-->qlogging.py;
qbit-maid.py-->qlist.py;
qbit-maid.py-->qprocess.py;
qlogging.py-->qbit-maid.py;
qlist.py-->qbit-maid.py;
qprocess.py-->qbit-maid.py;
```
version: '3.3'
services:
qbit-maid:
image: git.jonb.io/jblu/qbit-maid
container_name: qbit-maid
volumes:
- /opt/qbit-maid:/config
environment:
- CRON="0 1 * * *"
- toml_path=/config/config.toml
| File | Purpose |
| --- | --- |
| qbit-maid.py | Client to the qbit api and calls functions from the other files |
| qlist.py | Builds out torrent lists |
| qlogging.py | Logging and push notification communication |
| qprocess.py | Submits qualifying torrents for deletion |
| test_qbitmaid.py | Unit tests |
| ignored_categories.json | whitelist for categorys to ignore |
| ignored_tags.json | whitelist for torrent tags to ignore |
| ignored_trackers.json | whitelist of fqdn names to ignore |
You will need a config.json in the root directory.
| Key | Value |
| --- | --- |
| host | string, ip or hostname of qbittorrent server |
| port | number, port of admin gui(used for api aswell) |
| username | admin account for qbittorrent |
| password | password for admin account |
| loglevel | is what log messages are written to the log file. INFO or DEBUG are valid entries(case sensitive) |
| protected_tag | used to mark torrents to handle with care |
| non_protected_tag | we don't care about these torrents |
| logpath | will write a log in root directory if left as is other wise specify other path using forward slashes |
| age | number, seconds for how long we keep torrents from IPTORRENTS |
| minimum_age | age in seconds torrents should reached before they are removed |
| use_pushover | true or false to enable or disable pushover notification summary |
| use_log | true or false to enable or disable writing to alog file |
| po_key | pushover key |
| po_token | pushover api token |
| delete_torrents | true or false to enable or disable deletion. Useful for dry-runs |
| enable_dragnet | true or false to enable dragnet functionality. Useful for debugging |
It should look something like this:
Config.json
```
### Via Git
git clone https://git.jonb.io/jblu/qbit-maid.git
Qbit-maid will look for an environment variable *toml_path* for its configuration.If it doesn't find it, it will look for a config.toml file in it's own directory.
##### config.toml
{
"host": "192.168.1.1",
"port": 8080,
"username": "admin",
"password": "admin",
"loglevel": "INFO",
"logpath": "./qc.log",
"protected_tag": "ipt",
"non_protected_tag": "public",
"age": 2419200,
"minimum_age": 432000,
"use_pushover": false,
"use_log": true,
"po_key": "",
"po_token": "",
"delete_torrents": false
"enable_dragnet": false,
"dragnet_outfile": "./orphaned.csv"
}
```
[qbittorrent]
host = "192.168.x.x"
port = 8080
username = "user"
password = "pass"
[logging]
use_log = true
log_level = "INFO"
log_path = "./qc.log"
You will need a ignored_categories.json in the root directory. This will ignore any of the categories found in the values of the entries.
```
{
"example": "general",
"example2": "sonarr"
}
```
[app_tags]
protected_tag = "ipt"
non_protected_tag = "public"
You will need a ignored_domains.json in the root directory. This will ignore any torrents from these trackers.
```
{
"iptorrents-empirehost": "ssl.empirehost.me",
"iptorrents-stackoverflow": "localhost.stackoverflow.tech",
"iptorrents-bgp": "routing.bgp.technology"
}
```
[torrent]
age = 2419200
minimum_age = 432000
delete_torrents = false
[pushover]
use_pushover = false
po_key = "<key>>"
po_token = "<token>>"
[apprise]
use_apprise = false
host = "192.168.x.x"
port = 8088
aurls = 'mailto://user:pass@gmail.com'
[dragnet]
enable_dragnet = false
dragnet_outfile = "./orphaned.csv"
[telemetry]
enable_telemetry = false
telemetry_outfile = "./telemetry.csv"
[ignored_categories]
tech = "tech"
books = "books"
general = "general"
[ignored_domains]
iptorrents-empirehost = "ssl.empirehost.me"
iptorrents-stackoverflow = "localhost.stackoverflow.tech"
iptorrents-bgp = "routing.bgp.technology"
[ignored_tags]
save = "save"
[healthcheck]
use_healthcheck = false
healthcheck_url = "https://example.com/ping/<uuid>>"
You will need a ignored_tags.json in the root directory. This will ignore any torrents with these tags.
```
{
"first":"first",
"second":"second",
"third":"third"
}
```

19
config.json.example Normal file
View File

@ -0,0 +1,19 @@
{
"host": "192.168.1.1",
"port": 8080,
"username": "admin",
"password": "admin",
"loglevel": "INFO",
"logpath": "./qc.log",
"protected_tag": "ipt",
"non_protected_tag": "public",
"age": 2419200,
"minimum_age": 432000,
"use_pushover": true,
"use_log": true,
"po_key": "",
"po_token": "",
"delete_torrents": false,
"enable_dragnet": true,
"dragnet_outfile": "./orphaned.csv"
}

View File

@ -1,51 +0,0 @@
[qbittorrent]
host = "192.168.x.x"
port = 8080
username = "user"
password = "pass"
[logging]
use_log = true
log_level = "INFO"
log_path = "./qc.log"
[app_tags]
protected_tag = "ipt"
non_protected_tag = "public"
[torrent]
max_age = 2419200
min_age = 432000
delete_torrents = false
[pushover]
use_pushover = false
po_key = "<key>>"
po_token = "<token>>"
[apprise]
use_apprise = false
host = "192.168.x.x"
port = 8088
aurls = 'mailto://user:pass@gmail.com'
[dragnet]
enable_dragnet = false
dragnet_outfile = "./orphaned.csv"
[ignored_categories]
tech = "tech"
books = "books"
general = "general"
[ignored_domains]
iptorrents-empirehost = "ssl.empirehost.me"
iptorrents-stackoverflow = "localhost.stackoverflow.tech"
iptorrents-bgp = "routing.bgp.technology"
[ignored_tags]
save = "save"
[healthcheck]
use_healthcheck = true
healthcheck_url = "https://example.com/ping/<uuid>>"

View File

@ -1,7 +0,0 @@
#!/bin/sh
CRON_CONFIG_FILE="/opt/crontab"
echo "${CRON} python /opt/qbit-maid.py" > $CRON_CONFIG_FILE
exec supercronic -passthrough-logs -quiet $CRON_CONFIG_FILE

View File

@ -0,0 +1,3 @@
{
"example": "general"
}

View File

@ -0,0 +1,5 @@
{
"iptorrents-empirehost": "ssl.empirehost.me",
"iptorrents-stackoverflow": "localhost.stackoverflow.tech",
"iptorrents-bgp": "routing.bgp.technology"
}

View File

@ -0,0 +1,5 @@
{
"first":"first",
"second":"second",
"third":"third"
}

View File

@ -1,234 +0,0 @@
# pushover 1.2
#
# Copyright (C) 2013-2018 Thibaut Horel <thibaut.horel@gmail.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time
import requests
BASE_URL = "https://api.pushover.net/1/"
MESSAGE_URL = BASE_URL + "messages.json"
USER_URL = BASE_URL + "users/validate.json"
SOUND_URL = BASE_URL + "sounds.json"
RECEIPT_URL = BASE_URL + "receipts/"
GLANCE_URL = BASE_URL + "glances.json"
class RequestError(Exception):
"""Exception which is raised when Pushover's API returns an error code.
The list of errors is stored in the :attr:`errors` attribute.
"""
def __init__(self, errors):
Exception.__init__(self)
self.errors = errors
def __str__(self):
return "\n==> " + "\n==> ".join(self.errors)
class Request(object):
"""Base class to send a request to the Pushover server and check the return
status code. The request is sent on instantiation and raises
a :class:`RequestError` exception when the request is rejected.
"""
def __init__(self, method, url, payload):
files = {}
if "attachment" in payload:
files["attachment"] = payload["attachment"]
del payload["attachment"]
self.payload = payload
self.files = files
request = getattr(requests, method)(url, params=payload, files=files)
self.answer = request.json()
if 400 <= request.status_code < 500:
raise RequestError(self.answer["errors"])
def __str__(self):
return str(self.answer)
class MessageRequest(Request):
"""This class represents a message request to the Pushover API. You do not
need to create it yourself, but the :func:`Pushover.message` function
returns :class:`MessageRequest` objects.
The :attr:`answer` attribute contains a JSON representation of the answer
made by the Pushover API. When sending a message with a priority of 2, you
can poll the status of the notification with the :func:`poll` function.
"""
params = {
"expired": "expires_at",
"called_back": "called_back_at",
"acknowledged": "acknowledged_at",
}
def __init__(self, payload):
Request.__init__(self, "post", MESSAGE_URL, payload)
self.status = {"done": True}
if payload.get("priority", 0) == 2:
self.url = RECEIPT_URL + self.answer["receipt"]
self.status["done"] = False
for param, when in MessageRequest.params.items():
self.status[param] = False
self.status[when] = 0
def poll(self):
"""If the message request has a priority of 2, Pushover keeps sending
the same notification until the client acknowledges it. Calling the
:func:`poll` function fetches the status of the :class:`MessageRequest`
object until the notifications either expires, is acknowledged by the
client, or the callback url is reached. The status is available in the
``status`` dictionary.
Returns ``True`` when the request has expired or been acknowledged and
``False`` otherwise so that a typical handling of a priority-2
notification can look like this::
request = p.message("Urgent!", priority=2, expire=120, retry=60)
while not request.poll():
# do something
time.sleep(5)
print request.status
"""
if not self.status["done"]:
r = Request("get", self.url + ".json", {"token": self.payload["token"]})
for param, when in MessageRequest.params.items():
self.status[param] = bool(r.answer[param])
self.status[when] = int(r.answer[when])
for param in ["acknowledged_by", "acknowledged_by_device"]:
self.status[param] = r.answer[param]
self.status["last_delivered_at"] = int(r.answer["last_delivered_at"])
if any(self.status[param] for param in MessageRequest.params):
self.status["done"] = True
return self.status["done"]
def cancel(self):
"""If the message request has a priority of 2, Pushover keeps sending
the same notification until it either reaches its ``expire`` value or
is aknowledged by the client. Calling the :func:`cancel` function
cancels the notification early.
"""
if not self.status["done"]:
return Request(
"post", self.url + "/cancel.json", {"token": self.payload["token"]}
)
else:
return None
class Pushover(object):
"""This is the main class of the module. It represents a Pushover app and
is tied to a unique API token.
* ``token``: Pushover API token
"""
_SOUNDS = None
message_keywords = [
"title",
"priority",
"sound",
"callback",
"timestamp",
"url",
"url_title",
"device",
"retry",
"expire",
"html",
"attachment",
]
glance_keywords = ["title", "text", "subtext", "count", "percent", "device"]
def __init__(self, token):
self.token = token
@property
def sounds(self):
"""Return a dictionary of sounds recognized by Pushover and that can be
used in a notification message.
"""
if not Pushover._SOUNDS:
request = Request("get", SOUND_URL, {"token": self.token})
Pushover._SOUNDS = request.answer["sounds"]
return Pushover._SOUNDS
def verify(self, user, device=None):
"""Verify that the `user` and optional `device` exist. Returns
`None` when the user/device does not exist or a list of the user's
devices otherwise.
"""
payload = {"user": user, "token": self.token}
if device:
payload["device"] = device
try:
request = Request("post", USER_URL, payload)
except RequestError:
return None
else:
return request.answer["devices"]
def message(self, user, message, **kwargs):
"""Send `message` to the user specified by `user`. It is possible
to specify additional properties of the message by passing keyword
arguments. The list of valid keywords is ``title, priority, sound,
callback, timestamp, url, url_title, device, retry, expire and html``
which are described in the Pushover API documentation.
For convenience, you can simply set ``timestamp=True`` to set the
timestamp to the current timestamp.
An image can be attached to a message by passing a file-like object
to the `attachment` keyword argument.
This method returns a :class:`MessageRequest` object.
"""
payload = {"message": message, "user": user, "token": self.token}
for key, value in kwargs.items():
if key not in Pushover.message_keywords:
raise ValueError("{0}: invalid message parameter".format(key))
elif key == "timestamp" and value is True:
payload[key] = int(time.time())
elif key == "sound" and value not in self.sounds:
raise ValueError("{0}: invalid sound".format(value))
else:
payload[key] = value
return MessageRequest(payload)
def glance(self, user, **kwargs):
"""Send a glance to the user. The default property is ``text``, as this
is used on most glances, however a valid glance does not need to
require text and can be constructed using any combination of valid
keyword properties. The list of valid keywords is ``title, text,
subtext, count, percent and device`` which are described in the
Pushover Glance API documentation.
This method returns a :class:`GlanceRequest` object.
"""
payload = {"user": user, "token": self.token}
for key, value in kwargs.items():
if key not in Pushover.glance_keywords:
raise ValueError("{0}: invalid glance parameter".format(key))
else:
payload[key] = value
return Request("post", GLANCE_URL, payload)

135
qbit-maid.py Executable file → Normal file
View File

@ -1,105 +1,59 @@
#The first file shall contain an client to the qbit api and the processing of the torrents.
import qbittorrentapi
import pushover
from tomllib import load
from json import load
from qlist import *
from qlogging import *
from qprocess import *
from AppriseClient import apprise_notify
import time
import datetime
import logging
from collections import Counter
import csv
import requests as r
import os
r.packages.urllib3.disable_warnings()
class Qbt:
def __init__(self):
"""Main object, should be calling functions from qlist.py, qlogging.py and qprocess.py"""
# Open the config. Needs a json file with the data in config.json.example
self.st = datetime.datetime.now()
if os.getenv("toml_path"):
config_file_path=os.getenv("toml_path")
with open(config_file_path, 'rb') as c:
self.config = load(c)
if os.path.exists('./config.toml'):
config_file_path = './config.toml'
with open(config_file_path, 'rb') as c:
self.config = load(c)
# # Create the api object
c = open('./config.json')
self.config = load(c)
w = open('./ignored_categories.json')
self.cat_whitelist = load(w)
tg = open('./ignored_tags.json')
self.ignored_tags = load(tg)
# Create the api object
self.qbt_client = qbittorrentapi.Client(
# qbittorrent
host=self.config["qbittorrent"]["host"],
port=self.config["qbittorrent"]["port"],
username=self.config["qbittorrent"]["username"],
password=self.config["qbittorrent"]["password"],
host=self.config["host"],
port=self.config["port"],
username=self.config["username"],
password=self.config["password"],
)
# Create the logging and pushover objects
self.tl = logging
self.po = pushover
self.ct = Counter
self.cv = csv
# Init config.toml
# logging
self.use_log = self.config["logging"]["use_log"]
self.log_path = self.config["logging"]["log_path"]
self.log_level = self.config["logging"]["log_level"]
#app_tags
self.tracker_protected_tag = self.config["app_tags"]["protected_tag"]
self.tracker_non_protected_tag = self.config["app_tags"]["non_protected_tag"]
#torrent
self.delete_torrents = self.config["torrent"]["delete_torrents"]
self.min_age = self.config["torrent"]["min_age"]
self.max_age = self.config["torrent"]["max_age"]
#pushover
self.use_pushover = self.config["pushover"]["use_pushover"]
self.po_key = self.config["pushover"]["po_key"]
self.po_token = self.config["pushover"]["po_token"]
#apprise
self.use_apprise = self.config["apprise"]["use_apprise"]
self.apprise_host = self.config["apprise"]["host"]
self.apprise_port = self.config["apprise"]["port"]
self.apprise_aurls = self.config["apprise"]["aurls"]
#dragnet
self.enable_dragnet = self.config["dragnet"]["enable_dragnet"]
self.dragnet_outfile = self.config["dragnet"]["dragnet_outfile"]
#telemetry
self.enable_telemetry = self.config["telemetry"]["enable_telemetry"]
self.telemetry_outfile = self.config["telemetry"]["telemetry_outfile"]
#ignored_categories
self.cat_whitelist = self.config["ignored_categories"]
#ignored_domains
self.tracker_whitelist = self.config["ignored_domains"]
#ignored_tags
self.ignored_tags = self.config["ignored_domains"]
#healthcheck
self.use_healthcheck = self.config["healthcheck"]["use_healthcheck"]
self.healthcheck_url = self.config["healthcheck"]["healthcheck_url"]
# Variables torlog uses from config.json
self.use_pushover = self.config["use_pushover"]
self.use_log = self.config["use_log"]
self.po_key = self.config["po_key"]
self.po_token = self.config["po_token"]
self.logpath = self.config["logpath"]
self.loglevel = self.config["loglevel"]
self.tracker_protected_tag = self.config["protected_tag"]
self.tracker_non_protected_tag = self.config["non_protected_tag"]
self.minimum_age = self.config["minimum_age"]
self.age = self.config["age"]
self.enable_dragnet = self.config["enable_dragnet"]
self.dragnet_outfile = self.config["dragnet_outfile"]
# Calling log and notify functions
tor_log(self)
tor_notify(self)
torlog(self)
tornotify(self)
self.t = time
#start healthcheck job
if self.use_healthcheck:
send_ping(self, r, self.healthcheck_url.rstrip("/") + "/start" )
# Pulling domain names to treat carefully
f = open('./ignored_domains.json')
self.tracker_whitelist = load(f)
self.tracker_list = []
self.up_tor_counter = 0
self.preme_tor_counter = 0
@ -116,29 +70,24 @@ class Qbt:
self.tl.exception(e)
self.po.send_message(e, title="qbit-maid API ERROR")
# Pulling all torrent data
self.torrent_list = self.qbt_client.torrents_info()
self.torrentlist = self.qbt_client.torrents_info()
#Main process block
if self.use_log:
list_qbit_api_info(self)
list_first_tor(self)
debug_torrent_list(self)
build_tor_list(self)
process_counts(self)
listqbitapiinfo(self)
listfirsttor(self)
buildtorlist(self)
processcounts(self)
if self.use_log:
torrent_count(self)
tor_processor(self)
torrentcount(self)
torprocessor(self)
if self.use_log:
print_processor(self)
if self.delete_torrents:
tor_delete(self)
printprocessor(self)
if self.config["delete_torrents"]:
tordelete(self)
self.et = datetime.datetime.now()
get_script_runtime(self)
getscriptruntime(self)
if self.use_pushover:
tor_notify_summary(self)
if self.use_apprise:
tor_notify_apprise(self, r, apprise_notify)
if self.use_healthcheck:
send_ping(self, r, self.healthcheck_url)
tornotifysummary(self)
# Run
if __name__== "__main__":
Qbt()

87
qlist.py Executable file → Normal file
View File

@ -1,84 +1,73 @@
def build_tor_list(self):
def buildtorlist(self):
"""Builds multiple lists of torrents to be sorted. Also adds tags to the torents.
There are more effecient ways of doing things but I did this rather quickly.
V2 will certainly be more performant. The reason two lists were used was so that torrents
that are in public trackers woudln't be around as long as torrents from a private tracker.
"""
self.total_torrents = len(self.torrent_list)
while self.torrent_list:
torrent = self.torrent_list.pop()
self.total_torrents = len(self.torrentlist)
while self.torrentlist:
torrent = self.torrentlist.pop()
if self.use_log:
self.tl.debug(f'---Analyzing ["{torrent["name"][0:20]}..."] {torrent["infohash_v1"]}---')
# Need a way to tag when the tracker is blank
if is_tracker_blank(torrent['tracker']):
if self.use_log:
self.tl.warning(f'Torrent doesn\'t have a tracker ["{torrent["name"][0:20]}..."] hash: {torrent["hash"]}')
self.tl.debug(f'["{torrent["name"][0:20]}..."] {torrent["infohash_v1"]}')
if isignoredtag(self.ignored_tags.values(),torrent['tags']):
self.ignored_counter += 1
continue
elif is_cat_ignored(torrent['category'], self.cat_whitelist.values()):
if self.use_log:
self.tl.info(f'Ignored category: ["{torrent["name"][0:20]}..."] category:[{torrent["category"]}] hash: {torrent["hash"]}')
self.ignored_counter += 1
continue
elif is_ignored_tag(self.ignored_tags.values(),torrent['tags']):
if self.use_log:
self.tl.info(f'Ignored tag: ["{torrent["name"][0:20]}..."] tags: {torrent["tags"]} hash: {torrent["hash"]}')
self.ignored_counter += 1
continue
if is_tag_blank(torrent['tags']):
if self.use_log:
self.tl.debug(f'Tagging new torrent: ["{torrent["name"][0:20]}..."] {torrent["tracker"]}hash: {torrent["hash"]}')
if is_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash'])
elif is_not_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash'])
if is_preme(torrent['seeding_time'], self.min_age):
# if torrent['added_on'] + self.minimum_age >= self.t.time():
if ispreme(torrent['added_on'], self.minimum_age, self.t.time()):
self.preme_tor_counter += 1
self.tl.debug(f'Premature torrent: ["{torrent["name"][0:20]}..."] Seconds Seeded: [{torrent["seeding_time"]}] hash: {torrent["hash"]}')
continue
elif is_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
if is_tag_blank(torrent['tags']):
# if torrent['category'] in self.cat_whitelist.values():
if iscatignored(torrent['category'], self.cat_whitelist.values()):
self.tl.info(f'Ignored torrent:["{torrent["name"][0:20]}..."]')
self.ignored_counter += 1
continue
# if torrent['tracker'] == '':
if istrackerblank(torrent['tracker']):
if self.use_log:
self.tl.warning(f'Torrent doesn\'t have a tracker ["{torrent["name"][0:20]}..."] [{torrent["tracker"]}]hash: {torrent["hash"]}')
self.ignored_counter += 1
continue
# if torrent['tracker'].split('/')[2] in self.tracker_whitelist.values():
if isprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log:
self.tl.debug(f'Protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
# if torrent['tags'] == '':
if istagblank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_protected_tag,torrent['hash'])
if self.use_log:
self.tl.debug(f'Tagging Protected torrent: ["{torrent["name"][0:20]}..."] {torrent["tracker"]}hash: {torrent["hash"]}')
self.tracker_list.append(torrent)
elif is_not_protected_tracker(torrent['tracker'], self.tracker_whitelist.values()):
if is_tag_blank(torrent['tags']):
if isnotprotectedtracker(torrent['tracker'], self.tracker_whitelist.values()):
if self.use_log:
self.tl.debug(f'Non-protected torrent: {torrent["tracker"]}hash: {torrent["hash"]}')
# if torrent['tags'] == '':
if istagblank(torrent['tags']):
self.qbt_client.torrents_add_tags(self.tracker_non_protected_tag,torrent['hash'])
if self.use_log:
self.tl.debug(f'Tagging Non-protected torrent: ["{torrent["name"][0:20]}..."] {torrent["tracker"]}hash: {torrent["hash"]}')
self.tracker_list.append(torrent)
def is_preme(seeding_time, minage):
if seeding_time <= minage:
def ispreme(added, minage, time):
if added + minage >= time:
return True
def is_cat_ignored(cat, catlist):
def iscatignored(cat, catlist):
if cat in catlist:
return True
def is_tracker_blank(tracker):
def istrackerblank(tracker):
if tracker == '':
return True
def is_protected_tracker(tracker, trackerlist):
if tracker == '':
return False
def isprotectedtracker(tracker, trackerlist):
if tracker.split('/')[2] in trackerlist:
return True
def is_not_protected_tracker(tracker, trackerlist):
if tracker == '':
return False
def isnotprotectedtracker(tracker, trackerlist):
if tracker.split('/')[2] not in trackerlist:
return True
def is_tag_blank(tag):
def istagblank(tag):
if tag == '':
return True
def is_ignored_tag(igtags, tortags):
def isignoredtag(igtags, tortags):
for igt in igtags:
if igt in tortags:
return True

87
qlogging.py Executable file → Normal file
View File

@ -1,44 +1,27 @@
def tor_log(self):
def torlog(self):
"""Setting up the log file, if self.use_log is set to true and self.loglevel is DEBUG OR INFO"""
if self.use_log:
if self.log_level.lower() == 'debug':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.DEBUG)
elif self.log_level.lower() == 'info':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.INFO)
elif self.log_level.lower() == 'warn':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.WARN)
elif self.log_level.lower() == 'error':
self.tl.basicConfig(filename=self.log_path, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.ERROR)
if self.loglevel == 'DEBUG':
self.tl.basicConfig(filename=self.logpath, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.DEBUG)
elif self.loglevel == 'INFO':
self.tl.basicConfig(filename=self.logpath, format='%(asctime)s:%(levelname)s:%(message)s', encoding='utf-8', datefmt='%m/%d/%Y %I:%M:%S %p',level=self.tl.INFO)
def tor_notify(self):
def tornotify(self):
"""Seting up to use pushover, if self.use_pushover is set to true and
if valid self.po_key and self.po_token is provided in the config file"""
if self.use_pushover:
self.poc = self.po.Pushover(self.po_token)
def tor_notify_apprise(self, req_obj, app_obj):
"""Use apprise"""
body = f" Total: {self.total_torrents}\n\
Premature: {self.preme_tor_counter}\n\
Ignored: {self.ignored_counter}\n\
Protected: {self.c[self.tracker_protected_tag]}\n\
Non-protected: {self.c[self.tracker_non_protected_tag]}\n\
Orphaned: {self.up_tor_counter}\n\
Marked for deletion: {len(self.torrent_hash_delete_list)}\n\
{self.extm}"
title = "--- qbit-maid summary ---"
app_obj(req_obj, self.apprise_host, self.apprise_port, self.apprise_aurls, title, body)
self.poc = self.po.Client(self.po_key, api_token=self.po_token)
def tornotifytest(self):
"""Used to make sure tornotify is working and messages are getting to the client"""
self.poc.message(self.po_key, "Test Message", title="qbit-maid")
self.poc.send_message("Test Message", title="qbit-maid")
def process_counts(self):
def processcounts(self):
self.c = self.ct()
for item in self.tracker_list:
self.c[item["tags"]] += 1
def print_processor(self):
def printprocessor(self):
"""Print summary of torrents"""
self.tl.info(f'Total: {self.total_torrents}')
self.tl.info(f'Premature: {self.preme_tor_counter}')
@ -48,9 +31,9 @@ def print_processor(self):
self.tl.info(f'Orphaned: {self.up_tor_counter}')
self.tl.info(f'Marked for deletion: {len(self.torrent_hash_delete_list)}')
def tor_notify_summary(self):
def tornotifysummary(self):
"""Main notification method when the app is used in an automated fashion"""
self.poc.message(self.po_key, f" Total: {self.total_torrents}\n\
self.poc.send_message(f" Total: {self.total_torrents}\n\
Premature: {self.preme_tor_counter}\n\
Ignored: {self.ignored_counter}\n\
Protected: {self.c[self.tracker_protected_tag]}\n\
@ -59,38 +42,50 @@ def tor_notify_summary(self):
Marked for deletion: {len(self.torrent_hash_delete_list)}\n\
{self.extm}", title="--- qbit-maid summary ---")
def list_first_tor(self, index=0):
def getunixtimestamp(self):
"""Used for debuging and development related to unixtimestamps, not used in main script but useful"""
self.uts = self.t.time()
self.tl.info(self.uts)
def writetor(self, filepath='./torrentinfo.json'):
"""Write all torrent data to a file.
Useful for development of new features.
"""
pass
def listfirsttor(self, index=0):
"""Only lists the first torrent"""
self.tl.debug('First torrent in the list:')
torrent = self.torrent_list[index]
torrent = self.torrentlist[index]
for k,v in torrent.items():
self.tl.debug(f'{k}: {v}')
self.tl.debug('\n')
def list_qbit_api_info(self):
def listqbitapiinfo(self):
"""Writes torrent info to log file"""
self.tl.debug(f'qBittorrent: {self.qbt_client.app.version}')
self.tl.debug(f'qBittorrent Web API: {self.qbt_client.app.web_api_version}')
def torrent_count(self):
def torrentcount(self):
"""write torrent counts to log file"""
self.tl.debug(f'*** Torrents with tag["{self.tracker_protected_tag}"] {self.c[self.tracker_protected_tag]} ***')
self.tl.debug(f'*** Torrents with tag["{self.tracker_non_protected_tag}"] {self.c[self.tracker_non_protected_tag]} ***')
self.tl.debug(f'torrents that are protected {self.tracker_list.count("ipt")}')
self.tl.debug(f'torrents that aren\'t protected {self.tracker_list.count("public")}')
def get_script_runtime(self):
def torlisttags(self):
pass
def debugpremecal(self):
for torrent in self.torrentlist:
if torrent['infohash_v1'] == 'a89b484ea375094af53ce89ecbea14bf086d6284':
print(torrent["name"][0:20])
print(torrent['added_on'] + self.minimum_age >= self.t.time())
def getscriptruntime(self):
elapsed_time = self.et - self.st
if self.use_log:
self.tl.info(f'Execution time: [{elapsed_time}]')
if self.use_pushover:
self.extm = f"Execution time: [{elapsed_time}]"
if self.use_apprise:
self.extm = f"Execution time: [{elapsed_time}]"
def send_ping(self, req_obj, healthcheck_url):
try:
req_obj.get(healthcheck_url, timeout=5)
except req_obj.RequestException as e:
self.tl.info(f"Ping failed: {e}")
def debug_torrent_list(self):
self.tl.debug(self.torrent_list)
def getobjecttype(object):
print(type(object))

59
qprocess.py Executable file → Normal file
View File

@ -1,82 +1,83 @@
def tor_processor(self):
from cgitb import enable
def torprocessor(self):
"""Main logic to sort through both self.tracker_nonprotected_list and self.tracker_protected_list
If torrent meets criteria for deletion, its infohash_v1 will be appended to self.torrent_hash_delete_list
"""
for canidate in self.tracker_list:
if self.enable_telemetry:
header = ['state','ratio','tags','added','hash','name','tracker']
row = [canidate['state'],canidate['ratio'],canidate["tags"],canidate['added_on'],canidate['infohash_v1'],canidate["name"][0:20],canidate['tracker']]
write_csv(self.cv,self.telemetry_outfile,header,row)
if self.use_log:
self.tl.debug(f'---Reviewing canidate: ["{canidate["name"][0:20]}..."] {canidate["infohash_v1"]}---')
if is_downloading(canidate['state']):
# if canidate['state'] == 'downloading':
if isdownloading(canidate['state']):
if self.use_log:
self.tl.info(f'["{canidate["name"][0:20]}..."] is still downloading and will be skipped.')
continue
elif is_protected_under_ratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
# elif canidate['ratio'] < float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedunderratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is below a 1.05 ratio({canidate["ratio"]})')
if is_old_tor(canidate['time_active'], self.max_age):
# if canidate['added_on'] + self.age <= self.t.time():
if isoldtor(canidate['added_on'], self.age, self.t.time()):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] Seconds old: {canidate["time_active"]} Delta: {canidate["time_active"] - self.max_age}')
self.tl.debug(f'["{canidate["name"][0:20]}..."] Seconds old: {self.t.time() - self.age - canidate["added_on"]}')
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
elif is_protected_over_ratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
# elif canidate['ratio'] >= float(1.05) and self.tracker_protected_tag in canidate["tags"]:
elif isprotectedoverratio(canidate['ratio'], 1.05, self.tracker_protected_tag, canidate["tags"]):
if self.use_log:
self.tl.debug(f'["{canidate["name"][0:20]}..."] is above a 1.05 ratio({canidate["ratio"]}).')
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion from the protected list.')
elif is_not_protected_tor(self.tracker_non_protected_tag, canidate["tags"]):
# elif self.tracker_non_protected_tag in canidate["tags"]:
elif isnonprotectedtor(self.tracker_non_protected_tag, canidate["tags"]):
self.torrent_hash_delete_list.append(canidate['infohash_v1'])
if self.use_log:
self.tl.info(f'Submitted ["{canidate["name"][0:20]}..."] for deletion.')
else:
if self.enable_dragnet:
header = ['state','ratio','tags','added','thash','tname','trname']
row = [canidate['state'],canidate['ratio'],canidate["tags"],canidate['added_on'],canidate['infohash_v1'],canidate["name"][0:20],canidate['tracker']]
write_csv(self.cv,self.dragnet_outfile,header,row)
dragnet(self,canidate['state'],canidate['ratio'],canidate["tags"],canidate['added_on'],self.age,self.t.time(),canidate['infohash_v1'],canidate["name"][0:20])
self.tl.info(f'["{canidate["name"][0:20]}..."] is orphaned.')
self.up_tor_counter += 1
continue
def tor_delete_tags(self):
def tordeletetags(self):
tag_list = [self.tracker_protected_tag, self.tracker_non_protected_tag]
self.qbt_client.torrents_delete_tags(tag_list)
def tor_delete(self):
def tordelete(self):
"""Remove torrents, will also delete files, this keeps the filesystem clean.
Only pass self.torrent_hash_delete_list if you would like to keep the files."""
if self.use_log:
self.tl.debug('Hash list submitted for deletion:')
self.tl.debug(self.torrent_hash_delete_list)
if self.torrent_hash_delete_list:
self.qbt_client.torrents_delete(True, self.torrent_hash_delete_list)
self.qbt_client.torrents_delete(True, self.torrent_hash_delete_list)
def is_downloading(state):
def isdownloading(state):
if state == 'downloading':
return True
def is_protected_under_ratio(torratio, setratio, settag, tortag):
def isprotectedunderratio(torratio, setratio, settag, tortag):
if torratio < float(setratio) and settag in tortag:
return True
def is_old_tor(realage, maxage):
if realage >= maxage:
def isoldtor(toradd, setage, currenttime):
if toradd + setage <= currenttime:
return True
def is_protected_over_ratio(torratio, setratio, settag, tortag):
def isprotectedoverratio(torratio, setratio, settag, tortag):
if torratio >= float(setratio) and settag in tortag:
return True
def is_not_protected_tor(setnonprotectedtag, tortags):
def isnonprotectedtor(setnonprotectedtag, tortags):
if setnonprotectedtag in tortags:
return True
def write_csv(csv_obj,outfile,header,row):
with open(outfile, 'a+', encoding='UTF8', newline='') as f:
writer = csv_obj.writer(f)
def dragnet(self,state,ratio,tags,added,age,time,thash,tname):
header = ['state','ratio','tags','added','age','time','thash','tname']
row = [state,ratio,tags,added,age,time,thash,tname]
with open(self.dragnet_outfile, 'w', encoding='UTF8', newline='') as f:
writer = self.cv.writer(f)
if f.tell() == 0:
writer.writerow(header)
writer.writerow(row)

View File

@ -1 +0,0 @@
qbittorrent_api==2022.5.32

92
test_qbitmaid.py Executable file → Normal file
View File

@ -1,111 +1,97 @@
import unittest
import requests as r
from qlist import is_preme,is_cat_ignored,is_tracker_blank,is_protected_tracker,is_not_protected_tracker,is_tag_blank,is_ignored_tag
from qprocess import is_downloading,is_protected_under_ratio,is_old_tor,is_protected_over_ratio,is_not_protected_tor
from qlogging import send_ping
from qlist import ispreme,iscatignored,istrackerblank,isprotectedtracker,isnotprotectedtracker,istagblank,isignoredtag
from qprocess import isdownloading,isprotectedunderratio,isoldtor,isprotectedoverratio,isnonprotectedtor
class TestQbitmaid(unittest.TestCase):
def test_ispreme_sanity(self):
self.assertTrue(is_preme(1,1))
self.assertTrue(is_preme(1,2))
self.assertFalse(is_preme(2,1))
self.assertTrue(ispreme(1,1,1))
self.assertFalse(ispreme(1,1,3))
def test_ispreme(self):
pass
def test_iscatignored_sanity(self):
self.assertTrue(is_cat_ignored('a', ['a','b','c']))
self.assertTrue(is_cat_ignored('b', ['a','b','c']))
self.assertTrue(is_cat_ignored('c', ['a','b','c']))
self.assertFalse(is_cat_ignored('d', ['a','b','c']))
self.assertFalse(is_cat_ignored(1, ['a','b','d']))
self.assertFalse(is_cat_ignored(1.0000000, ['a','b','c']))
self.assertTrue(iscatignored('a', ['a','b','c']))
self.assertTrue(iscatignored('b', ['a','b','c']))
self.assertTrue(iscatignored('c', ['a','b','c']))
self.assertFalse(iscatignored('d', ['a','b','c']))
self.assertFalse(iscatignored(1, ['a','b','c']))
self.assertFalse(iscatignored(1.0000000, ['a','b','c']))
def test_iscatignored(self):
pass
def test_istrackerblank_sanity(self):
self.assertTrue(is_tracker_blank(''))
self.assertFalse(is_tracker_blank('a'))
self.assertFalse(is_tracker_blank(1))
self.assertFalse(is_tracker_blank(1.00000000))
self.assertTrue(istrackerblank(''))
self.assertFalse(istrackerblank('a'))
self.assertFalse(istrackerblank(1))
self.assertFalse(istrackerblank(1.00000000))
def test_istrackerblank(self):
pass
def test_isprotectedtracker_sanity(self):
self.assertTrue(is_protected_tracker('https://a.com/',['a.com','b.me','c.io']))
self.assertFalse(is_protected_tracker('https://google.com/',['a.com','b.me','c.io']))
self.assertFalse(is_protected_tracker('https://d.com',['a.com','b.me','c.io']))
self.assertTrue(isprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertFalse(isprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertFalse(isprotectedtracker('https://d.com',['a.com','b.me','c.io']))
def test_isprotectedtracker(self):
pass
def test_isnotprotectedtracker_sanity(self):
self.assertFalse(is_not_protected_tracker('https://a.com/',['a.com','b.me','c.io']))
self.assertTrue(is_not_protected_tracker('https://google.com/',['a.com','b.me','c.io']))
self.assertTrue(is_not_protected_tracker('https://d.com',['a.com','b.me','c.io']))
self.assertFalse(isnotprotectedtracker('https://a.com/',['a.com','b.me','c.io']))
self.assertTrue(isnotprotectedtracker('https://google.com/',['a.com','b.me','c.io']))
self.assertTrue(isnotprotectedtracker('https://d.com',['a.com','b.me','c.io']))
def test_isnotprotectedtracker(self):
pass
def test_istagblank(self):
self.assertTrue(is_tag_blank(''))
self.assertFalse(is_tag_blank('a'))
self.assertFalse(is_tag_blank(1))
self.assertFalse(is_tag_blank(1.0001))
self.assertFalse(is_tag_blank(False))
self.assertFalse(is_tag_blank(True))
self.assertTrue(istagblank(''))
self.assertFalse(istagblank('a'))
self.assertFalse(istagblank(1))
self.assertFalse(istagblank(1.0001))
self.assertFalse(istagblank(False))
self.assertFalse(istagblank(True))
def test_isdownloading_sanity(self):
self.assertTrue(is_downloading('downloading'))
self.assertTrue(isdownloading('downloading'))
def test_isdownloading(self):
self.assertFalse(is_downloading('DOWNLOADING'))
self.assertFalse(is_downloading('dOwNlOaDiNg'))
self.assertFalse(isdownloading('DOWNLOADING'))
self.assertFalse(isdownloading('dOwNlOaDiNg'))
def test_isprotectedunderratio_sanity(self):
self.assertTrue(is_protected_under_ratio(0.5,1,'a','a,b,c'))
self.assertTrue(isprotectedunderratio(0.5,1,'a','a,b,c'))
def test_isprotectedunderratio(self):
pass
def test_isoldtor_sanity(self):
self.assertFalse(is_old_tor(1,2))
self.assertTrue(isoldtor(1,2,4))
def test_isoldtor(self):
self.assertTrue(is_old_tor(1,1))
self.assertTrue(is_old_tor(2,1))
self.assertFalse(is_old_tor(1,2))
self.assertFalse(isoldtor(1661150664,2419200,1662049004.2101078))
self.assertFalse(isoldtor(1661150664,2419200,1662049004))
self.assertFalse(isoldtor(1661150664.000000,2419200.0000000,1662049004.2101078))
def test_isprotectedoverratio_sanity(self):
self.assertTrue(is_protected_over_ratio(2,1,'a','a,b,c'))
self.assertTrue(isprotectedoverratio(2,1,'a','a,b,c'))
def test_isprotectedoverratio(self):
pass
def test_isnonprotectedtor_sanity(self):
self.assertTrue(is_not_protected_tor('a','a,b,c'))
self.assertTrue(isnonprotectedtor('a','a,b,c'))
def test_isnonprotectedtor(self):
pass
def test_isignoredtag_sanity(self):
self.assertTrue(is_ignored_tag(['a','b','c'], 'first,second,third,a'))
self.assertTrue(isignoredtag(['a','b','c'], 'first,second,third,a'))
def test_isignoredtag(self):
self.assertTrue(is_ignored_tag(['save'], 'save,public,ipt'))
def test_sendpingstart_sanity(self):
send_ping(self, r, "https://thunder.jonb.io/ping/921625e5-5b76-4f45-a0c3-56145e16f3bb" + "/start")
url = "https://thunder.jonb.io/ping/921625e5-5b76-4f45-a0c3-56145e16f3bb"
send_ping(self, r, url)
def test_sendping_start(self):
url = "https://thunder.jonb.io/ping/921625e5-5b76-4f45-a0c3-56145e16f3bb/"
send_ping(self, r, url.strip("/") + "/start")
send_ping(self, r, "https://thunder.jonb.io/ping/921625e5-5b76-4f45-a0c3-56145e16f3bb")
def test_isignoredtag_sanity(self):
self.assertTrue(isignoredtag(['a','b','c'], 'first,second,third,a'))
# def test__sanity(self):
# pass

View File

@ -1,41 +0,0 @@
from qprocess import write_csv
import csv
import unittest
import os
# Torrent Items needed for testing
# canidate['state'] {canidate["ratio"]} {torrent["tags"]} torrent["seeding_time"] {torrent["category"]} {torrent["name"][0:20]} {canidate["time_active"]}
class TestWriteCSV(unittest.TestCase):
def test_write_csv_dragnet(self):
self.cv = csv
outfile = './test_dragnet_outfile.csv'
state = 'downloading'
ratio = 1.05
tags = 'ipt'
added = 1
age = 240000
time = 123456
thash = 'asfasdf23412adfqwer'
tname = 'thisismynamehahahah'
trname = 'https://localhost.stackoverflow.tech/317332f1c125bc9c1b9b14fb8e054908/announce'
header = ['state','ratio','tags','added','age','time','thash','tname','trname']
row = [state,ratio,tags,added,age,time,thash,tname,trname]
write_csv(self.cv,outfile,header,row)
self.assertTrue(os.path.exists(outfile))
def test_write_csv_telemetry(self):
self.cv = csv
outfile = './test_telemetry_outfile.csv'
state = 'downloading'
ratio = 1.05
tags = 'ipt'
added = 1
thash = 'asfasdf23412adfqwer'
tname = 'thisismynamehahahah'
trname = 'https://localhost.stackoverflow.tech/317332f1c125bc9c1b9b14fb8e054908/announce'
header = ['state','ratio','tags','added','hash','name','tracker']
row = [state,ratio,tags,added,thash,tname,trname]
write_csv(self.cv,outfile,header,row)
self.assertTrue(os.path.exists(outfile))
if __name__ == '__main__':
unittest.main()