Initial commit, win32 and darwin support only

This commit is contained in:
Kat Inskip 2023-09-08 17:07:20 -07:00 committed by Kat Inskip
commit 40df439299
Signed by: kat
GPG key ID: 465E64DECEA8CF0F
14 changed files with 527 additions and 0 deletions

160
.gitignore vendored Normal file
View file

@ -0,0 +1,160 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

3
README.md Normal file
View file

@ -0,0 +1,3 @@
# konawall-py
A rewrite of konawall-rs in Python so that image manipulation is easier and cross-platform support is sort-of easier.

0
__init__.py Normal file
View file

14
custom_errors.py Normal file
View file

@ -0,0 +1,14 @@
class RequestFailed(Exception):
"Raised when a request fails."
def __init__(self, status_code: int):
self.status_code = status_code
self.message = f"Request failed with status code {self.status_code}"
super().__init__(self.message)
class UnsupportedPlatform(Exception):
"Raised when the platform is not supported."
def __init__(self, message: str):
self.message = message
super().__init__(self.message)

12
custom_print.py Normal file
View file

@ -0,0 +1,12 @@
import termcolor
"""
Print a key-value pair with a key and value coloured differently.
:param key: The key to print
:param value: The value to print
:param newline: Whether to print a newline after the value
:returns: None
"""
def kv_print(key: str, value: str, newline: bool = False) -> None:
print(termcolor.colored(key, "cyan") + ": " + termcolor.colored(value, "white"), end="\n" if newline else " ")

31
downloader.py Normal file
View file

@ -0,0 +1,31 @@
import logging
import tempfile
import requests
from custom_print import kv_print
"""
Download files given a list of URLs
:param files: A list of URLs to download from
"""
def download_files(files: list) -> list:
logging.debug(f"download_posts() called with files=[{', '.join(files)}]")
# Keep a list of downloaded files
downloaded_files: list = []
# Download the images
for i, url in enumerate(files):
logging.debug(f"Downloading {url}")
# Get the image data
image = requests.get(url)
# Create a temporary file to store the image
image_file = tempfile.NamedTemporaryFile(delete=False)
logging.debug(f"Created temporary file {image_file.name}")
# Write the image data to the file
image_file.write(image.content)
# Close the file
image_file.close()
# Give the user data about the downloaded image
kv_print(f"Image {str(i+1)}", image_file.name, newline=True)
# Add the file to the list of downloaded files
downloaded_files.append(image_file.name)
return downloaded_files

60
environment.py Normal file
View file

@ -0,0 +1,60 @@
import sys
import os
import logging
from custom_errors import UnsupportedPlatform
from module_loader import environment_handlers
"""
This detects the DE/WM from the Linux environment because it's not provided by the platform
"""
def detect_linux_environment():
if os.environ.get("SWAYSOCK"):
return "sway"
return_unmodified_if_these = [
# TODO: implement
"gnome", # dconf
"cinnamon", # dconf
"mate", # dconf
"deepin", # dconf
"xfce4", # xfconf
"lxde", # pcmanfm
"kde", # qdbus
]
modified_mapping = {
"fluxbox": "feh",
"blackbox": "feh",
"openbox": "feh",
"i3": "feh",
"ubuntustudio": "kde",
"ubuntu": "gnome",
"lubuntu": "lxde",
"xubuntu": "xfce4",
"kubuntu": "kde",
"ubuntugnome": "gnome",
}
desktop_session = os.environ.get("DESKTOP_SESSION")
if desktop_session in return_unmodified_if_these:
return desktop_session
elif desktop_session in modified_mapping:
return modified_mapping[desktop_session]
else:
UnsupportedPlatform(f"Desktop session {desktop_session} is not supported, sorry!")
def detect_environment():
if sys.platform == "linux":
environment = detect_linux_environment()
logging.info(f"Detected environment is {environment} running on Linux")
else:
environment = sys.platform
logging.info(f"Detected environment is {environment}")
return environment
"""
This sets wallpapers on any platform, as long as it is supported.
"""
def set_environment_wallpapers(environment: str, files: list):
if environment in environment_handlers:
environment_handlers[f"{environment}_setter"](files)
logging.info("Wallpapers set!")
else:
UnsupportedPlatform(f"Environment {environment} is not supported, sorry!")

