Rearchitecture the project for Poetry support.

This commit is contained in:
Kat Inskip 2023-10-06 16:07:16 -07:00
parent 6cc517c163
commit 4ae3ddc2c0
16 changed files with 603 additions and 47 deletions

0
konawall/__init__.py Normal file
View file

65
konawall/cli.py Normal file
View file

@ -0,0 +1,65 @@
import os
import sys
import logging
import argparse
import screeninfo
from environment import set_environment_wallpapers, detect_environment
from module_loader import import_dir, environment_handlers, source_handlers
from imager import combine_to_viewport
def main():
parser = argparse.ArgumentParser(
prog="konawall",
description="Set wallpapers from various sources on various platforms",
epilog="If you need help with this, I'm @floofywitch on Discord and Telegram. ^^;"
)
parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
parser.add_argument("-e", "--environment", help="override the environment detection", type=str)
parser.add_argument("-s", "--source", help="override the source provider", type=str, default="konachan")
parser.add_argument("-c", "--count", help="override the number of wallpapers to fetch", type=int)
parser.add_argument("-q", "--quiet", help="silence all output", action="store_true")
parser.add_argument("tags", nargs="+")
args = parser.parse_args()
if args.quiet:
sys.stdout = open(os.devnull, "w")
if args.verbose:
log_level = logging.DEBUG
else:
log_level = logging.INFO
logging.basicConfig(
level=log_level,
filename="app.log",
filemode="a",
format="%(asctime)s - %(levelname)s - %(message)s"
)
logging.debug(f"Called with args={args}")
import_dir(os.path.join(os.path.dirname(os.path.abspath( __file__ )), "sources"))
logging.debug(f"Loaded source handlers: {', '.join(source_handlers)}")
import_dir(os.path.join(os.path.dirname(os.path.abspath( __file__ )), "environments"))
logging.debug(f"Loaded environment handlers: {', '.join(environment_handlers)}")
environment = detect_environment()
environment_handlers[environment + "_init"]()
displays = screeninfo.get_monitors()
if not args.count:
count = len(displays)
else:
count = args.count
files = source_handlers[args.source](count, args.tags)
if not args.environment:
set_environment_wallpapers(environment, files, displays)
else:
environment_handlers[f"{args.environment}_setter"](files, displays)
logging.debug("Wallpapers set!")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,10 @@
seconds = 300
automatic = true
source = "konachan"
tags = [
"rating:s"
]
[logging]
file = "INFO"
console = "DEBUG"

14
konawall/custom_errors.py Normal file
View file

@ -0,0 +1,14 @@
class RequestFailed(Exception):
"Raised when a request fails."
def __init__(self, status_code: int):
self.status_code = status_code
self.message = f"Request failed with status code {self.status_code}"
super().__init__(self.message)
class UnsupportedPlatform(Exception):
"Raised when the platform is not supported."
def __init__(self, message: str):
self.message = message
super().__init__(self.message)

14
konawall/custom_print.py Normal file
View file

@ -0,0 +1,14 @@
import logging
import termcolor
"""
Print a key-value pair with a key and value.
:param key: The key to print
:param value: The value to print
:param level: The logging level to print at
:returns: None
"""
def kv_print(key: str, value: str, level: str = "INFO") -> None:
logger = getattr(logging, level.lower())
logger(f"{key}: {value}")

31
konawall/downloader.py Normal file
View file

@ -0,0 +1,31 @@
import logging
import tempfile
import requests
from custom_print import kv_print
"""
Download files given a list of URLs
:param files: A list of URLs to download from
"""
def download_files(files: list) -> list:
logging.debug(f"download_posts() called with files=[{', '.join(files)}]")
# Keep a list of downloaded files
downloaded_files: list = []
# Download the images
for i, url in enumerate(files):
logging.debug(f"Downloading {url}")
# Get the image data
image = requests.get(url)
# Create a temporary file to store the image
image_file = tempfile.NamedTemporaryFile(delete=False)
logging.debug(f"Created temporary file {image_file.name}")
# Write the image data to the file
image_file.write(image.content)
# Close the file
image_file.close()
# Give the user data about the downloaded image
kv_print(f"Image {str(i+1)}", image_file.name)
# Add the file to the list of downloaded files
downloaded_files.append(image_file.name)
return downloaded_files

