Compare commits

...

13 Commits
v0.1.0 ... main

20 changed files with 362 additions and 552 deletions

View File

@ -1,4 +1,10 @@
[mypy-aioresponses.*]
ignore_missing_imports=true
[mypy-feedparser.*]
ignore_missing_imports=true
[mypy-pytest_mock.*]
ignore_missing_imports=true
[mypy]

View File

@ -1,27 +1,39 @@
NAME = ytrssil
TESTS = tests
.PHONY: setup-dev flake8 isort isort-fix mypy lint build clean
FILES_PY = $(shell find $(CURDIR)/$(NAME) $(CURDIR)/$(TESTS) -type f -name "*.py")
TEST_PY = $(shell find $(CURDIR)/$(TESTS) -type f -name "*.py")
NAME = ytrssil
FILES_PY = $(shell find $(CURDIR)/$(NAME) -type f -name "*.py")
setup-dev:
pip install -r requirements-dev.txt
pip install -e .
test:
python -m unittest discover $(CURDIR)/$(TESTS)
flake8:
@flake8 $(FILES_PY)
mypy:
@mypy --strict $(FILES_PY)
isort:
@isort -c $(FILES_PY)
validate: flake8 mypy isort
isort-fix:
@isort $(FILES_PY)
coverage:
python -m coverage run -m unittest discover $(CURDIR)/$(TESTS)
python -m coverage html
mypy:
@mypy --strict $(FILES_PY)
lint: flake8 isort mypy
build:
python setup.py sdist bdist_wheel
clean:
rm -rf $(CURDIR)/build
rm -rf $(CURDIR)/dist
rm -rf $(CURDIR)/$(NAME).egg-info
publish:
@git checkout $(shell git tag | sort -V | tail -n1) >/dev/null 2>&1
@$(MAKE) clean > /dev/null
@$(MAKE) build > /dev/null
@twine upload dist/*
@$(MAKE) clean > /dev/null
@git switch main >/dev/null 2>&1

View File

@ -1,11 +1,20 @@
# YouTube RSS manager
This is a simple CLI to manage YouTube subscriptions through RSS feeds
and watch new videos using `mpv`. It keeps track of watched videos in a local
sqlite database.
and watch new videos using `mpv`.
## Configuration
It looks for a list of RSS URLs in `$XDG_CONFIG_HOME/ytrssil/feeds`
(~/.config/ by default), with one URL per line. Only YouTube channel feeds
are supported at this moment.
It looks for a configuration in `$XDG_CONFIG_HOME/ytrssil/config.json`
(~/.config/ by default).
Example:
```json
{
"username": "username",
"password": "password",
"api_url": "https://example.com",
"max_resolution": "1080"
}
```

View File

@ -1,3 +1,5 @@
flake8==3.*
jedi==0.*
types-requests==2.28.11.*
flake8==5.*
mypy==0.*
isort
wheel==0.37.*

View File

@ -1,3 +1,2 @@
aiohttp==3.7.*
feedparser==6.0.*
requests==2.28.*
inject==4.3.*

View File

@ -18,10 +18,9 @@ setup(
long_description=long_description,
long_description_content_type='text/markdown',
license_files=('LICENSE',),
homepage='https://gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil',
repository='https://gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil',
documentation='https://gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil',
version='0.1.0',
url='https://gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil',
version_config=True,
setup_requires=['setuptools-git-versioning'],
packages=['ytrssil'],
package_data={'': ['py.typed']},
include_package_data=True,
@ -35,6 +34,7 @@ setup(
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'License :: OSI Approved :: BSD License',
'Operating System :: POSIX :: Linux',
],

View File

@ -1,10 +1,10 @@
from collections.abc import Sequence
from ytrssil.api import get_new_videos
from ytrssil.datatypes import Channel, Video
from ytrssil.api import get_new_video_count, get_new_videos
from ytrssil.datatypes import Video
__all__: Sequence[str] = (
'Channel',
'Video',
'get_new_video_count',
'get_new_videos',
)

4
ytrssil/__main__.py Normal file
View File

@ -0,0 +1,4 @@
from ytrssil.cli import main
if __name__ == '__main__':
main()

View File

@ -1,21 +1,21 @@
from typing import cast
from inject import autoparams
from ytrssil.bindings import setup_dependencies
from ytrssil.datatypes import Video
from ytrssil.fetch import Fetcher
from ytrssil.repository import ChannelRepository
from ytrssil.protocols import Client
def get_new_videos() -> list[Video]:
setup_dependencies()
@autoparams()
def _get_new_videos(
repository_manager: ChannelRepository,
fetcher: Fetcher,
) -> dict[str, Video]:
with repository_manager as _:
_, new_videos = fetcher.fetch_new_videos()
return new_videos
def _get_new_videos(client: Client) -> list[Video]:
return client.get_new_videos()
return list(_get_new_videos().values())
return cast(list[Video], _get_new_videos())
def get_new_video_count() -> int:
return len(get_new_videos())

View File

@ -1,21 +1,16 @@
from inject import Binder, Injector, configure, get_injector_or_die
from inject import Binder, Injector, clear_and_configure, get_injector_or_die
from ytrssil.config import Configuration
from ytrssil.fetch import Fetcher, create_fetcher
from ytrssil.parse import Parser, create_feed_parser
from ytrssil.repository import ChannelRepository, create_channel_repository
from ytrssil.client import HttpClient
from ytrssil.config import Configuration, load_config
from ytrssil.protocols import Client
def dependency_configuration(binder: Binder) -> None:
config = Configuration()
binder.bind(Configuration, config)
binder.bind_to_provider
binder.bind_to_constructor(ChannelRepository, create_channel_repository)
binder.bind_to_constructor(Fetcher, create_fetcher)
binder.bind_to_constructor(Parser, create_feed_parser)
binder.bind(Configuration, load_config())
binder.bind_to_constructor(Client, HttpClient)
def setup_dependencies() -> Injector:
configure(dependency_configuration)
clear_and_configure(dependency_configuration)
return get_injector_or_die()

View File

@ -1,20 +1,18 @@
from collections.abc import Iterator
from datetime import datetime, timezone
from operator import attrgetter
from os import execv, fork
from subprocess import PIPE, Popen
from sys import argv, stderr
from typing import Any
from inject import autoparams
from ytrssil.bindings import setup_dependencies
from ytrssil.constants import mpv_options
from ytrssil.config import Configuration
from ytrssil.datatypes import Video
from ytrssil.fetch import Fetcher
from ytrssil.repository import ChannelRepository
from ytrssil.protocols import Client
def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]:
def user_query(videos: list[Video], reverse: bool = False) -> list[str]:
p = Popen(
['fzf', '-m'],
stdout=PIPE,
@ -22,19 +20,22 @@ def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]:
)
video_list: Iterator[Video]
if reverse:
video_list = reversed(videos.values())
video_list = reversed(videos)
else:
video_list = iter(videos.values())
video_list = iter(videos)
input_bytes = '\n'.join(map(str, video_list)).encode('UTF-8')
stdout, _ = p.communicate(input=input_bytes)
videos_str: list[str] = stdout.decode('UTF-8').strip().split('\n')
ret: list[Video] = []
ret: list[str] = []
for video_str in videos_str:
if video_str == '':
continue
*_, video_id = video_str.split(' - ')
try:
ret.append(videos[video_id])
ret.append(video_id)
except KeyError:
pass
@ -42,90 +43,164 @@ def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]:
@autoparams()
def watch_videos(
repository_manager: ChannelRepository,
fetcher: Fetcher,
) -> int:
with repository_manager as repository:
channels, new_videos = fetcher.fetch_new_videos()
if not new_videos:
print('No new videos', file=stderr)
return 1
def fetch(client: Client) -> int:
client.fetch()
return 0
selected_videos = user_query(new_videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [video.url for video in selected_videos]
cmd = ['/usr/bin/mpv', *mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
@autoparams()
def register(client: Client) -> int:
client.register()
return 0
for video in selected_videos:
selected_channel = channels[video.channel_id]
selected_channel.mark_video_as_watched(video)
watch_timestamp = datetime.utcnow().replace(tzinfo=timezone.utc)
repository.update_video(video, watch_timestamp)
@autoparams('client')
def subscribe_to_channel(client: Client, channel_id: str) -> int:
client.subscribe_to_channel(channel_id)
return 0
@autoparams()
def watch_videos(config: Configuration, client: Client) -> int:
videos = client.get_new_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [
f'https://www.youtube.com/watch?v={video_id}'
for video_id in selected_videos
]
cmd = ['/usr/bin/mpv', *config.mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
return 0
@autoparams()
def watch_history(
repository_manager: ChannelRepository,
fetcher: Fetcher,
) -> int:
with repository_manager as repository:
watched_videos = repository.get_watched_videos()
selected_videos = user_query(watched_videos, reverse=True)
if not selected_videos:
print('No video selected', file=stderr)
return 1
def print_url(client: Client) -> int:
videos = client.get_new_videos()
if not videos:
print('No new videos', file=stderr)
return 1
video_urls = [video.url for video in selected_videos]
cmd = ['/usr/bin/mpv', *mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
print(f'https://www.youtube.com/watch?v={video_id}')
return 0
@autoparams()
def mark_as_watched(
repository_manager: ChannelRepository,
fetcher: Fetcher,
up_to_date: datetime
) -> int:
with repository_manager as repository:
channels, new_videos = fetcher.fetch_new_videos()
for video in sorted(new_videos.values(), key=attrgetter('timestamp')):
if video.timestamp >= up_to_date:
continue
def mark_as_watched(client: Client) -> int:
videos = client.get_new_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_channel = channels[video.channel_id]
selected_channel.mark_video_as_watched(video)
watch_timestamp = datetime.utcnow().replace(tzinfo=timezone.utc)
repository.update_video(video, watch_timestamp)
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
return 0
def main() -> int:
@autoparams()
def watch_history(config: Configuration, client: Client) -> int:
videos = client.get_watched_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [
f'https://www.youtube.com/watch?v={video_id}'
for video_id in selected_videos
]
cmd = ['/usr/bin/mpv', *config.mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
return 0
@autoparams()
def mark_as_unwatched(client: Client) -> int:
videos = client.get_watched_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
for video_id in selected_videos:
client.mark_video_as_unwatched(video_id)
return 0
def main(args: list[str] = argv) -> Any:
setup_dependencies()
command: str
try:
command = argv[1]
command = args[1]
except IndexError:
command = 'watch'
if command == 'watch':
if command == 'fetch':
return fetch()
elif command == 'register':
return register()
elif command == 'subscribe':
if len(args) < 3:
print(
'Missing channel ID argument for subscribe command',
file=stderr,
)
return 1
return subscribe_to_channel(channel_id=args[2])
elif command == 'watch':
return watch_videos()
elif command == 'print':
return print_url()
elif command == 'history':
return watch_history()
elif command == 'mark':
up_to_date = datetime.fromisoformat(argv[2])
return mark_as_watched(up_to_date=up_to_date)
return mark_as_watched()
elif command == 'unmark':
return mark_as_unwatched()
else:
print(f'Unknown command "{command}"', file=stderr)
print(
'Available commands: fetch, watch, print, history, mark, unmark',
file=stderr,
)
return 1
return 0

73
ytrssil/client.py Normal file
View File

@ -0,0 +1,73 @@
import requests
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.datatypes import Video
class HttpClient:
@autoparams('config')
def __init__(self, config: Configuration) -> None:
self.base_url: str = config.api_url
self.auth = requests.auth.HTTPBasicAuth(
config.username,
config.password,
)
def fetch(self) -> None:
resp = requests.post(url=f'{self.base_url}/fetch')
resp.raise_for_status()
def register(self) -> None:
resp = requests.post(url=f'{self.base_url}/register', json={
'username': self.auth.username,
'password': self.auth.password,
})
resp.raise_for_status()
def subscribe_to_channel(self, channel_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/channels/{channel_id}/subscribe',
auth=self.auth,
)
resp.raise_for_status()
def get_new_videos(self) -> list[Video]:
resp = requests.get(
url=f'{self.base_url}/api/videos/new',
auth=self.auth,
)
resp.raise_for_status()
data = resp.json()
ret: list[Video] = []
for video_data in data['videos']:
ret.append(Video(**video_data))
return ret
def get_watched_videos(self) -> list[Video]:
resp = requests.get(
url=f'{self.base_url}/api/videos/watched',
auth=self.auth,
)
resp.raise_for_status()
data = resp.json()
ret: list[Video] = []
for video_data in data['videos']:
ret.append(Video(**video_data))
return ret
def mark_video_as_watched(self, video_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/videos/{video_id}/watch',
auth=self.auth,
)
resp.raise_for_status()
def mark_video_as_unwatched(self, video_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/videos/{video_id}/unwatch',
auth=self.auth,
)
resp.raise_for_status()

View File

@ -1,19 +1,33 @@
import json
import os
from collections.abc import Iterator
from dataclasses import dataclass
from ytrssil.constants import config_dir
from typing import Literal
@dataclass
class Configuration:
channel_repository_type: str = 'sqlite'
fetcher_type: str = 'aiohttp'
feed_parser_type: str = 'feedparser'
username: str
password: str
api_url: str = 'https://ytrssil.theedgeofrage.com'
max_resolution: Literal['480', '720', '1080', '1440', '2160'] = '1440'
@property
def mpv_options(self) -> list[str]:
return ['--no-terminal', (
'--ytdl-format=bestvideo[height<=?'
f'{self.max_resolution}]+bestaudio/best'
)]
def get_feed_urls() -> Iterator[str]:
file_path = os.path.join(config_dir, 'feeds')
with open(file_path, 'r') as f:
for line in f:
yield line.strip()
def load_config() -> Configuration:
config_prefix: str
try:
config_prefix = os.environ['XDG_CONFIG_HOME']
except KeyError:
config_prefix = os.path.expanduser('~/.config')
config_path: str = os.path.join(config_prefix, 'ytrssil', 'config.json')
with open(config_path) as f:
config_data = json.load(f)
return Configuration(**config_data)

View File

@ -1,13 +0,0 @@
import os
config_prefix: str
try:
config_prefix = os.environ['XDG_CONFIG_HOME']
except KeyError:
config_prefix = os.path.expanduser('~/.config')
config_dir: str = os.path.join(config_prefix, 'ytrssil')
mpv_options: list[str] = [
'--no-terminal',
'--ytdl-format=bestvideo[height<=?1080]+bestaudio/best',
]

View File

@ -1,52 +1,30 @@
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime
from typing import Union
from typing import Optional
@dataclass
class Video:
video_id: str
name: str
url: str
channel_id: str
title: str
channel_name: str
timestamp: datetime
watch_timestamp: Union[datetime, None] = None
published_timestamp: datetime
watch_timestamp: Optional[datetime] = None
def __str__(self) -> str:
return f'{self.channel_name} - {self.name} - {self.video_id}'
return f'{self.channel_name} - {self.title} - {self.video_id}'
@dataclass
class Channel:
channel_id: str
name: str
url: str
new_videos: dict[str, Video] = field(default_factory=lambda: dict())
watched_videos: dict[str, Video] = field(default_factory=lambda: dict())
def add_video(self, video: Video) -> bool:
if (
video.video_id in self.watched_videos
or video.video_id in self.new_videos
):
return False
self.new_videos[video.video_id] = video
return True
def remove_old_videos(self) -> None:
vid_list: list[Video] = sorted(
self.watched_videos.values(),
key=lambda x: x.timestamp,
)
for video in vid_list[15:]:
self.watched_videos.pop(video.video_id)
def mark_video_as_watched(self, video: Video) -> None:
self.new_videos.pop(video.video_id)
self.watched_videos[video.video_id] = video
self.remove_old_videos()
def __str__(self) -> str:
return f'{self.name} - {len(self.new_videos)}'
return f'{self.name} - {self.channel_id}'
@dataclass
class User:
username: str
password: str

View File

@ -1,2 +0,0 @@
class ChannelNotFound(Exception):
pass

View File

@ -1,60 +0,0 @@
from abc import ABCMeta, abstractmethod
from asyncio import gather, run
from collections.abc import Iterable
from aiohttp import ClientResponse, ClientSession
from inject import autoparams
from ytrssil.config import Configuration, get_feed_urls
from ytrssil.datatypes import Channel, Video
from ytrssil.parse import Parser
from ytrssil.repository import ChannelRepository
class Fetcher(metaclass=ABCMeta):
@abstractmethod
def fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
pass
@autoparams('parser', 'repository')
def fetch_new_videos(
self,
parser: Parser,
repository: ChannelRepository,
) -> tuple[dict[str, Channel], dict[str, Video]]:
feed_urls = get_feed_urls()
channels: dict[str, Channel] = {}
new_videos: dict[str, Video] = {}
for feed in self.fetch_feeds(feed_urls):
channel = parser(feed)
channels[channel.channel_id] = channel
new_videos.update(channel.new_videos)
return channels, new_videos
class AioHttpFetcher(Fetcher):
async def request(self, session: ClientSession, url: str) -> ClientResponse:
return await session.request(method='GET', url=url)
async def async_fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
async with ClientSession() as session:
responses: list[ClientResponse] = await gather(*[
self.request(session, url) for url in urls
])
return [
await response.text(encoding='UTF-8')
for response in responses
]
def fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
return run(self.async_fetch_feeds(urls))
@autoparams()
def create_fetcher(config: Configuration) -> Fetcher:
fetcher_type = config.fetcher_type
if fetcher_type == 'aiohttp':
return AioHttpFetcher()
else:
raise Exception(f'Unknown feed fetcher type: "{fetcher_type}"')

View File

@ -1,58 +0,0 @@
from abc import ABCMeta, abstractmethod
from datetime import datetime
import feedparser
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.datatypes import Channel, Video
from ytrssil.exceptions import ChannelNotFound
from ytrssil.repository import ChannelRepository
class Parser(metaclass=ABCMeta):
@autoparams('channel_repository')
def __init__(self, channel_repository: ChannelRepository) -> None:
self.repository = channel_repository
@abstractmethod
def __call__(self, feed_content: str) -> Channel:
pass
class FeedparserParser(Parser):
def __call__(self, feed_content: str) -> Channel:
d = feedparser.parse(feed_content)
channel_id: str = d['feed']['yt_channelid']
try:
channel = self.repository.get_channel(channel_id)
except ChannelNotFound:
channel = Channel(
channel_id=channel_id,
name=d['feed']['title'],
url=d['feed']['link'],
)
self.repository.create_channel(channel)
for entry in d['entries']:
video = Video(
video_id=entry['yt_videoid'],
name=entry['title'],
url=entry['link'],
timestamp=datetime.fromisoformat(entry['published']),
channel_id=channel.channel_id,
channel_name=channel.name,
)
if channel.add_video(video):
self.repository.add_new_video(channel, video)
return channel
@autoparams()
def create_feed_parser(config: Configuration) -> Parser:
parser_type = config.feed_parser_type
if parser_type == 'feedparser':
return FeedparserParser()
else:
raise Exception(f'Unknown feed parser type: "{parser_type}"')

32
ytrssil/protocols.py Normal file
View File

@ -0,0 +1,32 @@
from typing import Protocol
from ytrssil.datatypes import Video
class Client(Protocol):
def fetch(self) -> None: # pragma: no cover
...
def register(self) -> None: # pragma: no cover
...
def subscribe_to_channel(
self,
channel_id: str,
) -> None: # pragma: no cover
...
def get_new_videos(self) -> list[Video]: # pragma: no cover
...
def get_watched_videos(self) -> list[Video]: # pragma: no cover
...
def mark_video_as_watched(self, video_id: str) -> None: # pragma: no cover
...
def mark_video_as_unwatched(
self,
video_id: str,
) -> None: # pragma: no cover
...

View File

@ -1,256 +0,0 @@
from __future__ import annotations
import os
from abc import ABCMeta, abstractmethod
from datetime import datetime
from sqlite3 import connect
from typing import Any, Union
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.constants import config_dir
from ytrssil.datatypes import Channel, Video
from ytrssil.exceptions import ChannelNotFound
class ChannelRepository(metaclass=ABCMeta):
@abstractmethod
def __enter__(self) -> ChannelRepository:
pass
@abstractmethod
def __exit__(
self,
exc_type: Any,
exc_value: Any,
exc_traceback: Any,
) -> None:
pass
@abstractmethod
def get_channel(self, channel_id: str) -> Channel:
pass
@abstractmethod
def get_all_channels(self) -> list[Channel]:
pass
@abstractmethod
def get_watched_videos(self) -> dict[str, Video]:
pass
@abstractmethod
def create_channel(self, channel: Channel) -> None:
pass
@abstractmethod
def add_new_video(self, channel: Channel, video: Video) -> None:
pass
@abstractmethod
def update_video(self, video: Video, watch_timestamp: datetime) -> None:
pass
class SqliteChannelRepository(ChannelRepository):
def __init__(self) -> None:
os.makedirs(config_dir, exist_ok=True)
self.file_path: str = os.path.join(config_dir, 'channels.db')
self.setup_database()
def setup_database(self) -> None:
connection = connect(self.file_path)
cursor = connection.cursor()
cursor.execute('PRAGMA foreign_keys = ON')
cursor.execute(
'CREATE TABLE IF NOT EXISTS channels ('
'channel_id VARCHAR PRIMARY KEY, name VARCHAR, url VARCHAR UNIQUE)'
)
cursor.execute(
'CREATE TABLE IF NOT EXISTS videos ('
'video_id VARCHAR PRIMARY KEY, name VARCHAR, url VARCHAR UNIQUE, '
'timestamp VARCHAR, watch_timestamp VARCHAR, channel_id VARCHAR, '
'FOREIGN KEY(channel_id) REFERENCES channels(channel_id))'
)
connection.commit()
connection.close()
def __enter__(self) -> ChannelRepository:
self.connection = connect(self.file_path)
return self
def __exit__(
self,
exc_type: Any,
exc_value: Any,
exc_traceback: Any,
) -> None:
self.connection.close()
def channel_to_params(self, channel: Channel) -> dict[str, str]:
return {
'channel_id': channel.channel_id,
'name': channel.name,
'url': channel.url,
}
def channel_data_to_channel(
self,
channel_data: tuple[str, str, str],
) -> Channel:
channel = Channel(
channel_id=channel_data[0],
name=channel_data[1],
url=channel_data[2],
)
for video in self.get_videos_for_channel(channel):
if video.watch_timestamp is not None:
channel.watched_videos[video.video_id] = video
else:
channel.new_videos[video.video_id] = video
return channel
def video_to_params(self, video: Video) -> dict[str, Any]:
watch_timestamp: Union[str, None]
if video.watch_timestamp is not None:
watch_timestamp = video.watch_timestamp.isoformat()
else:
watch_timestamp = video.watch_timestamp
return {
'video_id': video.video_id,
'name': video.name,
'url': video.url,
'timestamp': video.timestamp.isoformat(),
'watch_timestamp': watch_timestamp,
'channel_id': video.channel_id,
}
def video_data_to_video(
self,
video_data: tuple[str, str, str, str, str],
channel_id: str,
channel_name: str,
) -> Video:
watch_timestamp: Union[datetime, None]
if video_data[4] is not None:
watch_timestamp = datetime.fromisoformat(video_data[4])
else:
watch_timestamp = video_data[4]
return Video(
video_id=video_data[0],
name=video_data[1],
url=video_data[2],
timestamp=datetime.fromisoformat(video_data[3]),
watch_timestamp=watch_timestamp,
channel_id=channel_id,
channel_name=channel_name,
)
def get_channel(self, channel_id: str) -> Channel:
cursor = self.connection.cursor()
cursor.execute(
'SELECT * FROM channels WHERE channel_id=:channel_id',
{'channel_id': channel_id},
)
try:
return self.channel_data_to_channel(next(cursor))
except StopIteration:
raise ChannelNotFound
def get_all_channels(self) -> list[Channel]:
cursor = self.connection.cursor()
cursor.execute('SELECT * FROM channels')
return [
self.channel_data_to_channel(channel_data)
for channel_data in cursor
]
def get_videos_for_channel(
self,
channel: Channel,
) -> list[Video]:
cursor = self.connection.cursor()
cursor.execute(
'SELECT video_id, name, url, timestamp, watch_timestamp '
'FROM videos WHERE channel_id=:channel_id',
{'channel_id': channel.channel_id},
)
return [
self.video_data_to_video(
video_data=video_data,
channel_id=channel.channel_id,
channel_name=channel.name,
)
for video_data in cursor
]
def get_watched_videos(self) -> dict[str, Video]:
cursor = self.connection.cursor()
cursor.execute(
'SELECT video_id, videos.name, videos.url, timestamp, '
'watch_timestamp, channels.channel_id, channels.name FROM videos '
'LEFT JOIN channels ON channels.channel_id=videos.channel_id WHERE '
'watch_timestamp IS NOT NULL ORDER BY timestamp'
)
return {
video_data[0]: self.video_data_to_video(
video_data=video_data,
channel_id=video_data[5],
channel_name=video_data[6],
)
for video_data in cursor
}
def create_channel(self, channel: Channel) -> None:
cursor = self.connection.cursor()
cursor.execute(
'INSERT INTO channels VALUES (:channel_id, :name, :url)',
self.channel_to_params(channel),
)
self.connection.commit()
def update_channel(self, channel: Channel) -> None:
cursor = self.connection.cursor()
cursor.execute(
'UPDATE channels SET channel_id = :channel_id, name = :name, '
'url = :url WHERE channel_id=:channel_id',
self.channel_to_params(channel),
)
self.connection.commit()
def add_new_video(self, channel: Channel, video: Video) -> None:
cursor = self.connection.cursor()
cursor.execute(
'INSERT INTO videos VALUES (:video_id, :name, '
':url, :timestamp, :watch_timestamp, :channel_id)',
self.video_to_params(video),
)
self.connection.commit()
def update_video(self, video: Video, watch_timestamp: datetime) -> None:
cursor = self.connection.cursor()
cursor.execute(
'UPDATE videos SET watch_timestamp = :watch_timestamp '
'WHERE video_id=:video_id',
{
'watch_timestamp': watch_timestamp.isoformat(),
'video_id': video.video_id,
},
)
self.connection.commit()
@autoparams()
def create_channel_repository(config: Configuration) -> ChannelRepository:
repo_type = config.channel_repository_type
if repo_type == 'sqlite':
return SqliteChannelRepository()
else:
raise Exception(f'Unknown channel repository type: "{repo_type}"')