Rewrite to use ytrssil API

This commit is contained in:
Pavle Portic 2022-10-29 23:55:22 +02:00
parent 5f18adc0f1
commit 57a83f9fe2
Signed by: TheEdgeOfRage
GPG Key ID: 66AD4BA646FBC0D2
24 changed files with 254 additions and 970 deletions

View File

@ -1,29 +1,26 @@
NAME = ytrssil
TESTS = tests
.PHONY: setup-dev flake8 isort isort-fix mypy lint build clean
FILES_PY = $(shell find $(CURDIR)/$(NAME) $(CURDIR)/$(TESTS) -type f -name "*.py")
TEST_PY = $(shell find $(CURDIR)/$(TESTS) -type f -name "*.py")
NAME = ytrssil
FILES_PY = $(shell find $(CURDIR)/$(NAME) -type f -name "*.py")
setup-dev:
pip install -r requirements-dev.txt
pip install -e .
test:
python -m pytest --cov $(CURDIR)/$(NAME)
flake8:
@flake8 $(FILES_PY)
mypy:
@mypy --strict $(FILES_PY)
isort:
@isort -c $(FILES_PY)
validate: flake8 mypy isort
isort-fix:
@isort $(FILES_PY)
coverage:
python -m pytest --cov $(CURDIR)/$(NAME) --cov-report html
mypy:
@mypy --strict $(FILES_PY)
lint: flake8 isort mypy
build:
python setup.py sdist bdist_wheel
@ -31,6 +28,4 @@ build:
clean:
rm -rf $(CURDIR)/build
rm -rf $(CURDIR)/dist
rm -rf $(CURDIR)/htmlcov
rm -rf $(CURDIR)/.coverage
rm -rf $(CURDIR)/$(NAME).egg-info

View File

@ -1,14 +1,20 @@
# YouTube RSS manager
This is a simple CLI to manage YouTube subscriptions through RSS feeds
and watch new videos using `mpv`. It keeps track of watched videos in a local
sqlite database.
**This tool is still in early development and breaking changes across minor
versions are expected.**
and watch new videos using `mpv`.
## Configuration
It looks for a list of RSS URLs in `$XDG_CONFIG_HOME/ytrssil/feeds`
(~/.config/ by default), with one URL per line. Only YouTube channel feeds
are supported at this moment.
It looks for a configuration in `$XDG_CONFIG_HOME/ytrssil/config.json`
(~/.config/ by default).
Example:
```json
{
"username": "username",
"password": "password",
"api_url": "https://example.com",
"max_resolution": "1080"
}
```

View File

@ -1,7 +1,5 @@
aioresponses==0.7.*
flake8==3.*
types-requests==2.28.11.*
flake8==5.*
mypy==0.*
pytest==6.2.*
pytest-cov==3.0.0
pytest-mock==3.6.*
isort
wheel==0.37.*

View File

@ -1,3 +1,2 @@
aiohttp==3.7.*
feedparser==6.0.*
requests==2.28.*
inject==4.3.*

View File

View File

@ -1,34 +0,0 @@
from datetime import datetime
from ytrssil.datatypes import ChannelData, VideoData
FEED_XML: str = '''
<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns:yt="http://www.youtube.com/xml/schemas/2015">
<yt:channelId>channel_id</yt:channelId>
<title>channel_name</title>
<entry>
<yt:videoId>video_id</yt:videoId>
<yt:channelId>channel_id</yt:channelId>
<title>video_name</title>
<link rel="alternate" href="https://www.youtube.com/watch?v=video_id"/>
<published>1970-01-01T00:00:00+00:00</published>
</entry>
</feed>
'''
TEST_VIDEO_DATA: VideoData = {
'video_id': 'video_id',
'name': 'video_name',
'url': 'https://www.youtube.com/watch?v=video_id',
'channel_id': 'channel_id',
'channel_name': 'channel_name',
'timestamp': datetime.fromisoformat('1970-01-01T00:00:00+00:00'),
'watch_timestamp': None,
}
TEST_CHANNEL_DATA: ChannelData = {
'channel_id': 'channel_id',
'name': 'channel_name',
'new_videos': {},
'watched_videos': {},
}

View File

@ -1,18 +0,0 @@
from ytrssil.bindings import setup_dependencies
from ytrssil.config import Configuration
from ytrssil.fetch import AioHttpFetcher
from ytrssil.parse import FeedparserParser
from ytrssil.protocols import ChannelRepository, Fetcher, Parser
from ytrssil.repository import SqliteChannelRepository
def test_setup_dependencies() -> None:
injector = setup_dependencies()
config = injector.get_instance(Configuration)
assert isinstance(config, Configuration)
assert isinstance(
injector.get_instance(ChannelRepository),
SqliteChannelRepository,
)
assert isinstance(injector.get_instance(Fetcher), AioHttpFetcher)
assert isinstance(injector.get_instance(Parser), FeedparserParser)