60
konawall/environment.py Normal file
View file

@ -0,0 +1,60 @@
import sys
import os
import logging
from custom_errors import UnsupportedPlatform
from module_loader import environment_handlers
"""
This detects the DE/WM from the Linux environment because it's not provided by the platform
"""
def detect_linux_environment():
if os.environ.get("SWAYSOCK"):
return "sway"
return_unmodified_if_these = [
# TODO: implement
"gnome", # dconf
"cinnamon", # dconf
"mate", # dconf
"deepin", # dconf
"xfce4", # xfconf
"lxde", # pcmanfm
"kde", # qdbus
]
modified_mapping = {
"fluxbox": "feh",
"blackbox": "feh",
"openbox": "feh",
"i3": "feh",
"ubuntustudio": "kde",
"ubuntu": "gnome",
"lubuntu": "lxde",
"xubuntu": "xfce4",
"kubuntu": "kde",
"ubuntugnome": "gnome",
}
desktop_session = os.environ.get("DESKTOP_SESSION")
if desktop_session in return_unmodified_if_these:
return desktop_session
elif desktop_session in modified_mapping:
return modified_mapping[desktop_session]
else:
UnsupportedPlatform(f"Desktop session {desktop_session} is not supported, sorry!")
def detect_environment():
if sys.platform == "linux":
environment = detect_linux_environment()
logging.debug(f"Detected environment is {environment} running on Linux")
else:
environment = sys.platform
logging.debug(f"Detected environment is {environment}")
return environment
"""
This sets wallpapers on any platform, as long as it is supported.
"""
def set_environment_wallpapers(environment: str, files: list, displays: list):
if f"{environment}_setter" in environment_handlers:
environment_handlers[f"{environment}_setter"](files, displays)
logging.debug("Wallpapers set!")
else:
UnsupportedPlatform(f"Environment {environment} is not supported, sorry!")

View file

@ -0,0 +1,14 @@
import subprocess
from module_loader import add_environment
"""
This sets wallpapers on Darwin.
:param files: A list of files to set as wallpapers
"""
@add_environment("darwin_setter")
def set_wallpapers(files: list, displays: list):
for i, file in enumerate(files):
# Run osascript to set the wallpaper for each monitor
command = f'tell application "System Events" to set picture of desktop {i} to "{file}"'
subprocess.run(["osascript", "-e", command])

View file

@ -0,0 +1,35 @@
import os
import ctypes
import logging
from imager import combine_to_viewport
from module_loader import add_environment
"""
Pre-setting on Windows
"""
@add_environment("win32_init")
def init():
os.system("color")
logging.debug("Initialized for a Windows environment")
"""
This sets wallpapers on Windows.
:param files: A list of files to set as wallpapers
"""
@add_environment("win32_setter")
def set_wallpapers(files: list, displays: list):
import winreg
if len(files) > 1:
logging.debug("Several monitors detected, going the hard route")
desktop = winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Control Panel\\Desktop", 0, winreg.KEY_ALL_ACCESS)
wallpaper_style = winreg.SetValueEx(desktop, "WallpaperStyle", 0, winreg.REG_SZ, "5")
desktop.Close()
file = combine_to_viewport(displays, files)
ctypes.windll.user32.SystemParametersInfoW(20, 0, file, 0)
else:
logging.debug("Detected only one monitor, setting wallpaper simply")
desktop = winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Control Panel\\Desktop", 0, winreg.KEY_ALL_ACCESS)
wallpaper_style = winreg.SetValueEx(desktop, "WallpaperStyle", 0, winreg.REG_SZ, "3")
desktop.Close()
ctypes.windll.user32.SystemParametersInfoW(20, 0, files[0] , 0)