13
environments/darwin.py Normal file
View file

@ -0,0 +1,13 @@
import subprocess
from module_loader import add_environment
"""
This sets wallpapers on Darwin.
:param files: A list of files to set as wallpapers
"""
@add_environment("darwin_setter")
def set_wallpapers(files: list, displays: list):
for i, file in enumerate(files):
# Run osascript to set the wallpaper for each monitor
subprocess.run(["osascript", "-e", f'tell application "System Events" to set picture of desktop {i} file "{file}"'])

28
environments/win32.py Normal file
View file

@ -0,0 +1,28 @@
import os
import ctypes
import logging
from imager import combine_to_viewport
from module_loader import add_environment
"""
Pre-setting on Windows
"""
@add_environment("win32_init")
def init():
os.system("color")
logging.info("Initialized for a Windows environment")
"""
This sets wallpapers on Windows.
:param files: A list of files to set as wallpapers
"""
@add_environment("win32_setter")
def set_wallpapers(files: list, displays: list):
if len(files) > 1:
logging.info("Several monitors detected, going the hard route")
file = combine_to_viewport(displays, files)
ctypes.windll.user32.SystemParametersInfoW(20, 0, file, 0)
else:
logging.info("Detected only one monitor, setting wallpaper simply")
ctypes.windll.user32.SystemParametersInfoW(20, 0, files[0] , 0)

17
imager.py Normal file
View file

@ -0,0 +1,17 @@
import tempfile
import logging
from PIL import Image
def combine_to_viewport(displays: list, files: list):
# Create an image that is the size of the combined viewport, with offsets for each display
max_width = max([display.x + display.width for display in displays])
max_height = max([display.y + display.height for display in displays])
combined = Image.new("RGB", (max_width, max_height))
for i, file in enumerate(files):
open_image = Image.open(file, "r")
resized_image = open_image.resize((displays[i].width, displays[i].height))
combined.paste(resized_image, (displays[i].x, displays[i].y))
file = tempfile.NamedTemporaryFile(delete=False)
logging.info(f"Created temporary file {file.name} to save combined viewport image into")
combined.save(file.name, format="PNG")
return file

55
main.py Normal file
View file

@ -0,0 +1,55 @@
import os
import logging
import argparse
import screeninfo
from environment import set_environment_wallpapers, detect_environment
from module_loader import import_dir, environment_handlers, source_handlers
from imager import combine_to_viewport
def main():
parser = argparse.ArgumentParser(
prog="konawall",
description="Set wallpapers from various sources on various platforms",
epilog="If you need help with this, I'm @floofywitch on Discord and Telegram. ^^;"
)
parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
parser.add_argument("-e", "--environment", help="override the environment detection", type=str)
parser.add_argument("-s", "--source", help="override the source provider", type=str, default="konachan")
parser.add_argument("-c", "--count", help="override the number of wallpapers to fetch", type=int)
parser.add_argument("tags", nargs="+")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.WARNING)
logging.debug(f"Called with args={args}")
import_dir(os.path.join(os.path.dirname(os.path.abspath( __file__ )), "sources"))
logging.info(f"Loaded source handlers: {', '.join(source_handlers)}")
import_dir(os.path.join(os.path.dirname(os.path.abspath( __file__ )), "environments"))
logging.info(f"Loaded environment handlers: {', '.join(environment_handlers)}")
environment = detect_environment()
environment_handlers[environment + "_init"]()
displays = screeninfo.get_monitors()
if not args.count:
count = len(displays)
else:
count = args.count
files = source_handlers[args.source](count, args.tags)
if not args.environment:
set_environment_wallpapers(environment, files, displays)
else:
environment_handlers[args.environment](files, displays)
logging.info("Wallpapers set!")
if __name__ == "__main__":
main()

69
module_loader.py Normal file
View file