View File

@ -1,83 +0,0 @@
from datetime import datetime
from unittest.mock import MagicMock
from pytest_mock import MockerFixture
from tests.constants import TEST_CHANNEL_DATA, TEST_VIDEO_DATA
from ytrssil import cli
from ytrssil.datatypes import Channel, Video
def test_user_query(mocker: MockerFixture) -> None:
def mock_query(input: bytes) -> tuple[bytes, bytes]:
videos = input.decode('UTF-8').split('\n')
return (videos[0].encode('UTF-8'), b'')
popen_mock: MagicMock = mocker.patch.object(cli, 'Popen')
attrs = {'communicate': mock_query}
communicate_mock = mocker.MagicMock()
communicate_mock.configure_mock(**attrs)
popen_mock.return_value = communicate_mock
videos = {
f'video_id_{i}': Video(
video_id=f'video_id_{i}',
name='video',
url='url',
timestamp=datetime.utcnow(),
channel_id='channel_id',
channel_name='channel',
)
for i in range(2)
}
ret = cli.user_query(videos=videos)
assert ret == [videos['video_id_0']]
def test_watch_videos(mocker: MockerFixture) -> None:
repository_mock = mocker.MagicMock()
update_video = mocker.MagicMock()
repository_mock.__enter__.return_value.update_video = update_video
fetcher_mock = mocker.MagicMock()
channel = Channel.from_dict(TEST_CHANNEL_DATA)
video = Video.from_dict(TEST_VIDEO_DATA)
channel.add_video(video)
fetcher_mock.fetch_new_videos.return_value = (
{channel.channel_id: channel},
{video.video_id: video},
)
query_mock = mocker.patch.object(cli, 'user_query')
query_mock.return_value = [video]
fork_mock = mocker.patch.object(cli, 'fork')
cli.watch_videos(repository_manager=repository_mock, fetcher=fetcher_mock)
fork_mock.assert_called_once()
update_video.assert_called_once() # repository is a context manager
def test_main_no_arg(mocker: MockerFixture) -> None:
mock = mocker.patch.object(cli, 'watch_videos')
cli.main(['ytrssil'])
assert mock.called_once
def test_main_watch_videos(mocker: MockerFixture) -> None:
mock = mocker.patch.object(cli, 'watch_videos')
cli.main(['ytrssil', 'watch_videos'])
assert mock.called_once
def test_main_history(mocker: MockerFixture) -> None:
mock = mocker.patch.object(cli, 'watch_history')
cli.main(['ytrssil', 'history'])
assert mock.called_once
def test_main_mark(mocker: MockerFixture) -> None:
mock = mocker.patch.object(cli, 'mark_as_watched')
cli.main(['ytrssil', 'mark', datetime.utcnow().isoformat()])
assert mock.called_once

View File

@ -1,98 +0,0 @@
from datetime import datetime
from tests.constants import TEST_CHANNEL_DATA, TEST_VIDEO_DATA
from ytrssil.datatypes import Channel, ChannelData, Video
def test_video_str() -> None:
string = str(Video.from_dict(TEST_VIDEO_DATA))
assert string == 'channel_name - video_name - video_id'
def test_channel_str() -> None:
channel_data: ChannelData = TEST_CHANNEL_DATA.copy()
channel_data.update({
'new_videos': {
'video_id': Video(
video_id='video_id',
name='video_name',
url='https://www.youtube.com/watch?v=video_id',
channel_id='channel_id',
channel_name='channel_name',
timestamp=datetime.fromisoformat('1970-01-01T00:00:00+00:00'),
watch_timestamp=None,
),
},
})
channel_string = str(Channel.from_dict(channel_data))
assert channel_string == 'channel_name - 1'
def test_channel_add_new_video() -> None:
channel_data: ChannelData = TEST_CHANNEL_DATA.copy()
channel = Channel.from_dict(channel_data)
added_video = channel.add_video(Video(
video_id='video_id',
name='video_name',
url='https://www.youtube.com/watch?v=video_id',
channel_id='channel_id',
channel_name='channel_name',
timestamp=datetime.fromisoformat('1970-01-01T00:00:00+00:00'),
watch_timestamp=None,
))
assert added_video
assert list(channel.new_videos.keys()) == ['video_id']
def test_channel_add_existing_video() -> None:
channel_data: ChannelData = TEST_CHANNEL_DATA.copy()
channel_data.update({
'new_videos': {
'video_id': Video(
video_id='video_id',
name='video_name',
url='https://www.youtube.com/watch?v=video_id',
channel_id='channel_id',
channel_name='channel_name',
timestamp=datetime.fromisoformat('1970-01-01T00:00:00+00:00'),
watch_timestamp=None,
),
},
})
channel = Channel.from_dict(channel_data)
added_video = channel.add_video(Video(
video_id='video_id',
name='video_name',
url='https://www.youtube.com/watch?v=video_id',
channel_id='channel_id',
channel_name='channel_name',
timestamp=datetime.fromisoformat('1970-01-01T00:00:00+00:00'),
watch_timestamp=None,
))
assert not added_video
assert list(channel.new_videos.keys()) == ['video_id']
def test_channel_mark_video_as_watched() -> None:
video = Video(
video_id='video_id',
name='video_name',
url='https://www.youtube.com/watch?v=video_id',
channel_id='channel_id',
channel_name='channel_name',
timestamp=datetime.fromisoformat('1970-01-01T00:00:00+00:00'),
watch_timestamp=None,
)
channel_data: ChannelData = TEST_CHANNEL_DATA.copy()
channel_data.update({'new_videos': {'video_id': video}})
channel = Channel.from_dict(channel_data)
channel.mark_video_as_watched(video)
assert list(channel.new_videos.keys()) == []
assert list(channel.watched_videos.keys()) == ['video_id']