334
konawall/gui.py Normal file
View file

@ -0,0 +1,334 @@
import wx
import wx.adv
import tempfile
from PIL import Image, ImageDraw
import os
import sys
import logging
import screeninfo
import tomllib
import subprocess
import importlib.metadata
from environment import set_environment_wallpapers, detect_environment
from module_loader import import_dir, environment_handlers, source_handlers
from custom_print import kv_print
from humanfriendly import format_timespan
class Konawall(wx.adv.TaskBarIcon):
def __init__(self, version, file_logger):
# Prevents it from closing before it has done any work on macOS
if wx.Platform == "__WXMAC__":
self.hidden_frame = wx.Frame(None)
self.hidden_frame.Hide()
self.wallpaper_rotation_counter = 0
self.file_logger = file_logger
self.version = version
self.title_string = f"Konawall - {version}"
self.description_string = "A hopefully cross-platform service for fetching wallpapers and setting them."
self.loaded_before = False
# Call the super function, make sure that the type is the statusitem for macOS
wx.adv.TaskBarIcon.__init__(self, wx.adv.TBI_CUSTOM_STATUSITEM)
# Detect environment and timer settings
self.environment = detect_environment()
self.toggle_wallpaper_rotation_menu_item = None
self.wallpaper_rotation_timer = wx.Timer(self, wx.ID_ANY)
# Reload (actually load) the config and modules.
self.reload_config()
self.import_modules()
# Start the timer to run every second
self.wallpaper_rotation_timer.Start(1000)
# Set up the taskbar icon, menu, bindings, ...
icon = self.generate_icon()
self.SetIcon(icon, self.title_string)
self.create_menu()
self.create_bindings()
# Run the first time, manually
self.rotate_wallpapers(None)
# wxPython requires a wx.Bitmap, so we generate one from a PIL.Image
def generate_icon(self):
width = 128
height = 128
# Missing texture style, magenta and black checkerboard
image = Image.new('RGB', (width, height), (0, 0, 0))
dc = ImageDraw.Draw(image)
dc.rectangle((0, 0, width//2, height//2), fill=(255, 0, 255))
dc.rectangle((width//2, height//2, width, height), fill=(255, 0, 255))
if "wxMSW" in wx.PlatformInfo:
image = image.resize((16, 16))
elif "wxGTK" in wx.PlatformInfo:
image = image.rescale((22, 22))
# Write image to temporary file
temp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
image.save(temp.name)
# Convert to wxPython icon
icon = wx.Icon()
icon.CopyFromBitmap(wx.Bitmap(temp.name))
return icon
def toggle_timed_wallpaper_rotation_status(self):
return f"{'Dis' if self.rotate_wallpapers else 'En'}able Timer"
# Load in our source and environment handlers
def import_modules(self):
import_dir(os.path.join(os.path.dirname(os.path.abspath( __file__ )), "sources"))
kv_print("Loaded source handlers", ", ".join(source_handlers), level="debug")
import_dir(os.path.join(os.path.dirname(os.path.abspath( __file__ )), "environments"))
kv_print("Loaded environment handlers", ", ".join(environment_handlers), level="debug")
# Load a TOML file's key-value pairs into our class
def load_config(self):
if os.path.isfile("config.toml"):
# If the config file exists, load it as a dictionary into the config variable.
with open("config.toml", "rb") as f:
config = tomllib.load(f)
# for every key-value pair in the config variable , set the corresponding attribute of our class to it
for k, v in config.items():
kv_print(f"Loaded {k}", v)
setattr(self, k, v)
else:
# If there is no config file, get complainy.
dialog = wx.MessageDialog(
None,
"No config file found, using defaults.",
self.title_string,
wx.OK|wx.ICON_INFORMATION
)
dialog.ShowModal()
dialog.Destroy()
# Set some arbitrary defaults
self.rotate_wallpapers = True
self.interval = 10*60
self.tags = ["rating:s"]
self.logging = {}
self.logging["file"] = "INFO"
# Create the menu
def create_menu(self):
# Make it easier to define menu items
def create_menu_item(menu, label, func, help="", kind=wx.ITEM_NORMAL):
item = wx.MenuItem(menu, wx.ID_ANY, label)
menu.Bind(wx.EVT_MENU, func, id=item.GetId())
menu.Append(item)
return item
def create_info_item(menu, label, help=""):
item = wx.MenuItem(menu, wx.ID_ANY, label)
item.Enable(False)
menu.Append(item)
return item
def create_separator(menu):
item = wx.MenuItem(menu, id=wx.ID_SEPARATOR, kind=wx.ITEM_SEPARATOR)
menu.Append(item)
return item
# Create our Menu object
self.menu = wx.Menu()
# Program header
self.header_menu_item = create_menu_item(
self.menu,
self.title_string,
lambda event: self.create_message_dialog(f"{self.description_string}\n\nIf you need help with this, I'm @floofywitch on Discord and Telegram. ^^;"),
self.description_string,
)
create_separator(self.menu)
self.current_interval_menu_item = create_info_item(self.menu, f"Interval: {format_timespan(self.interval)}")
# Time remaining for automatic wallpaper rotation
self.timed_wallpaper_rotation_status_menu_item = create_info_item(self.menu, "Automatic wallpaper rotation disabled")
create_separator(self.menu)
# Change wallpapers
create_menu_item(
self.menu,
"Rotate Wallpapers",
self.rotate_wallpapers,
"Fetch new wallpapers and set them as your wallpapers"
)
# Toggle automatic wallpaper rotation
self.toggle_wallpaper_rotation_menu_item = create_menu_item(
self.menu,
self.toggle_timed_wallpaper_rotation_status(),
self.toggle_timed_wallpaper_rotation,
"Toggle the automatic wallpaper rotation timer"
)
create_separator(self.menu)
# Interactive config editing
create_menu_item(
self.menu,
"Edit Config",
self.edit_config_menu_item,
"Interactively edit the config file"
)
create_menu_item(
self.menu,
"Reload Config",
self.reload_config_menu_item,
"Reload the config file from disk"
)
# Exit
create_menu_item(
self.menu,
"Exit",
self.close_program_menu_item,
"Quit the application"
)
def close_program_menu_item(self, event):
wx.Exit()
# Interactively edit the config file
def edit_config_menu_item(self, event):
kv_print("User is editing", "config.toml")
# Check if we're on Windows, if so use Notepad
if sys.platform == "win32":
# I don't even know how to detect the default editor on Windows
subprocess.call("notepad.exe config.toml")
else:
# Open config file in default editor
subprocess.call(f"{os.environ['SHELL']} {os.environ['EDITOR']} config.toml")
# When file is done being edited, reload config
kv_print("User has edited", "config.toml")
self.reload_config()
# Reload the application
def reload_config(self):
kv_print(f"{'Rel' if self.loaded_before else 'L'}oading config from", "config.toml")
self.load_config()
# Handle finding the log level
if "file" in self.logging:
file_log_level = getattr(logging, self.logging["file"])
else:
file_log_level = logging.INFO
self.file_logger.setLevel(file_log_level)
if self.loaded_before == True:
# If we're reloading, we need to make sure the timer and menu item reflect our current state.
self.respect_timed_wallpaper_rotation_toggle()
self.respect_current_interval_status()
self.create_message_dialog("Config reloaded.")
# Finished loading
self.loaded_before = True
def reload_config_menu_item(self, event):
self.reload_config()
def create_message_dialog(self, message):
dialog = wx.MessageDialog(
None,
message,
self.title_string,
wx.OK|wx.ICON_INFORMATION
)
# Set the icon of the dialog to the same as the taskbar icon
dialog.ShowModal()
dialog.Destroy()
# Update the menu item of the current interval display to read correctly
def respect_current_interval_status(self):
self.current_interval_menu_item.SetItemLabel(f"Rotation interval: {format_timespan(self.interval)}")
# Set whether to rotate wallpapers automatically or not
def toggle_timed_wallpaper_rotation(self, event):
self.rotate_wallpapers = not self.rotate_wallpapers
self.respect_timed_wallpaper_rotation_toggle()
# Update the timer and the menu item to reflect our current state
def respect_timed_wallpaper_rotation_toggle(self):
if self.rotate_wallpapers and not self.wallpaper_rotation_timer.IsRunning():
self.wallpaper_rotation_timer.Start(1000)
elif not self.rotate_wallpapers and self.wallpaper_rotation_timer.IsRunning():
self.wallpaper_rotation_timer.Stop()
# Set the time left counter to show that it is disabled
self.timed_wallpaper_rotation_status_menu_item.SetItemLabel(f"Automatic wallpaper rotation disabled")
# Update the menu item for the toggle
self.toggle_wallpaper_rotation_menu_item.SetItemLabel(self.toggle_timed_wallpaper_rotation_status())
# Update wallpaper rotation time left counter
def respect_timed_wallpaper_rotation_status(self):
self.timed_wallpaper_rotation_status_menu_item.SetItemLabel(f"Next rotation: {format_timespan(self.interval - self.wallpaper_rotation_counter)} remaining")
# Perform the purpose of the application; get new wallpaper media and set 'em.
def rotate_wallpapers(self, event):
displays = screeninfo.get_monitors()
count = len(displays)
files = source_handlers[self.source](count, self.tags)
set_environment_wallpapers(self.environment, files, displays)
# For macOS
def CreatePopupMenu(self):
self.PopupMenu(self.menu)
# For everybody else who has bindable events
def show_popup_menu(self, event):
self.PopupMenu(self.menu)
# Every second, check if the wallpaper rotation timer has ticked over
def handle_timer_tick(self, event):
if self.wallpaper_rotation_counter >= self.interval:
# If it has, run the fetch and set mechanism
self.rotate_wallpapers(None)
self.wallpaper_rotation_counter = 0
else:
self.wallpaper_rotation_counter += 1
# Update the time left counter
self.respect_timed_wallpaper_rotation_status()
# When the user clicks on the taskbar icon or menu item, run the fetch and set mechanism
# then reset the wallpaper rotation timer
def handle_manual_wallpaper_rotation(self, event):
self.rotate_wallpapers(None)
self.wallpaper_rotation_counter = 0
# Bind application events
def create_bindings(self):
self.Bind(wx.adv.EVT_TASKBAR_LEFT_DOWN, self.handle_manual_wallpaper_rotation)
self.Bind(wx.adv.EVT_TASKBAR_RIGHT_DOWN, self.show_popup_menu)
# Implement the wallpaper rotation timer
self.Bind(wx.EVT_TIMER, self.handle_timer_tick, self.wallpaper_rotation_timer)
def main():
try:
version = f'v{importlib.metadata.version("konawall-py")}'
except:
version = "testing version"
file_logger = logging.FileHandler("app.log", mode="a")
#console_logger = logging.StreamHandler()
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
#console_logger,
file_logger,
]
)
app = wx.App(redirect=False)
app.SetExitOnFrameDelete(False)
Konawall(version, file_logger)
app.MainLoop()
if __name__ == "__main__":
main()

17
konawall/imager.py Normal file
View file

@ -0,0 +1,17 @@
import tempfile
import logging
from PIL import Image
def combine_to_viewport(displays: list, files: list):
# Create an image that is the size of the combined viewport, with offsets for each display
max_width = max([display.x + display.width for display in displays])
max_height = max([display.y + display.height for display in displays])
combined = Image.new("RGB", (max_width, max_height))
for i, file in enumerate(files):
open_image = Image.open(file, "r")
resized_image = open_image.resize((displays[i].width, displays[i].height))
combined.paste(resized_image, (displays[i].x, displays[i].y))
file = tempfile.NamedTemporaryFile(delete=False)
logging.debug(f"Created temporary file {file.name} to save combined viewport image into")
combined.save(file.name, format="PNG")
return file

70
konawall/module_loader.py Normal file
View file

@ -0,0 +1,70 @@
import imp
import os
import re
import inspect
import logging
from custom_print import kv_print
global environment_handlers
global source_handlers
environment_handlers = {}
source_handlers = {}
"""
This finds all modules in a directory
:param path: The path to the directory
:returns: A set of modules in the directory
"""
def modules_in_dir(path: str) -> set:
result = set()
for entry in os.listdir(path):
if os.path.isfile(os.path.join(path, entry)):
matches = re.search("(.+\.py)$", entry)
if matches:
result.add(matches.group(0))
return result
"""
This automatically loads all modules in a directory
:param path: The path to the directory
"""
def import_dir(path: str):
for filename in sorted(modules_in_dir(path)):
search_path = os.path.join(os.getcwd(), path)
module_name, _ = os.path.splitext(filename)
fp, path_name, description = imp.find_module(module_name, [search_path,])
imp.load_module(module_name, fp, path_name, description)
"""
This provides a dynamic way to load environment handlers through a decorator
:param environment: The name of the environment
:returns: A function for decoration
"""
def add_environment(environment: str) -> callable:
# Get the current frame
frame = inspect.stack()[1]
# From the current frame, extract the relative path to the file
path = frame[0].f_code.co_filename
def wrapper(function):
environment_handlers[environment] = function
kv_print(f"Loaded environment handler {environment} from", path, level="debug")
return wrapper
"""
This provides a dynamic way to load wallpaper sources through a decorator
:param source: The name of the source
:returns: A function for decoration
"""
def add_source(source: str) -> callable:
# Get the current frame
frame = inspect.stack()[1]
# From the current frame, extract the relative path to the file
path = frame[0].f_code.co_filename
def wrapper(function):
source_handlers[source] = function
kv_print(f"Loaded wallpaper source {source} from", path, level="debug")
return wrapper

View file

@ -0,0 +1,62 @@
import requests
import logging
from custom_print import kv_print
from custom_errors import RequestFailed
from module_loader import add_source
from downloader import download_files
"""
Turn a list of tags and a count into a list of URLs to download from
:param count: The number of images to provide download URLs for
:param user_tags: A list of tags to search for
:returns: A list of URLs to download from
"""
def request_posts(count: int, tags: list) -> list:
logging.debug(f"request_posts() called with count={count}, tags=[{', '.join(tags)}]")
# Make sure we get a different result every time by using "order:random" as a tag
if "order:random" not in tags:
tags.append("order:random")
# Tags are separated by a plus sign for this API
tag_string: str = "+".join(tags)
# Request URL for getting posts from the API
url: str = f"https://konachan.com/post.json?limit={str(count)}&tags={tag_string}"
logging.debug(f"Request URL: {url}")
response = requests.get(url, headers={"User-Agent": "konachan-py/alpha"})
# Check if the request was successful
logging.debug("Status code: " + str(response.status_code))
# List of URLs to download
post_urls: list = []
if response.status_code == 200:
# Get the JSON data from the response
json = response.json()
for post in json:
# Give the user data about the post retrieved
kv_print("Post ID", post["id"])
kv_print("Author", post["author"])
kv_print("Rating", post["rating"])
kv_print("Resolution", f"{post['width']}x{post['height']}")
kv_print("Tags", post["tags"])
kv_print("URL", post["file_url"])
# Append the URL to the list
post_urls.append(post["file_url"])
else:
# Raise an exception if the request failed
RequestFailed(response.status_code)
return post_urls
"""
Download a number of images from Konachan given a list of tags and a count
:param count: The number of images to download
:param tags: A list of tags to search for
"""
@add_source("konachan")
def handle(count: int, tags: list) -> list:
logging.debug(f"handle_konachan() called with count={count}, tags=[{', '.join(tags)}]")
# Get a list of URLs to download
post_urls: list = request_posts(count, tags)
# Download the images
files = download_files(post_urls)
# Return the downloaded files
return files