Add background fetch routine and handle fetch operations in parallel

This commit is contained in:
Pavle Portic 2022-10-31 02:39:52 +01:00
parent 8b098dadb2
commit 77031ef602
Signed by: TheEdgeOfRage
GPG Key ID: 66AD4BA646FBC0D2
12 changed files with 247 additions and 38 deletions

View File

@ -17,6 +17,7 @@ bin/moq:
gen-mocks: bin/moq
./bin/moq -pkg db_mock -out ./mocks/db/db.go ./db DB
./bin/moq -pkg parser_mock -out ./mocks/feedparser/feedparser.go ./feedparser Parser
go fmt ./...
bin/golangci-lint:

View File

@ -13,6 +13,7 @@ import (
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/config"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/handler"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/httpserver/auth"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/httpserver/ytrssil"
@ -24,6 +25,16 @@ func init() {
time.Local = time.UTC
}
func fetcherRoutine(l log.Logger, h handler.Handler) {
for {
err := h.FetchVideos(context.Background())
if err != nil {
l.Log("level", "ERROR", "function", "main.fetcherRoutine", "call", "handler.FetchVideos", "err", err)
}
time.Sleep(5 * time.Minute)
}
}
func main() {
log := log.NewLogger()
@ -32,14 +43,13 @@ func main() {
log.Log("level", "FATAL", "call", "config.Parse", "error", err)
return
}
db, err := db.NewPostgresDB(log, config.DB)
if err != nil {
log.Log("level", "FATAL", "call", "db.NewPostgresDB", "error", err)
return
}
handler := handler.New(log, db)
parser := feedparser.NewParser(log)
handler := handler.New(log, db, parser)
gin.SetMode(gin.ReleaseMode)
router, err := ytrssil.SetupGinRouter(
log,
@ -79,6 +89,9 @@ func main() {
}
}()
// start periodic fetch videos routine
go fetcherRoutine(log, handler)
log.Log(
"level", "INFO",
"msg", "ytrssil API is starting up",

View File

@ -14,6 +14,7 @@ import (
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/config"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/handler"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/httpserver/auth"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/httpserver/ytrssil"
@ -34,8 +35,8 @@ func setupTestServer(t *testing.T, authEnabled bool) (*http.Server, db.DB) {
if !assert.NoError(t, err) {
return nil, nil
}
handler := handler.New(l, db)
parser := feedparser.NewParser(l)
handler := handler.New(l, db, parser)
gin.SetMode(gin.TestMode)
router, err := ytrssil.SetupGinRouter(
l,

View File

@ -11,7 +11,7 @@ services:
volumes:
- postgres-data:/var/lib/postgresql/data
healthcheck:
test: [ "CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}" ]
test: ["CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}"]
interval: 5s
timeout: 2s
retries: 5

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/lib/log"
"github.com/paulrosania/go-charset/charset"
@ -18,29 +19,31 @@ var (
var urlFormat = "https://www.youtube.com/feeds/videos.xml?channel_id=%s"
// Video struct for each video in the feed
type Video struct {
ID string `xml:"id"`
Title string `xml:"title"`
Published Date `xml:"published"`
type Parser interface {
Parse(channelID string) (*Channel, error)
ParseThreadSafe(channelID string, channelChan chan *Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup)
}
// Channel struct for RSS
type Channel struct {
Name string `xml:"title"`
Videos []Video `xml:"entry"`
type parser struct {
log log.Logger
}
func read(l log.Logger, url string) (io.ReadCloser, error) {
func NewParser(l log.Logger) *parser {
return &parser{
log: l,
}
}
func (p *parser) fetch(url string) (io.ReadCloser, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
l.Log("level", "ERROR", "function", "feedparser.read", "call", "http.NewRequest", "error", err)
p.log.Log("level", "ERROR", "function", "feedparser.fetch", "call", "http.NewRequest", "error", err)
return nil, err
}
response, err := http.DefaultClient.Do(req)
if err != nil {
l.Log("level", "ERROR", "function", "feedparser.read", "call", "http.Do", "error", err)
p.log.Log("level", "ERROR", "function", "feedparser.fetch", "call", "http.Do", "error", err)
return nil, err
}
@ -54,9 +57,9 @@ func read(l log.Logger, url string) (io.ReadCloser, error) {
}
// Parse parses a YouTube channel XML feed from a channel ID
func Parse(l log.Logger, channelID string) (*Channel, error) {
func (p *parser) Parse(channelID string) (*Channel, error) {
url := fmt.Sprintf(urlFormat, channelID)
reader, err := read(l, url)
reader, err := p.fetch(url)
if err != nil {
return nil, err
}
@ -67,8 +70,23 @@ func Parse(l log.Logger, channelID string) (*Channel, error) {
var channel Channel
if err := xmlDecoder.Decode(&channel); err != nil {
l.Log("level", "ERROR", "function", "feedparser.read", "call", "xml.Decode", "error", err)
p.log.Log("level", "ERROR", "function", "feedparser.Parse", "call", "xml.Decode", "error", err)
return nil, fmt.Errorf("%w: %s", ErrParseFailed, err.Error())
}
channel.ID = channelID
return &channel, nil
}
// ParseThreadSafe calls Parse, but additionally accepts an out parameter to store the result,
// as well as a mutex and wait group to run multiple fetches in parallel
func (p *parser) ParseThreadSafe(
channelID string, channelChan chan *Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup,
) {
channel, err := p.Parse(channelID)
mu.Lock()
channelChan <- channel
errChan <- err
mu.Unlock()
wg.Done()
}

15
feedparser/models.go Normal file
View File

@ -0,0 +1,15 @@
package feedparser
// Video struct for each video in the feed
type Video struct {
ID string `xml:"id"`
Title string `xml:"title"`
Published Date `xml:"published"`
}
// Channel struct for RSS
type Channel struct {
ID string
Name string `xml:"title"`
Videos []Video `xml:"entry"`
}

View File

@ -5,12 +5,11 @@ import (
"errors"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/models"
)
func (h *handler) SubscribeToChannel(ctx context.Context, username string, channelID string) error {
parsedChannel, err := feedparser.Parse(h.log, channelID)
parsedChannel, err := h.parser.Parse(channelID)
if err != nil {
return err
}

View File

@ -4,6 +4,7 @@ import (
"context"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/lib/log"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/models"
)
@ -20,10 +21,15 @@ type Handler interface {
}
type handler struct {
log log.Logger
db db.DB
log log.Logger
db db.DB
parser feedparser.Parser
}
func New(log log.Logger, db db.DB) *handler {
return &handler{log: log, db: db}
func New(log log.Logger, db db.DB, parser feedparser.Parser) *handler {
return &handler{
log: log,
db: db,
parser: parser,
}
}

View File

@ -10,6 +10,7 @@ import (
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/config"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/lib/log"
db_mock "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/mocks/db"
parser_mock "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/mocks/feedparser"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/models"
)
@ -35,7 +36,7 @@ func TestGetNewVideos(t *testing.T) {
},
}, nil
},
})
}, &parser_mock.ParserMock{})
// Act
resp, err := handler.GetNewVideos(context.TODO(), "username")

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"strings"
"sync"
"time"
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db"
@ -37,7 +38,7 @@ func (h *handler) addVideoToAllSubscribers(ctx context.Context, channelID string
return nil
}
func (h *handler) fetchVideosForChannel(ctx context.Context, channelID string, parsedChannel *feedparser.Channel) {
func (h *handler) addVideosForChannel(ctx context.Context, parsedChannel *feedparser.Channel) {
for _, parsedVideo := range parsedChannel.Videos {
date, err := parsedVideo.Published.Parse()
if err != nil {
@ -45,20 +46,20 @@ func (h *handler) fetchVideosForChannel(ctx context.Context, channelID string, p
continue
}
id := strings.Split(parsedVideo.ID, ":")[2]
videoID := strings.Split(parsedVideo.ID, ":")[2]
video := models.Video{
ID: id,
ID: videoID,
Title: parsedVideo.Title,
PublishedTime: date,
}
err = h.db.AddVideo(ctx, video, channelID)
err = h.db.AddVideo(ctx, video, parsedChannel.ID)
if err != nil {
if !errors.Is(err, db.ErrVideoExists) {
h.log.Log("level", "WARNING", "call", "db.AddVideo", "err", err)
}
continue
}
err = h.addVideoToAllSubscribers(ctx, channelID, id)
err = h.addVideoToAllSubscribers(ctx, parsedChannel.ID, videoID)
if err != nil {
continue
}
@ -66,18 +67,29 @@ func (h *handler) fetchVideosForChannel(ctx context.Context, channelID string, p
}
func (h *handler) FetchVideos(ctx context.Context) error {
h.log.Log("level", "INFO", "msg", "fetching new videos for all channels")
channels, err := h.db.ListChannels(ctx)
if err != nil {
return err
}
var parsedChannels = make(chan *feedparser.Channel, len(channels))
var errors = make(chan error, len(channels))
var wg sync.WaitGroup
var mu sync.Mutex
for _, channel := range channels {
parsedChannel, err := feedparser.Parse(h.log, channel.ID)
wg.Add(1)
go h.parser.ParseThreadSafe(channel.ID, parsedChannels, errors, &mu, &wg)
}
wg.Wait()
for range channels {
parsedChannel := <-parsedChannels
err = <-errors
if err != nil {
continue
}
h.fetchVideosForChannel(ctx, channel.ID, parsedChannel)
h.addVideosForChannel(ctx, parsedChannel)
}
return nil

View File

@ -28,7 +28,7 @@ func init() {
func setupTestServer(t *testing.T) *http.Server {
l := log.NewNopLogger()
handler := handler.New(l, nil)
handler := handler.New(l, nil, nil)
gin.SetMode(gin.TestMode)
router, err := ytrssil.SetupGinRouter(

View File

@ -0,0 +1,143 @@
// Code generated by moq; DO NOT EDIT.
// github.com/matryer/moq
package parser_mock
import (
"gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser"
"sync"
)
// Ensure, that ParserMock does implement feedparser.Parser.
// If this is not the case, regenerate this file with moq.
var _ feedparser.Parser = &ParserMock{}
// ParserMock is a mock implementation of feedparser.Parser.
//
// func TestSomethingThatUsesParser(t *testing.T) {
//
// // make and configure a mocked feedparser.Parser
// mockedParser := &ParserMock{
// ParseFunc: func(channelID string) (*feedparser.Channel, error) {
// panic("mock out the Parse method")
// },
// ParseThreadSafeFunc: func(channelID string, channelChan chan *feedparser.Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup) {
// panic("mock out the ParseThreadSafe method")
// },
// }
//
// // use mockedParser in code that requires feedparser.Parser
// // and then make assertions.
//
// }
type ParserMock struct {
// ParseFunc mocks the Parse method.
ParseFunc func(channelID string) (*feedparser.Channel, error)
// ParseThreadSafeFunc mocks the ParseThreadSafe method.
ParseThreadSafeFunc func(channelID string, channelChan chan *feedparser.Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup)
// calls tracks calls to the methods.
calls struct {
// Parse holds details about calls to the Parse method.
Parse []struct {
// ChannelID is the channelID argument value.
ChannelID string
}
// ParseThreadSafe holds details about calls to the ParseThreadSafe method.
ParseThreadSafe []struct {
// ChannelID is the channelID argument value.
ChannelID string
// ChannelChan is the channelChan argument value.
ChannelChan chan *feedparser.Channel
// ErrChan is the errChan argument value.
ErrChan chan error
// Mu is the mu argument value.
Mu *sync.Mutex
// Wg is the wg argument value.
Wg *sync.WaitGroup
}
}
lockParse sync.RWMutex
lockParseThreadSafe sync.RWMutex
}
// Parse calls ParseFunc.
func (mock *ParserMock) Parse(channelID string) (*feedparser.Channel, error) {
if mock.ParseFunc == nil {
panic("ParserMock.ParseFunc: method is nil but Parser.Parse was just called")
}
callInfo := struct {
ChannelID string
}{
ChannelID: channelID,
}
mock.lockParse.Lock()
mock.calls.Parse = append(mock.calls.Parse, callInfo)
mock.lockParse.Unlock()
return mock.ParseFunc(channelID)
}
// ParseCalls gets all the calls that were made to Parse.
// Check the length with:
//
// len(mockedParser.ParseCalls())
func (mock *ParserMock) ParseCalls() []struct {
ChannelID string
} {
var calls []struct {
ChannelID string
}
mock.lockParse.RLock()
calls = mock.calls.Parse
mock.lockParse.RUnlock()
return calls
}
// ParseThreadSafe calls ParseThreadSafeFunc.
func (mock *ParserMock) ParseThreadSafe(channelID string, channelChan chan *feedparser.Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup) {
if mock.ParseThreadSafeFunc == nil {
panic("ParserMock.ParseThreadSafeFunc: method is nil but Parser.ParseThreadSafe was just called")
}
callInfo := struct {
ChannelID string
ChannelChan chan *feedparser.Channel
ErrChan chan error
Mu *sync.Mutex
Wg *sync.WaitGroup
}{
ChannelID: channelID,
ChannelChan: channelChan,
ErrChan: errChan,
Mu: mu,
Wg: wg,
}
mock.lockParseThreadSafe.Lock()
mock.calls.ParseThreadSafe = append(mock.calls.ParseThreadSafe, callInfo)
mock.lockParseThreadSafe.Unlock()
mock.ParseThreadSafeFunc(channelID, channelChan, errChan, mu, wg)
}
// ParseThreadSafeCalls gets all the calls that were made to ParseThreadSafe.
// Check the length with:
//
// len(mockedParser.ParseThreadSafeCalls())
func (mock *ParserMock) ParseThreadSafeCalls() []struct {
ChannelID string
ChannelChan chan *feedparser.Channel
ErrChan chan error
Mu *sync.Mutex
Wg *sync.WaitGroup
} {
var calls []struct {
ChannelID string
ChannelChan chan *feedparser.Channel
ErrChan chan error
Mu *sync.Mutex
Wg *sync.WaitGroup
}
mock.lockParseThreadSafe.RLock()
calls = mock.calls.ParseThreadSafe
mock.lockParseThreadSafe.RUnlock()
return calls
}