View File

@ -1,44 +0,0 @@
from __future__ import annotations
from collections.abc import Iterable
from aioresponses import aioresponses
from tests.constants import FEED_XML, TEST_CHANNEL_DATA, TEST_VIDEO_DATA
from ytrssil.config import Configuration
from ytrssil.datatypes import Channel, Video
from ytrssil.fetch import AioHttpFetcher, FetcherBase
def test_fetch_new_videos() -> None:
class MockFetcher(FetcherBase):
def fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
return [FEED_XML]
fetcher = MockFetcher()
channel = Channel.from_dict(TEST_CHANNEL_DATA)
video = Video.from_dict(TEST_VIDEO_DATA)
channel.add_video(video)
channels, new_videos = fetcher.fetch_new_videos(
config=Configuration(
feed_url_getter_type='static',
feed_urls=[''],
),
parser=lambda _: channel,
)
assert channels[channel.channel_id] == channel
assert new_videos[TEST_VIDEO_DATA['video_id']] == video
def test_aiohttpfetcher_fetch_feeds() -> None:
feed_url = 'test_url'
with aioresponses() as mocked:
mocked.get(
url=feed_url,
body=FEED_XML,
)
fetcher = AioHttpFetcher()
xml = fetcher.fetch_feeds([feed_url])
assert xml == [FEED_XML]

View File

@ -1,47 +0,0 @@
from unittest import TestCase
from unittest.mock import Mock
from inject import Binder, clear_and_configure
from tests.constants import FEED_XML, TEST_CHANNEL_DATA, TEST_VIDEO_DATA
from ytrssil.config import Configuration
from ytrssil.datatypes import Channel, Video
from ytrssil.exceptions import ChannelNotFound
from ytrssil.parse import FeedparserParser, create_feed_parser
from ytrssil.protocols import ChannelRepository
def test_feedparser_channel_exists() -> None:
channel = Channel.from_dict(TEST_CHANNEL_DATA)
mock_repo = Mock()
mock_repo.get_channel.return_value = channel
parser = FeedparserParser(channel_repository=mock_repo)
assert parser(FEED_XML) == channel
def test_feedparser_new_channel() -> None:
channel = Channel.from_dict(TEST_CHANNEL_DATA)
channel.add_video(Video.from_dict(TEST_VIDEO_DATA))
mock_repo = Mock()
mock_repo.get_channel.side_effect = ChannelNotFound()
parser = FeedparserParser(channel_repository=mock_repo)
assert parser(FEED_XML) == channel
class TestCreateParser(TestCase):
def setUp(self) -> None:
clear_and_configure(self.inject)
def inject(self, binder: Binder) -> None:
binder.bind(ChannelRepository, Mock())
def test_create_feedparser_parser(self) -> None:
parser = create_feed_parser(Configuration(parser_type='feedparser'))
self.assertIsInstance(parser, FeedparserParser)
def test_fail_create_parser(self) -> None:
with self.assertRaises(Exception) as e:
create_feed_parser(Configuration(parser_type='fail'))
self.assertEqual('Unknown feed parser type: "fail"', e.exception)

View File

