Compare commits

...

13 Commits
v0.1.0 ... main

20 changed files with 362 additions and 552 deletions

View File

@ -1,4 +1,10 @@
[mypy-aioresponses.*]
ignore_missing_imports=true
[mypy-feedparser.*] [mypy-feedparser.*]
ignore_missing_imports=true ignore_missing_imports=true
[mypy-pytest_mock.*]
ignore_missing_imports=true
[mypy] [mypy]

View File

@ -1,27 +1,39 @@
NAME = ytrssil .PHONY: setup-dev flake8 isort isort-fix mypy lint build clean
TESTS = tests
FILES_PY = $(shell find $(CURDIR)/$(NAME) $(CURDIR)/$(TESTS) -type f -name "*.py") NAME = ytrssil
TEST_PY = $(shell find $(CURDIR)/$(TESTS) -type f -name "*.py")
FILES_PY = $(shell find $(CURDIR)/$(NAME) -type f -name "*.py")
setup-dev: setup-dev:
pip install -r requirements-dev.txt pip install -r requirements-dev.txt
pip install -e . pip install -e .
test:
python -m unittest discover $(CURDIR)/$(TESTS)
flake8: flake8:
@flake8 $(FILES_PY) @flake8 $(FILES_PY)
mypy:
@mypy --strict $(FILES_PY)
isort: isort:
@isort -c $(FILES_PY) @isort -c $(FILES_PY)
validate: flake8 mypy isort isort-fix:
@isort $(FILES_PY)
coverage: mypy:
python -m coverage run -m unittest discover $(CURDIR)/$(TESTS) @mypy --strict $(FILES_PY)
python -m coverage html
lint: flake8 isort mypy
build:
python setup.py sdist bdist_wheel
clean:
rm -rf $(CURDIR)/build
rm -rf $(CURDIR)/dist
rm -rf $(CURDIR)/$(NAME).egg-info
publish:
@git checkout $(shell git tag | sort -V | tail -n1) >/dev/null 2>&1
@$(MAKE) clean > /dev/null
@$(MAKE) build > /dev/null
@twine upload dist/*
@$(MAKE) clean > /dev/null
@git switch main >/dev/null 2>&1

View File

@ -1,11 +1,20 @@
# YouTube RSS manager # YouTube RSS manager
This is a simple CLI to manage YouTube subscriptions through RSS feeds This is a simple CLI to manage YouTube subscriptions through RSS feeds
and watch new videos using `mpv`. It keeps track of watched videos in a local and watch new videos using `mpv`.
sqlite database.
## Configuration ## Configuration
It looks for a list of RSS URLs in `$XDG_CONFIG_HOME/ytrssil/feeds` It looks for a configuration in `$XDG_CONFIG_HOME/ytrssil/config.json`
(~/.config/ by default), with one URL per line. Only YouTube channel feeds (~/.config/ by default).
are supported at this moment.
Example:
```json
{
"username": "username",
"password": "password",
"api_url": "https://example.com",
"max_resolution": "1080"
}
```

View File

@ -1,3 +1,5 @@
flake8==3.* types-requests==2.28.11.*
jedi==0.* flake8==5.*
mypy==0.* mypy==0.*
isort
wheel==0.37.*

View File

@ -1,3 +1,2 @@
aiohttp==3.7.* requests==2.28.*
feedparser==6.0.*
inject==4.3.* inject==4.3.*

View File

@ -18,10 +18,9 @@ setup(
long_description=long_description, long_description=long_description,
long_description_content_type='text/markdown', long_description_content_type='text/markdown',
license_files=('LICENSE',), license_files=('LICENSE',),
homepage='https://gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil', url='https://gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil',
repository='https://gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil', version_config=True,
documentation='https://gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil', setup_requires=['setuptools-git-versioning'],
version='0.1.0',
packages=['ytrssil'], packages=['ytrssil'],
package_data={'': ['py.typed']}, package_data={'': ['py.typed']},
include_package_data=True, include_package_data=True,
@ -35,6 +34,7 @@ setup(
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3 :: Only', 'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'License :: OSI Approved :: BSD License', 'License :: OSI Approved :: BSD License',
'Operating System :: POSIX :: Linux', 'Operating System :: POSIX :: Linux',
], ],

View File

@ -1,10 +1,10 @@
from collections.abc import Sequence from collections.abc import Sequence
from ytrssil.api import get_new_videos from ytrssil.api import get_new_video_count, get_new_videos
from ytrssil.datatypes import Channel, Video from ytrssil.datatypes import Video
__all__: Sequence[str] = ( __all__: Sequence[str] = (
'Channel',
'Video', 'Video',
'get_new_video_count',
'get_new_videos', 'get_new_videos',
) )

4
ytrssil/__main__.py Normal file
View File

@ -0,0 +1,4 @@
from ytrssil.cli import main
if __name__ == '__main__':
main()

View File

@ -1,21 +1,21 @@
from typing import cast
from inject import autoparams from inject import autoparams
from ytrssil.bindings import setup_dependencies from ytrssil.bindings import setup_dependencies
from ytrssil.datatypes import Video from ytrssil.datatypes import Video
from ytrssil.fetch import Fetcher from ytrssil.protocols import Client
from ytrssil.repository import ChannelRepository
def get_new_videos() -> list[Video]: def get_new_videos() -> list[Video]:
setup_dependencies() setup_dependencies()
@autoparams() @autoparams()
def _get_new_videos( def _get_new_videos(client: Client) -> list[Video]:
repository_manager: ChannelRepository, return client.get_new_videos()
fetcher: Fetcher,
) -> dict[str, Video]:
with repository_manager as _:
_, new_videos = fetcher.fetch_new_videos()
return new_videos
return list(_get_new_videos().values()) return cast(list[Video], _get_new_videos())
def get_new_video_count() -> int:
return len(get_new_videos())

View File

@ -1,21 +1,16 @@
from inject import Binder, Injector, configure, get_injector_or_die from inject import Binder, Injector, clear_and_configure, get_injector_or_die
from ytrssil.config import Configuration from ytrssil.client import HttpClient
from ytrssil.fetch import Fetcher, create_fetcher from ytrssil.config import Configuration, load_config
from ytrssil.parse import Parser, create_feed_parser from ytrssil.protocols import Client
from ytrssil.repository import ChannelRepository, create_channel_repository
def dependency_configuration(binder: Binder) -> None: def dependency_configuration(binder: Binder) -> None:
config = Configuration() binder.bind(Configuration, load_config())
binder.bind(Configuration, config) binder.bind_to_constructor(Client, HttpClient)
binder.bind_to_provider
binder.bind_to_constructor(ChannelRepository, create_channel_repository)
binder.bind_to_constructor(Fetcher, create_fetcher)
binder.bind_to_constructor(Parser, create_feed_parser)
def setup_dependencies() -> Injector: def setup_dependencies() -> Injector:
configure(dependency_configuration) clear_and_configure(dependency_configuration)
return get_injector_or_die() return get_injector_or_die()

View File

@ -1,20 +1,18 @@
from collections.abc import Iterator from collections.abc import Iterator
from datetime import datetime, timezone
from operator import attrgetter
from os import execv, fork from os import execv, fork
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from sys import argv, stderr from sys import argv, stderr
from typing import Any
from inject import autoparams from inject import autoparams
from ytrssil.bindings import setup_dependencies from ytrssil.bindings import setup_dependencies
from ytrssil.constants import mpv_options from ytrssil.config import Configuration
from ytrssil.datatypes import Video from ytrssil.datatypes import Video
from ytrssil.fetch import Fetcher from ytrssil.protocols import Client
from ytrssil.repository import ChannelRepository
def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]: def user_query(videos: list[Video], reverse: bool = False) -> list[str]:
p = Popen( p = Popen(
['fzf', '-m'], ['fzf', '-m'],
stdout=PIPE, stdout=PIPE,
@ -22,19 +20,22 @@ def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]:
) )
video_list: Iterator[Video] video_list: Iterator[Video]
if reverse: if reverse:
video_list = reversed(videos.values()) video_list = reversed(videos)
else: else:
video_list = iter(videos.values()) video_list = iter(videos)
input_bytes = '\n'.join(map(str, video_list)).encode('UTF-8') input_bytes = '\n'.join(map(str, video_list)).encode('UTF-8')
stdout, _ = p.communicate(input=input_bytes) stdout, _ = p.communicate(input=input_bytes)
videos_str: list[str] = stdout.decode('UTF-8').strip().split('\n') videos_str: list[str] = stdout.decode('UTF-8').strip().split('\n')
ret: list[Video] = [] ret: list[str] = []
for video_str in videos_str: for video_str in videos_str:
if video_str == '':
continue
*_, video_id = video_str.split(' - ') *_, video_id = video_str.split(' - ')
try: try:
ret.append(videos[video_id]) ret.append(video_id)
except KeyError: except KeyError:
pass pass
@ -42,90 +43,164 @@ def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]:
@autoparams() @autoparams()
def watch_videos( def fetch(client: Client) -> int:
repository_manager: ChannelRepository, client.fetch()
fetcher: Fetcher, return 0
) -> int:
with repository_manager as repository:
channels, new_videos = fetcher.fetch_new_videos()
if not new_videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(new_videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [video.url for video in selected_videos] @autoparams()
cmd = ['/usr/bin/mpv', *mpv_options, *video_urls] def register(client: Client) -> int:
if (fork() == 0): client.register()
execv(cmd[0], cmd) return 0
for video in selected_videos:
selected_channel = channels[video.channel_id] @autoparams('client')
selected_channel.mark_video_as_watched(video) def subscribe_to_channel(client: Client, channel_id: str) -> int:
watch_timestamp = datetime.utcnow().replace(tzinfo=timezone.utc) client.subscribe_to_channel(channel_id)
repository.update_video(video, watch_timestamp) return 0
@autoparams()
def watch_videos(config: Configuration, client: Client) -> int:
videos = client.get_new_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [
f'https://www.youtube.com/watch?v={video_id}'
for video_id in selected_videos
]
cmd = ['/usr/bin/mpv', *config.mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
return 0 return 0
@autoparams() @autoparams()
def watch_history( def print_url(client: Client) -> int:
repository_manager: ChannelRepository, videos = client.get_new_videos()
fetcher: Fetcher, if not videos:
) -> int: print('No new videos', file=stderr)
with repository_manager as repository: return 1
watched_videos = repository.get_watched_videos()
selected_videos = user_query(watched_videos, reverse=True)
if not selected_videos:
print('No video selected', file=stderr)
return 1
video_urls = [video.url for video in selected_videos] selected_videos = user_query(videos)
cmd = ['/usr/bin/mpv', *mpv_options, *video_urls] if not selected_videos:
if (fork() == 0): print('No video selected', file=stderr)
execv(cmd[0], cmd) return 2
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
print(f'https://www.youtube.com/watch?v={video_id}')
return 0 return 0
@autoparams() @autoparams()
def mark_as_watched( def mark_as_watched(client: Client) -> int:
repository_manager: ChannelRepository, videos = client.get_new_videos()
fetcher: Fetcher, if not videos:
up_to_date: datetime print('No new videos', file=stderr)
) -> int: return 1
with repository_manager as repository:
channels, new_videos = fetcher.fetch_new_videos()
for video in sorted(new_videos.values(), key=attrgetter('timestamp')):
if video.timestamp >= up_to_date:
continue
selected_channel = channels[video.channel_id] selected_videos = user_query(videos)
selected_channel.mark_video_as_watched(video) if not selected_videos:
watch_timestamp = datetime.utcnow().replace(tzinfo=timezone.utc) print('No video selected', file=stderr)
repository.update_video(video, watch_timestamp) return 2
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
return 0 return 0
def main() -> int: @autoparams()
def watch_history(config: Configuration, client: Client) -> int:
videos = client.get_watched_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [
f'https://www.youtube.com/watch?v={video_id}'
for video_id in selected_videos
]
cmd = ['/usr/bin/mpv', *config.mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
return 0
@autoparams()
def mark_as_unwatched(client: Client) -> int:
videos = client.get_watched_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
for video_id in selected_videos:
client.mark_video_as_unwatched(video_id)
return 0
def main(args: list[str] = argv) -> Any:
setup_dependencies() setup_dependencies()
command: str command: str
try: try:
command = argv[1] command = args[1]
except IndexError: except IndexError:
command = 'watch' command = 'watch'
if command == 'watch': if command == 'fetch':
return fetch()
elif command == 'register':
return register()
elif command == 'subscribe':
if len(args) < 3:
print(
'Missing channel ID argument for subscribe command',
file=stderr,
)
return 1
return subscribe_to_channel(channel_id=args[2])
elif command == 'watch':
return watch_videos() return watch_videos()
elif command == 'print':
return print_url()
elif command == 'history': elif command == 'history':
return watch_history() return watch_history()
elif command == 'mark': elif command == 'mark':
up_to_date = datetime.fromisoformat(argv[2]) return mark_as_watched()
return mark_as_watched(up_to_date=up_to_date) elif command == 'unmark':
return mark_as_unwatched()
else: else:
print(f'Unknown command "{command}"', file=stderr) print(f'Unknown command "{command}"', file=stderr)
print(
'Available commands: fetch, watch, print, history, mark, unmark',
file=stderr,
)
return 1 return 1
return 0

73
ytrssil/client.py Normal file
View File

@ -0,0 +1,73 @@
import requests
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.datatypes import Video
class HttpClient:
@autoparams('config')
def __init__(self, config: Configuration) -> None:
self.base_url: str = config.api_url
self.auth = requests.auth.HTTPBasicAuth(
config.username,
config.password,
)
def fetch(self) -> None:
resp = requests.post(url=f'{self.base_url}/fetch')
resp.raise_for_status()
def register(self) -> None:
resp = requests.post(url=f'{self.base_url}/register', json={
'username': self.auth.username,
'password': self.auth.password,
})
resp.raise_for_status()
def subscribe_to_channel(self, channel_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/channels/{channel_id}/subscribe',
auth=self.auth,
)
resp.raise_for_status()
def get_new_videos(self) -> list[Video]:
resp = requests.get(
url=f'{self.base_url}/api/videos/new',
auth=self.auth,
)
resp.raise_for_status()
data = resp.json()
ret: list[Video] = []
for video_data in data['videos']:
ret.append(Video(**video_data))
return ret
def get_watched_videos(self) -> list[Video]:
resp = requests.get(
url=f'{self.base_url}/api/videos/watched',
auth=self.auth,
)
resp.raise_for_status()
data = resp.json()
ret: list[Video] = []
for video_data in data['videos']:
ret.append(Video(**video_data))
return ret
def mark_video_as_watched(self, video_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/videos/{video_id}/watch',
auth=self.auth,
)
resp.raise_for_status()
def mark_video_as_unwatched(self, video_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/videos/{video_id}/unwatch',
auth=self.auth,
)
resp.raise_for_status()

View File

@ -1,19 +1,33 @@
import json
import os import os
from collections.abc import Iterator
from dataclasses import dataclass from dataclasses import dataclass
from typing import Literal
from ytrssil.constants import config_dir
@dataclass @dataclass
class Configuration: class Configuration:
channel_repository_type: str = 'sqlite' username: str
fetcher_type: str = 'aiohttp' password: str
feed_parser_type: str = 'feedparser' api_url: str = 'https://ytrssil.theedgeofrage.com'
max_resolution: Literal['480', '720', '1080', '1440', '2160'] = '1440'
@property
def mpv_options(self) -> list[str]:
return ['--no-terminal', (
'--ytdl-format=bestvideo[height<=?'
f'{self.max_resolution}]+bestaudio/best'
)]
def get_feed_urls() -> Iterator[str]: def load_config() -> Configuration:
file_path = os.path.join(config_dir, 'feeds') config_prefix: str
with open(file_path, 'r') as f: try:
for line in f: config_prefix = os.environ['XDG_CONFIG_HOME']
yield line.strip() except KeyError:
config_prefix = os.path.expanduser('~/.config')
config_path: str = os.path.join(config_prefix, 'ytrssil', 'config.json')
with open(config_path) as f:
config_data = json.load(f)
return Configuration(**config_data)

View File

@ -1,13 +0,0 @@
import os
config_prefix: str
try:
config_prefix = os.environ['XDG_CONFIG_HOME']
except KeyError:
config_prefix = os.path.expanduser('~/.config')
config_dir: str = os.path.join(config_prefix, 'ytrssil')
mpv_options: list[str] = [
'--no-terminal',
'--ytdl-format=bestvideo[height<=?1080]+bestaudio/best',
]

View File

@ -1,52 +1,30 @@
from dataclasses import dataclass, field from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Union from typing import Optional
@dataclass @dataclass
class Video: class Video:
video_id: str video_id: str
name: str title: str
url: str
channel_id: str
channel_name: str channel_name: str
timestamp: datetime published_timestamp: datetime
watch_timestamp: Union[datetime, None] = None watch_timestamp: Optional[datetime] = None
def __str__(self) -> str: def __str__(self) -> str:
return f'{self.channel_name} - {self.name} - {self.video_id}' return f'{self.channel_name} - {self.title} - {self.video_id}'
@dataclass @dataclass
class Channel: class Channel:
channel_id: str channel_id: str
name: str name: str
url: str
new_videos: dict[str, Video] = field(default_factory=lambda: dict())
watched_videos: dict[str, Video] = field(default_factory=lambda: dict())
def add_video(self, video: Video) -> bool:
if (
video.video_id in self.watched_videos
or video.video_id in self.new_videos
):
return False
self.new_videos[video.video_id] = video
return True
def remove_old_videos(self) -> None:
vid_list: list[Video] = sorted(
self.watched_videos.values(),
key=lambda x: x.timestamp,
)
for video in vid_list[15:]:
self.watched_videos.pop(video.video_id)
def mark_video_as_watched(self, video: Video) -> None:
self.new_videos.pop(video.video_id)
self.watched_videos[video.video_id] = video
self.remove_old_videos()
def __str__(self) -> str: def __str__(self) -> str:
return f'{self.name} - {len(self.new_videos)}' return f'{self.name} - {self.channel_id}'
@dataclass
class User:
username: str
password: str

View File

@ -1,2 +0,0 @@
class ChannelNotFound(Exception):
pass

View File

@ -1,60 +0,0 @@
from abc import ABCMeta, abstractmethod
from asyncio import gather, run
from collections.abc import Iterable
from aiohttp import ClientResponse, ClientSession
from inject import autoparams
from ytrssil.config import Configuration, get_feed_urls
from ytrssil.datatypes import Channel, Video
from ytrssil.parse import Parser
from ytrssil.repository import ChannelRepository
class Fetcher(metaclass=ABCMeta):
@abstractmethod
def fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
pass
@autoparams('parser', 'repository')
def fetch_new_videos(
self,
parser: Parser,
repository: ChannelRepository,
) -> tuple[dict[str, Channel], dict[str, Video]]:
feed_urls = get_feed_urls()
channels: dict[str, Channel] = {}
new_videos: dict[str, Video] = {}
for feed in self.fetch_feeds(feed_urls):
channel = parser(feed)
channels[channel.channel_id] = channel
new_videos.update(channel.new_videos)
return channels, new_videos
class AioHttpFetcher(Fetcher):
async def request(self, session: ClientSession, url: str) -> ClientResponse:
return await session.request(method='GET', url=url)
async def async_fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
async with ClientSession() as session:
responses: list[ClientResponse] = await gather(*[
self.request(session, url) for url in urls
])
return [
await response.text(encoding='UTF-8')
for response in responses
]
def fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
return run(self.async_fetch_feeds(urls))
@autoparams()
def create_fetcher(config: Configuration) -> Fetcher:
fetcher_type = config.fetcher_type
if fetcher_type == 'aiohttp':
return AioHttpFetcher()
else:
raise Exception(f'Unknown feed fetcher type: "{fetcher_type}"')

View File

@ -1,58 +0,0 @@
from abc import ABCMeta, abstractmethod
from datetime import datetime
import feedparser
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.datatypes import Channel, Video
from ytrssil.exceptions import ChannelNotFound
from ytrssil.repository import ChannelRepository
class Parser(metaclass=ABCMeta):
@autoparams('channel_repository')
def __init__(self, channel_repository: ChannelRepository) -> None:
self.repository = channel_repository
@abstractmethod
def __call__(self, feed_content: str) -> Channel:
pass
class FeedparserParser(Parser):
def __call__(self, feed_content: str) -> Channel:
d = feedparser.parse(feed_content)
channel_id: str = d['feed']['yt_channelid']
try:
channel = self.repository.get_channel(channel_id)
except ChannelNotFound:
channel = Channel(
channel_id=channel_id,
name=d['feed']['title'],
url=d['feed']['link'],
)
self.repository.create_channel(channel)
for entry in d['entries']:
video = Video(
video_id=entry['yt_videoid'],
name=entry['title'],
url=entry['link'],
timestamp=datetime.fromisoformat(entry['published']),
channel_id=channel.channel_id,
channel_name=channel.name,
)
if channel.add_video(video):
self.repository.add_new_video(channel, video)
return channel
@autoparams()
def create_feed_parser(config: Configuration) -> Parser:
parser_type = config.feed_parser_type
if parser_type == 'feedparser':
return FeedparserParser()
else:
raise Exception(f'Unknown feed parser type: "{parser_type}"')

32
ytrssil/protocols.py Normal file
View File

@ -0,0 +1,32 @@
from typing import Protocol
from ytrssil.datatypes import Video
class Client(Protocol):
def fetch(self) -> None: # pragma: no cover
...
def register(self) -> None: # pragma: no cover
...
def subscribe_to_channel(
self,
channel_id: str,
) -> None: # pragma: no cover
...
def get_new_videos(self) -> list[Video]: # pragma: no cover
...
def get_watched_videos(self) -> list[Video]: # pragma: no cover
...
def mark_video_as_watched(self, video_id: str) -> None: # pragma: no cover
...
def mark_video_as_unwatched(
self,
video_id: str,
) -> None: # pragma: no cover
...

View File

@ -1,256 +0,0 @@
from __future__ import annotations
import os
from abc import ABCMeta, abstractmethod
from datetime import datetime
from sqlite3 import connect
from typing import Any, Union
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.constants import config_dir
from ytrssil.datatypes import Channel, Video
from ytrssil.exceptions import ChannelNotFound
class ChannelRepository(metaclass=ABCMeta):
@abstractmethod
def __enter__(self) -> ChannelRepository:
pass
@abstractmethod
def __exit__(
self,
exc_type: Any,
exc_value: Any,
exc_traceback: Any,
) -> None:
pass
@abstractmethod
def get_channel(self, channel_id: str) -> Channel:
pass
@abstractmethod
def get_all_channels(self) -> list[Channel]:
pass
@abstractmethod
def get_watched_videos(self) -> dict[str, Video]:
pass
@abstractmethod
def create_channel(self, channel: Channel) -> None:
pass
@abstractmethod
def add_new_video(self, channel: Channel, video: Video) -> None:
pass
@abstractmethod
def update_video(self, video: Video, watch_timestamp: datetime) -> None:
pass
class SqliteChannelRepository(ChannelRepository):
def __init__(self) -> None:
os.makedirs(config_dir, exist_ok=True)
self.file_path: str = os.path.join(config_dir, 'channels.db')
self.setup_database()
def setup_database(self) -> None:
connection = connect(self.file_path)
cursor = connection.cursor()
cursor.execute('PRAGMA foreign_keys = ON')
cursor.execute(
'CREATE TABLE IF NOT EXISTS channels ('
'channel_id VARCHAR PRIMARY KEY, name VARCHAR, url VARCHAR UNIQUE)'
)
cursor.execute(
'CREATE TABLE IF NOT EXISTS videos ('
'video_id VARCHAR PRIMARY KEY, name VARCHAR, url VARCHAR UNIQUE, '
'timestamp VARCHAR, watch_timestamp VARCHAR, channel_id VARCHAR, '
'FOREIGN KEY(channel_id) REFERENCES channels(channel_id))'
)
connection.commit()
connection.close()
def __enter__(self) -> ChannelRepository:
self.connection = connect(self.file_path)
return self
def __exit__(
self,
exc_type: Any,
exc_value: Any,
exc_traceback: Any,
) -> None:
self.connection.close()
def channel_to_params(self, channel: Channel) -> dict[str, str]:
return {
'channel_id': channel.channel_id,
'name': channel.name,
'url': channel.url,
}
def channel_data_to_channel(
self,
channel_data: tuple[str, str, str],
) -> Channel:
channel = Channel(
channel_id=channel_data[0],
name=channel_data[1],
url=channel_data[2],
)
for video in self.get_videos_for_channel(channel):
if video.watch_timestamp is not None:
channel.watched_videos[video.video_id] = video
else:
channel.new_videos[video.video_id] = video
return channel
def video_to_params(self, video: Video) -> dict[str, Any]:
watch_timestamp: Union[str, None]
if video.watch_timestamp is not None:
watch_timestamp = video.watch_timestamp.isoformat()
else:
watch_timestamp = video.watch_timestamp
return {
'video_id': video.video_id,
'name': video.name,
'url': video.url,
'timestamp': video.timestamp.isoformat(),
'watch_timestamp': watch_timestamp,
'channel_id': video.channel_id,
}
def video_data_to_video(
self,
video_data: tuple[str, str, str, str, str],
channel_id: str,
channel_name: str,
) -> Video:
watch_timestamp: Union[datetime, None]
if video_data[4] is not None:
watch_timestamp = datetime.fromisoformat(video_data[4])
else:
watch_timestamp = video_data[4]
return Video(
video_id=video_data[0],
name=video_data[1],
url=video_data[2],
timestamp=datetime.fromisoformat(video_data[3]),
watch_timestamp=watch_timestamp,
channel_id=channel_id,
channel_name=channel_name,
)
def get_channel(self, channel_id: str) -> Channel:
cursor = self.connection.cursor()
cursor.execute(
'SELECT * FROM channels WHERE channel_id=:channel_id',
{'channel_id': channel_id},
)
try:
return self.channel_data_to_channel(next(cursor))
except StopIteration:
raise ChannelNotFound
def get_all_channels(self) -> list[Channel]:
cursor = self.connection.cursor()
cursor.execute('SELECT * FROM channels')
return [
self.channel_data_to_channel(channel_data)
for channel_data in cursor
]
def get_videos_for_channel(
self,
channel: Channel,
) -> list[Video]:
cursor = self.connection.cursor()
cursor.execute(
'SELECT video_id, name, url, timestamp, watch_timestamp '
'FROM videos WHERE channel_id=:channel_id',
{'channel_id': channel.channel_id},
)
return [
self.video_data_to_video(
video_data=video_data,
channel_id=channel.channel_id,
channel_name=channel.name,
)
for video_data in cursor
]
def get_watched_videos(self) -> dict[str, Video]:
cursor = self.connection.cursor()
cursor.execute(
'SELECT video_id, videos.name, videos.url, timestamp, '
'watch_timestamp, channels.channel_id, channels.name FROM videos '
'LEFT JOIN channels ON channels.channel_id=videos.channel_id WHERE '
'watch_timestamp IS NOT NULL ORDER BY timestamp'
)
return {
video_data[0]: self.video_data_to_video(
video_data=video_data,
channel_id=video_data[5],
channel_name=video_data[6],
)
for video_data in cursor
}
def create_channel(self, channel: Channel) -> None:
cursor = self.connection.cursor()
cursor.execute(
'INSERT INTO channels VALUES (:channel_id, :name, :url)',
self.channel_to_params(channel),
)
self.connection.commit()
def update_channel(self, channel: Channel) -> None:
cursor = self.connection.cursor()
cursor.execute(
'UPDATE channels SET channel_id = :channel_id, name = :name, '
'url = :url WHERE channel_id=:channel_id',
self.channel_to_params(channel),
)
self.connection.commit()
def add_new_video(self, channel: Channel, video: Video) -> None:
cursor = self.connection.cursor()
cursor.execute(
'INSERT INTO videos VALUES (:video_id, :name, '
':url, :timestamp, :watch_timestamp, :channel_id)',
self.video_to_params(video),
)
self.connection.commit()
def update_video(self, video: Video, watch_timestamp: datetime) -> None:
cursor = self.connection.cursor()
cursor.execute(
'UPDATE videos SET watch_timestamp = :watch_timestamp '
'WHERE video_id=:video_id',
{
'watch_timestamp': watch_timestamp.isoformat(),
'video_id': video.video_id,
},
)
self.connection.commit()
@autoparams()
def create_channel_repository(config: Configuration) -> ChannelRepository:
repo_type = config.channel_repository_type
if repo_type == 'sqlite':
return SqliteChannelRepository()
else:
raise Exception(f'Unknown channel repository type: "{repo_type}"')