@ -0,0 +1,69 @@
import imp
import os
import re
import inspect
import logging
global environment_handlers
global source_handlers
environment_handlers = {}
source_handlers = {}
"""
This finds all modules in a directory
:param path: The path to the directory
:returns: A set of modules in the directory
"""
def modules_in_dir(path: str) -> set:
result = set()
for entry in os.listdir(path):
if os.path.isfile(os.path.join(path, entry)):
matches = re.search("(.+\.py)$", entry)
if matches:
result.add(matches.group(0))
return result
"""
This automatically loads all modules in a directory
:param path: The path to the directory
"""
def import_dir(path: str):
for filename in sorted(modules_in_dir(path)):
search_path = os.path.join(os.getcwd(), path)
module_name, _ = os.path.splitext(filename)
fp, path_name, description = imp.find_module(module_name, [search_path,])
imp.load_module(module_name, fp, path_name, description)
"""
This provides a dynamic way to load environment handlers through a decorator
:param environment: The name of the environment
:returns: A function for decoration
"""
def add_environment(environment: str) -> callable:
# Get the current frame
frame = inspect.stack()[1]
# From the current frame, extract the relative path to the file
path = frame[0].f_code.co_filename
def wrapper(function):
environment_handlers[environment] = function
logging.info(f"Loaded environment handler {environment} from {path}")
return wrapper
"""
This provides a dynamic way to load wallpaper sources through a decorator
:param source: The name of the source
:returns: A function for decoration
"""
def add_source(source: str) -> callable:
# Get the current frame
frame = inspect.stack()[1]
# From the current frame, extract the relative path to the file
path = frame[0].f_code.co_filename
def wrapper(function):
source_handlers[source] = function
logging.info(f"Loaded wallpaper source {source} from {path}")
return wrapper

4
requirements.txt Normal file
View file

@ -0,0 +1,4 @@
Pillow
screeninfo
requests
termcolor

61
sources/konachan.py Normal file
View file

@ -0,0 +1,61 @@
import requests
import logging
from custom_print import kv_print
from custom_errors import RequestFailed
from module_loader import add_source
from downloader import download_files
"""
Turn a list of tags and a count into a list of URLs to download from
:param count: The number of images to provide download URLs for
:param user_tags: A list of tags to search for
:returns: A list of URLs to download from
"""
def request_posts(count: int, tags: list) -> list:
logging.debug(f"request_posts() called with count={count}, tags=[{', '.join(tags)}]")
# Make sure we get a different result every time by using "order:random" as a tag
tags.append("order:random")
# Tags are separated by a plus sign for this API
tag_string: str = "+".join(tags)
# Request URL for getting posts from the API
url: str = f"https://konachan.com/post.json?limit={str(count)}&tags={tag_string}"
logging.debug(f"Request URL: {url}")
response = requests.get(url, headers={"User-Agent": "konachan-py/alpha"})
# Check if the request was successful
logging.debug("Status code: " + str(response.status_code))
# List of URLs to download
post_urls: list = []
if response.status_code == 200:
# Get the JSON data from the response
json = response.json()
for post in json:
# Give the user data about the post retrieved
kv_print("Post ID", post["id"])
kv_print("Author", post["author"])
kv_print("Rating", post["rating"])
kv_print("Resolution", f"{post['width']}x{post['height']}", newline=True)
kv_print("Tags", post["tags"], newline=True)
kv_print("URL", post["file_url"], newline=True)
# Append the URL to the list
post_urls.append(post["file_url"])
else:
# Raise an exception if the request failed
RequestFailed(response.status_code)
return post_urls
"""
Download a number of images from Konachan given a list of tags and a count
:param count: The number of images to download
:param tags: A list of tags to search for
"""
@add_source("konachan")
def handle(count: int, tags: list) -> list:
logging.debug(f"handle_konachan() called with count={count}, tags=[{', '.join(tags)}]")
# Get a list of URLs to download
post_urls: list = request_posts(count, tags)
# Download the images
files = download_files(post_urls)
# Return the downloaded files
return files