@ -1,10 +1,9 @@
from collections.abc import Sequence
from ytrssil.api import get_new_video_count, get_new_videos
from ytrssil.datatypes import Channel, Video
from ytrssil.datatypes import Video
__all__: Sequence[str] = (
'Channel',
'Video',
'get_new_video_count',
'get_new_videos',

View File

@ -1,24 +0,0 @@
from inject import autoparams
from ytrssil.bindings import setup_dependencies
from ytrssil.datatypes import Video
from ytrssil.protocols import ChannelRepository, Fetcher
def get_new_videos() -> list[Video]:
setup_dependencies()
@autoparams()
def _get_new_videos(
repository_manager: ChannelRepository,
fetcher: Fetcher,
) -> dict[str, Video]:
with repository_manager as _:
_, new_videos = fetcher.fetch_new_videos()
return new_videos
return list(_get_new_videos().values())
def get_new_video_count() -> int:
return len(get_new_videos())

View File

@ -1,18 +1,13 @@
from inject import Binder, Injector, clear_and_configure, get_injector_or_die
from ytrssil.config import Configuration
from ytrssil.fetch import create_fetcher
from ytrssil.parse import create_feed_parser
from ytrssil.protocols import ChannelRepository, Fetcher, Parser
from ytrssil.repository import create_channel_repository
from ytrssil.client import HttpClient
from ytrssil.config import Configuration, load_config
from ytrssil.protocols import Client
def dependency_configuration(binder: Binder) -> None:
config = Configuration()
binder.bind(Configuration, config)
binder.bind_to_constructor(ChannelRepository, create_channel_repository)
binder.bind_to_constructor(Fetcher, create_fetcher)
binder.bind_to_constructor(Parser, create_feed_parser)
binder.bind(Configuration, load_config())
binder.bind_to_constructor(Client, HttpClient)
def setup_dependencies() -> Injector:

View File

@ -1,6 +1,4 @@
from collections.abc import Iterator
from datetime import datetime, timezone
from operator import attrgetter
from os import execv, fork
from subprocess import PIPE, Popen
from sys import argv, stderr
@ -8,12 +6,12 @@ from sys import argv, stderr
from inject import autoparams
from ytrssil.bindings import setup_dependencies
from ytrssil.constants import mpv_options
from ytrssil.config import Configuration
from ytrssil.datatypes import Video
from ytrssil.protocols import ChannelRepository, Fetcher
from ytrssil.protocols import Client
def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]:
def user_query(videos: list[Video], reverse: bool = False) -> list[str]:
p = Popen(
['fzf', '-m'],
stdout=PIPE,
@ -21,19 +19,19 @@ def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]:
)
video_list: Iterator[Video]
if reverse:
video_list = reversed(videos.values())
video_list = reversed(videos)
else:
video_list = iter(videos.values())
video_list = iter(videos)
input_bytes = '\n'.join(map(str, video_list)).encode('UTF-8')
stdout, _ = p.communicate(input=input_bytes)
videos_str: list[str] = stdout.decode('UTF-8').strip().split('\n')
ret: list[Video] = []
ret: list[str] = []
for video_str in videos_str:
*_, video_id = video_str.split(' - ')
try:
ret.append(videos[video_id])
ret.append(video_id)
except KeyError:
pass
@ -41,111 +39,117 @@ def user_query(videos: dict[str, Video], reverse: bool = False) -> list[Video]:
@autoparams()
def fetch_new_videos(
repository_manager: ChannelRepository,
fetcher: Fetcher,
) -> int:
with repository_manager as _:
_, new_videos = fetcher.fetch_new_videos()
if not new_videos:
print('No new videos', file=stderr)
return 1
def fetch_new_videos(client: Client) -> int:
client.fetch()
return 0
@autoparams()
def register(client: Client) -> int:
client.register()
return 0
@autoparams()
def watch_videos(config: Configuration, client: Client) -> int:
videos = client.get_new_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [
f'https://www.youtube.com/watch?v={video_id}'
for video_id in selected_videos
]
cmd = ['/usr/bin/mpv', *config.mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
return 0
@autoparams()
def watch_videos(
repository_manager: ChannelRepository,
fetcher: Fetcher,
) -> int:
with repository_manager as repository:
new_videos = repository.get_new_videos()
if not new_videos:
print('No new videos', file=stderr)
return 1
def print_url(client: Client) -> int:
videos = client.get_new_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(new_videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [video.url for video in selected_videos]
cmd = ['/usr/bin/mpv', *mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
for video in selected_videos:
selected_channel = repository.get_channel(video.channel_id)
selected_channel.mark_video_as_watched(video)
watch_timestamp = datetime.utcnow().replace(tzinfo=timezone.utc)
repository.update_video(video, watch_timestamp)
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
print(f'https://www.youtube.com/watch?v={video_id}')
return 0
@autoparams()
def print_url(
repository_manager: ChannelRepository,
fetcher: Fetcher,
) -> int:
with repository_manager as repository:
new_videos = repository.get_new_videos()
if not new_videos:
print('No new videos', file=stderr)
return 1
def mark_as_watched(client: Client) -> int:
videos = client.get_new_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_videos = user_query(new_videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
for video in selected_videos:
selected_channel = repository.get_channel(video.channel_id)
selected_channel.mark_video_as_watched(video)
watch_timestamp = datetime.utcnow().replace(tzinfo=timezone.utc)
repository.update_video(video, watch_timestamp)
print(video.url)
for video_id in selected_videos:
client.mark_video_as_watched(video_id)
return 0
@autoparams()
def watch_history(
repository_manager: ChannelRepository,
fetcher: Fetcher,
) -> int:
with repository_manager as repository:
watched_videos = repository.get_watched_videos()
selected_videos = user_query(watched_videos, reverse=True)
if not selected_videos:
print('No video selected', file=stderr)
return 1
def watch_history(config: Configuration, client: Client) -> int:
videos = client.get_watched_videos()
if not videos:
print('No new videos', file=stderr)
return 1
video_urls = [video.url for video in selected_videos]
cmd = ['/usr/bin/mpv', *mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
video_urls = [
f'https://www.youtube.com/watch?v={video_id}'
for video_id in selected_videos
]
cmd = ['/usr/bin/mpv', *config.mpv_options, *video_urls]
if (fork() == 0):
execv(cmd[0], cmd)
return 0
@autoparams()
def mark_as_watched(
repository_manager: ChannelRepository,
fetcher: Fetcher,
up_to_date: datetime
) -> int:
with repository_manager as repository:
channels, new_videos = fetcher.fetch_new_videos()
for video in sorted(new_videos.values(), key=attrgetter('timestamp')):
if video.timestamp >= up_to_date:
continue
def mark_as_unwatched(client: Client) -> int:
videos = client.get_watched_videos()
if not videos:
print('No new videos', file=stderr)
return 1
selected_channel = channels[video.channel_id]
selected_channel.mark_video_as_watched(video)
watch_timestamp = datetime.utcnow().replace(tzinfo=timezone.utc)
repository.update_video(video, watch_timestamp)
selected_videos = user_query(videos)
if not selected_videos:
print('No video selected', file=stderr)
return 2
for video_id in selected_videos:
client.mark_video_as_unwatched(video_id)
return 0
@ -160,6 +164,8 @@ def main(args: list[str] = argv) -> int:
if command == 'fetch':
return fetch_new_videos()
elif command == 'register':
return register()
elif command == 'watch':
return watch_videos()
elif command == 'print':
@ -167,10 +173,15 @@ def main(args: list[str] = argv) -> int:
elif command == 'history':
return watch_history()
elif command == 'mark':
up_to_date = datetime.fromisoformat(args[2])
return mark_as_watched(up_to_date=up_to_date)
return mark_as_watched()
elif command == 'unmark':
return mark_as_unwatched()
else:
print(f'Unknown command "{command}"', file=stderr)
print(
'Available commands: fetch, watch, print, history, mark, unmark',
file=stderr,
)
return 1
return 0

71
ytrssil/client.py Normal file
View File

@ -0,0 +1,71 @@
import requests
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.datatypes import User, Video
class HttpClient:
@autoparams('config')
def __init__(self, config: Configuration) -> None:
self.base_url: str = config.api_url
self.auth = requests.auth.HTTPBasicAuth(
config.username,
config.password,
)
def fetch(self) -> None:
resp = requests.post(url=f'{self.base_url}/fetch')
resp.raise_for_status()
def register(self) -> None:
user = User(username=self.auth.username, password=self.auth.password)
resp = requests.post(url=f'{self.base_url}/fetch', json=user)
resp.raise_for_status()
def subscribe_to_channel(self, channel_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/channels/{channel_id}/subscribe',
auth=self.auth,
)
resp.raise_for_status()
def get_new_videos(self) -> list[Video]:
resp = requests.post(
url=f'{self.base_url}/api/videos/new',
auth=self.auth,
)
resp.raise_for_status()
data = resp.json()
ret: list[Video] = []
for video_data in data['videos']:
ret.append(Video(**video_data))
return ret
def get_watched_videos(self) -> list[Video]:
resp = requests.post(
url=f'{self.base_url}/api/videos/watched',
auth=self.auth,
)
resp.raise_for_status()
data = resp.json()
ret: list[Video] = []
for video_data in data['videos']:
ret.append(Video(**video_data))
return ret
def mark_video_as_watched(self, video_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/videos/{video_id}/watch',
auth=self.auth,
)
resp.raise_for_status()
def mark_video_as_unwatched(self, video_id: str) -> None:
resp = requests.post(
url=f'{self.base_url}/api/videos/{video_id}/unwatch',
auth=self.auth,
)
resp.raise_for_status()

View File

@ -1,30 +1,33 @@
from __future__ import annotations
import json
import os
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import Any
from ytrssil.constants import config_dir
from dataclasses import dataclass
from typing import Literal
@dataclass
class Configuration:
feed_url_getter_type: str = 'file'
feed_urls: list[str] = field(default_factory=lambda: list())
channel_repository_type: str = 'sqlite'
fetcher_type: str = 'aiohttp'
parser_type: str = 'feedparser'
username: str
password: str
api_url: str = 'https://ytrssil.theedgeofrage.com'
max_res: Literal['480', '720', '1080', '1440', '2160'] = '1440'
@classmethod
def from_dict(cls, config_dict: dict[str, Any]) -> Configuration:
return cls(**config_dict)
@property
def mpv_options(self) -> list[str]:
return [
'--no-terminal',
f'--ytdl-format=bestvideo[height<=?{self.max_res}]+bestaudio/best',
]
def get_feed_urls(self) -> Iterator[str]:
if self.feed_url_getter_type == 'file':
file_path = os.path.join(config_dir, 'feeds')
with open(file_path, 'r') as f:
for line in f:
yield line.strip()
elif self.feed_url_getter_type == 'static':
yield from self.feed_urls
def load_config() -> Configuration:
config_prefix: str
try:
config_prefix = os.environ['XDG_CONFIG_HOME']
except KeyError:
config_prefix = os.path.expanduser('~/.config')
config_path: str = os.path.join(config_prefix, 'ytrssil', 'config.json')
with open(config_path) as f:
config_data = json.load(f)
return Configuration(**config_data)

View File

@ -1,13 +0,0 @@
import os
config_prefix: str
try:
config_prefix = os.environ['XDG_CONFIG_HOME']
except KeyError:
config_prefix = os.path.expanduser('~/.config')
config_dir: str = os.path.join(config_prefix, 'ytrssil')
mpv_options: list[str] = [
'--no-terminal',
'--ytdl-format=bestvideo[height<=?1080]+bestaudio/best',
]

View File

@ -1,33 +1,12 @@
from __future__ import annotations
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional, TypedDict
class VideoData(TypedDict):
video_id: str
name: str
url: str
channel_id: str
channel_name: str
timestamp: datetime
watch_timestamp: Optional[datetime]
class ChannelData(TypedDict):
channel_id: str
name: str
new_videos: dict[str, Any]
watched_videos: dict[str, Any]
from typing import Optional
@dataclass
class Video:
video_id: str
name: str
url: str
channel_id: str
channel_name: str
timestamp: datetime
watch_timestamp: Optional[datetime] = None
@ -35,40 +14,17 @@ class Video:
def __str__(self) -> str:
return f'{self.channel_name} - {self.name} - {self.video_id}'
@classmethod
def from_dict(cls, data: VideoData) -> Video:
return cls(**data)
@dataclass
class Channel:
channel_id: str
name: str
new_videos: dict[str, Video] = field(default_factory=lambda: dict())
watched_videos: dict[str, Video] = field(default_factory=lambda: dict())
def add_video(self, video: Video) -> bool:
if (
video.video_id in self.watched_videos
or video.video_id in self.new_videos
):
return False
self.new_videos[video.video_id] = video
return True
def mark_video_as_watched(self, video: Video) -> None:
self.new_videos.pop(video.video_id)
self.watched_videos[video.video_id] = video
def __str__(self) -> str:
return f'{self.name} - {len(self.new_videos)}'
return f'{self.name} - {self.channel_id}'
@classmethod
def from_dict(cls, data: ChannelData) -> Channel:
return cls(
channel_id=data['channel_id'],
name=data['name'],
new_videos=data.get('new_videos', {}).copy(),
watched_videos=data.get('watched_videos', {}).copy(),
)
@dataclass
class User:
username: str
password: str

View File

@ -1,2 +0,0 @@
class ChannelNotFound(Exception):
pass

View File

@ -1,64 +0,0 @@
from asyncio import gather, run
from collections.abc import Iterable
from aiohttp import ClientResponse, ClientSession
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.datatypes import Channel, Video
from ytrssil.protocols import Fetcher, Parser
class FetcherBase:
def fetch_feeds(
self,
urls: Iterable[str],
) -> Iterable[str]: # pragma: no cover
raise NotImplementedError
@autoparams()
def fetch_new_videos(
self,
config: Configuration,
parser: Parser,
) -> tuple[dict[str, Channel], dict[str, Video]]:
feed_urls = config.get_feed_urls()
channels: dict[str, Channel] = {}
new_videos: dict[str, Video] = {}
for feed in self.fetch_feeds(feed_urls):
channel = parser(feed)
channels[channel.channel_id] = channel
new_videos.update(channel.new_videos)
return channels, new_videos
class AioHttpFetcher(FetcherBase):
async def request(
self,
session: ClientSession,
url: str,
) -> ClientResponse:
return await session.get(url=url)
async def async_fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
async with ClientSession() as session:
responses: Iterable[ClientResponse] = await gather(*[
self.request(session, url) for url in urls
])
return [
await response.text(encoding='UTF-8')
for response in responses
]
def fetch_feeds(self, urls: Iterable[str]) -> Iterable[str]:
return run(self.async_fetch_feeds(urls))
@autoparams()
def create_fetcher(config: Configuration) -> Fetcher:
fetcher_type = config.fetcher_type
if fetcher_type == 'aiohttp':
return AioHttpFetcher()
else:
raise Exception(f'Unknown feed fetcher type: "{fetcher_type}"')

View File

@ -1,52 +0,0 @@
from datetime import datetime
import feedparser
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.datatypes import Channel, Video
from ytrssil.exceptions import ChannelNotFound
from ytrssil.protocols import ChannelRepository, Parser
class ParserBase:
@autoparams('channel_repository')
def __init__(self, channel_repository: ChannelRepository) -> None:
self.repository = channel_repository
class FeedparserParser(ParserBase):
def __call__(self, feed_content: str) -> Channel:
d = feedparser.parse(feed_content)
channel_id: str = d['feed']['yt_channelid']
try:
channel = self.repository.get_channel(channel_id)
except ChannelNotFound:
channel = Channel(
channel_id=channel_id,
name=d['feed']['title'],
)
self.repository.create_channel(channel)
for entry in d['entries']:
video = Video(
video_id=entry['yt_videoid'],
name=entry['title'],
url=entry['link'],
timestamp=datetime.fromisoformat(entry['published']),
channel_id=channel.channel_id,
channel_name=channel.name,
)
if channel.add_video(video):
self.repository.add_new_video(channel, video)
return channel
@autoparams()
def create_feed_parser(config: Configuration) -> Parser:
parser_type = config.parser_type
if parser_type == 'feedparser':
return FeedparserParser()
else:
raise Exception(f'Unknown feed parser type: "{parser_type}"')

View File

@ -1,70 +1,32 @@
from __future__ import annotations
from typing import Protocol
from datetime import datetime
from types import TracebackType
from typing import Iterable, Optional, Protocol, Type
from ytrssil.config import Configuration
from ytrssil.datatypes import Channel, Video
from ytrssil.datatypes import Video
class ChannelRepository(Protocol):
def __enter__(self) -> ChannelRepository: # pragma: no cover
class Client(Protocol):
def fetch(self) -> None: # pragma: no cover
...
def __exit__(
def register(self) -> None: # pragma: no cover
...
def subscribe_to_channel(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
channel_id: str,
) -> None: # pragma: no cover
...
def get_channel(self, channel_id: str) -> Channel: # pragma: no cover
def get_new_videos(self) -> list[Video]: # pragma: no cover
...
def get_all_channels(self) -> list[Channel]: # pragma: no cover
def get_watched_videos(self) -> list[Video]: # pragma: no cover
...
def get_watched_videos(self) -> dict[str, Video]: # pragma: no cover
def mark_video_as_watched(self, video_id: str) -> None: # pragma: no cover
...
def get_new_videos(self) -> dict[str, Video]: # pragma: no cover
...
def create_channel(self, channel: Channel) -> None: # pragma: no cover
...
def add_new_video(
def mark_video_as_unwatched(
self,
channel: Channel,
video: Video,
video_id: str,
) -> None: # pragma: no cover
...
def update_video(
self,
video: Video,
watch_timestamp: datetime,
) -> None: # pragma: no cover
...
class Fetcher(Protocol):
def fetch_feeds(
self,
urls: Iterable[str],
) -> Iterable[str]: # pragma: no cover
...
def fetch_new_videos(
self,
config: Optional[Configuration] = None,
parser: Optional[Parser] = None,
) -> tuple[dict[str, Channel], dict[str, Video]]: # pragma: no cover
...
class Parser(Protocol):
def __call__(self, feed_content: str) -> Channel: # pragma: no cover
...

View File

@ -1,232 +0,0 @@
import os
from datetime import datetime
from sqlite3 import connect
from types import TracebackType
from typing import Any, Optional, Type
from inject import autoparams
from ytrssil.config import Configuration
from ytrssil.constants import config_dir
from ytrssil.datatypes import Channel, Video
from ytrssil.exceptions import ChannelNotFound
from ytrssil.protocols import ChannelRepository
class SqliteChannelRepository:
def __init__(self) -> None:
os.makedirs(config_dir, exist_ok=True)
self.file_path: str = os.path.join(config_dir, 'channels.db')
self.setup_database()
def setup_database(self) -> None:
connection = connect(self.file_path)
cursor = connection.cursor()
cursor.execute('PRAGMA foreign_keys = ON')
cursor.execute(
'CREATE TABLE IF NOT EXISTS channels ('
'channel_id VARCHAR PRIMARY KEY, name VARCHAR)'
)
cursor.execute(
'CREATE TABLE IF NOT EXISTS videos ('
'video_id VARCHAR PRIMARY KEY, name VARCHAR, url VARCHAR UNIQUE, '
'timestamp VARCHAR, watch_timestamp VARCHAR, channel_id VARCHAR, '
'FOREIGN KEY(channel_id) REFERENCES channels(channel_id))'
)
connection.commit()
connection.close()
def __enter__(self) -> ChannelRepository:
self.connection = connect(self.file_path)
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.connection.close()
def channel_to_params(self, channel: Channel) -> dict[str, str]:
return {
'channel_id': channel.channel_id,
'name': channel.name,
}
def channel_data_to_channel(
self,
channel_data: tuple[str, str, str],
) -> Channel:
channel = Channel(
channel_id=channel_data[0],
name=channel_data[1],
)
for video in self.get_videos_for_channel(channel):
if video.watch_timestamp is not None:
channel.watched_videos[video.video_id] = video
else:
channel.new_videos[video.video_id] = video
return channel
def video_to_params(self, video: Video) -> dict[str, Any]:
watch_timestamp: Optional[str]
if video.watch_timestamp is not None:
watch_timestamp = video.watch_timestamp.isoformat()
else:
watch_timestamp = video.watch_timestamp
return {
'video_id': video.video_id,
'name': video.name,
'url': video.url,
'timestamp': video.timestamp.isoformat(),
'watch_timestamp': watch_timestamp,
'channel_id': video.channel_id,
}
def video_data_to_video(
self,
video_data: tuple[str, str, str, str, str],
channel_id: str,
channel_name: str,
) -> Video:
watch_timestamp: Optional[datetime]
if video_data[4] is not None:
watch_timestamp = datetime.fromisoformat(video_data[4])
else:
watch_timestamp = video_data[4]
return Video(
video_id=video_data[0],
name=video_data[1],
url=video_data[2],
timestamp=datetime.fromisoformat(video_data[3]),
watch_timestamp=watch_timestamp,
channel_id=channel_id,
channel_name=channel_name,
)
def get_channel(self, channel_id: str) -> Channel:
cursor = self.connection.cursor()
cursor.execute(
'SELECT * FROM channels WHERE channel_id=:channel_id',
{'channel_id': channel_id},
)
try:
return self.channel_data_to_channel(next(cursor))
except StopIteration:
raise ChannelNotFound
def get_all_channels(self) -> list[Channel]:
cursor = self.connection.cursor()
cursor.execute('SELECT * FROM channels')
return [
self.channel_data_to_channel(channel_data)
for channel_data in cursor
]
def get_videos_for_channel(
self,
channel: Channel,
) -> list[Video]:
cursor = self.connection.cursor()
cursor.execute(
'SELECT video_id, name, url, timestamp, watch_timestamp '
'FROM videos WHERE channel_id=:channel_id',
{'channel_id': channel.channel_id},
)
return [
self.video_data_to_video(
video_data=video_data,
channel_id=channel.channel_id,
channel_name=channel.name,
)
for video_data in cursor
]
def get_watched_videos(self) -> dict[str, Video]:
cursor = self.connection.cursor()
cursor.execute(
'SELECT video_id, videos.name, url, timestamp, '
'watch_timestamp, channels.channel_id, channels.name FROM videos '
'LEFT JOIN channels ON channels.channel_id=videos.channel_id '
'WHERE watch_timestamp IS NOT NULL ORDER BY timestamp'
)
return {
video_data[0]: self.video_data_to_video(
video_data=video_data,
channel_id=video_data[5],
channel_name=video_data[6],
)
for video_data in cursor
}
def get_new_videos(self) -> dict[str, Video]:
cursor = self.connection.cursor()
cursor.execute(
'SELECT video_id, videos.name, url, timestamp, '
'watch_timestamp, channels.channel_id, channels.name FROM videos '
'LEFT JOIN channels ON channels.channel_id=videos.channel_id '
'WHERE watch_timestamp IS NULL ORDER BY timestamp'
)
return {
video_data[0]: self.video_data_to_video(
video_data=video_data,
channel_id=video_data[5],
channel_name=video_data[6],
)
for video_data in cursor
}
def create_channel(self, channel: Channel) -> None:
cursor = self.connection.cursor()
cursor.execute(
'INSERT INTO channels VALUES (:channel_id, :name)',
self.channel_to_params(channel),
)
self.connection.commit()
def update_channel(self, channel: Channel) -> None:
cursor = self.connection.cursor()
cursor.execute(
'UPDATE channels SET channel_id = :channel_id, name = :name, '
'WHERE channel_id=:channel_id',
self.channel_to_params(channel),
)
self.connection.commit()
def add_new_video(self, channel: Channel, video: Video) -> None:
cursor = self.connection.cursor()
cursor.execute(
'INSERT INTO videos VALUES (:video_id, :name, '
':url, :timestamp, :watch_timestamp, :channel_id)',
self.video_to_params(video),
)
self.connection.commit()
def update_video(self, video: Video, watch_timestamp: datetime) -> None:
cursor = self.connection.cursor()
cursor.execute(
'UPDATE videos SET watch_timestamp = :watch_timestamp '
'WHERE video_id=:video_id',
{
'watch_timestamp': watch_timestamp.isoformat(),
'video_id': video.video_id,
},
)
self.connection.commit()
@autoparams()
def create_channel_repository(config: Configuration) -> ChannelRepository:
repo_type = config.channel_repository_type
if repo_type == 'sqlite':
return SqliteChannelRepository()
else:
raise Exception(f'Unknown channel repository type: "{repo_type}"')