From 77031ef6024d541cd8ba966084d103a9e4b4be57 Mon Sep 17 00:00:00 2001 From: Pavle Portic Date: Mon, 31 Oct 2022 02:39:52 +0100 Subject: [PATCH] Add background fetch routine and handle fetch operations in parallel --- Makefile | 1 + cmd/main.go | 19 +++- cmd/main_test.go | 5 +- docker-compose.yml | 2 +- feedparser/feedparser.go | 48 ++++++--- feedparser/models.go | 15 +++ handler/channels.go | 3 +- handler/handler.go | 14 ++- handler/handler_test.go | 3 +- handler/videos.go | 30 ++++-- httpserver/ytrssil/api_setup_test.go | 2 +- mocks/feedparser/feedparser.go | 143 +++++++++++++++++++++++++++ 12 files changed, 247 insertions(+), 38 deletions(-) create mode 100644 feedparser/models.go create mode 100644 mocks/feedparser/feedparser.go diff --git a/Makefile b/Makefile index 874daa5..30d04d5 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,7 @@ bin/moq: gen-mocks: bin/moq ./bin/moq -pkg db_mock -out ./mocks/db/db.go ./db DB + ./bin/moq -pkg parser_mock -out ./mocks/feedparser/feedparser.go ./feedparser Parser go fmt ./... bin/golangci-lint: diff --git a/cmd/main.go b/cmd/main.go index 795318b..37e62ab 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,6 +13,7 @@ import ( "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/config" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db" + "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/handler" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/httpserver/auth" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/httpserver/ytrssil" @@ -24,6 +25,16 @@ func init() { time.Local = time.UTC } +func fetcherRoutine(l log.Logger, h handler.Handler) { + for { + err := h.FetchVideos(context.Background()) + if err != nil { + l.Log("level", "ERROR", "function", "main.fetcherRoutine", "call", "handler.FetchVideos", "err", err) + } + time.Sleep(5 * time.Minute) + } +} + func main() { log := log.NewLogger() @@ -32,14 +43,13 @@ func main() { log.Log("level", "FATAL", "call", "config.Parse", "error", err) return } - db, err := db.NewPostgresDB(log, config.DB) if err != nil { log.Log("level", "FATAL", "call", "db.NewPostgresDB", "error", err) return } - - handler := handler.New(log, db) + parser := feedparser.NewParser(log) + handler := handler.New(log, db, parser) gin.SetMode(gin.ReleaseMode) router, err := ytrssil.SetupGinRouter( log, @@ -79,6 +89,9 @@ func main() { } }() + // start periodic fetch videos routine + go fetcherRoutine(log, handler) + log.Log( "level", "INFO", "msg", "ytrssil API is starting up", diff --git a/cmd/main_test.go b/cmd/main_test.go index e42d985..9493c42 100644 --- a/cmd/main_test.go +++ b/cmd/main_test.go @@ -14,6 +14,7 @@ import ( "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/config" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db" + "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/handler" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/httpserver/auth" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/httpserver/ytrssil" @@ -34,8 +35,8 @@ func setupTestServer(t *testing.T, authEnabled bool) (*http.Server, db.DB) { if !assert.NoError(t, err) { return nil, nil } - - handler := handler.New(l, db) + parser := feedparser.NewParser(l) + handler := handler.New(l, db, parser) gin.SetMode(gin.TestMode) router, err := ytrssil.SetupGinRouter( l, diff --git a/docker-compose.yml b/docker-compose.yml index 4e8863d..2f58916 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,7 @@ services: volumes: - postgres-data:/var/lib/postgresql/data healthcheck: - test: [ "CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}" ] + test: ["CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}"] interval: 5s timeout: 2s retries: 5 diff --git a/feedparser/feedparser.go b/feedparser/feedparser.go index afebb54..03b913e 100644 --- a/feedparser/feedparser.go +++ b/feedparser/feedparser.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "sync" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/lib/log" "github.com/paulrosania/go-charset/charset" @@ -18,29 +19,31 @@ var ( var urlFormat = "https://www.youtube.com/feeds/videos.xml?channel_id=%s" -// Video struct for each video in the feed -type Video struct { - ID string `xml:"id"` - Title string `xml:"title"` - Published Date `xml:"published"` +type Parser interface { + Parse(channelID string) (*Channel, error) + ParseThreadSafe(channelID string, channelChan chan *Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup) } -// Channel struct for RSS -type Channel struct { - Name string `xml:"title"` - Videos []Video `xml:"entry"` +type parser struct { + log log.Logger } -func read(l log.Logger, url string) (io.ReadCloser, error) { +func NewParser(l log.Logger) *parser { + return &parser{ + log: l, + } +} + +func (p *parser) fetch(url string) (io.ReadCloser, error) { req, err := http.NewRequest("GET", url, nil) if err != nil { - l.Log("level", "ERROR", "function", "feedparser.read", "call", "http.NewRequest", "error", err) + p.log.Log("level", "ERROR", "function", "feedparser.fetch", "call", "http.NewRequest", "error", err) return nil, err } response, err := http.DefaultClient.Do(req) if err != nil { - l.Log("level", "ERROR", "function", "feedparser.read", "call", "http.Do", "error", err) + p.log.Log("level", "ERROR", "function", "feedparser.fetch", "call", "http.Do", "error", err) return nil, err } @@ -54,9 +57,9 @@ func read(l log.Logger, url string) (io.ReadCloser, error) { } // Parse parses a YouTube channel XML feed from a channel ID -func Parse(l log.Logger, channelID string) (*Channel, error) { +func (p *parser) Parse(channelID string) (*Channel, error) { url := fmt.Sprintf(urlFormat, channelID) - reader, err := read(l, url) + reader, err := p.fetch(url) if err != nil { return nil, err } @@ -67,8 +70,23 @@ func Parse(l log.Logger, channelID string) (*Channel, error) { var channel Channel if err := xmlDecoder.Decode(&channel); err != nil { - l.Log("level", "ERROR", "function", "feedparser.read", "call", "xml.Decode", "error", err) + p.log.Log("level", "ERROR", "function", "feedparser.Parse", "call", "xml.Decode", "error", err) return nil, fmt.Errorf("%w: %s", ErrParseFailed, err.Error()) } + channel.ID = channelID return &channel, nil } + +// ParseThreadSafe calls Parse, but additionally accepts an out parameter to store the result, +// as well as a mutex and wait group to run multiple fetches in parallel +func (p *parser) ParseThreadSafe( + channelID string, channelChan chan *Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup, +) { + channel, err := p.Parse(channelID) + + mu.Lock() + channelChan <- channel + errChan <- err + mu.Unlock() + wg.Done() +} diff --git a/feedparser/models.go b/feedparser/models.go new file mode 100644 index 0000000..54a1c45 --- /dev/null +++ b/feedparser/models.go @@ -0,0 +1,15 @@ +package feedparser + +// Video struct for each video in the feed +type Video struct { + ID string `xml:"id"` + Title string `xml:"title"` + Published Date `xml:"published"` +} + +// Channel struct for RSS +type Channel struct { + ID string + Name string `xml:"title"` + Videos []Video `xml:"entry"` +} diff --git a/handler/channels.go b/handler/channels.go index 0b2b787..afe5928 100644 --- a/handler/channels.go +++ b/handler/channels.go @@ -5,12 +5,11 @@ import ( "errors" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db" - "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/models" ) func (h *handler) SubscribeToChannel(ctx context.Context, username string, channelID string) error { - parsedChannel, err := feedparser.Parse(h.log, channelID) + parsedChannel, err := h.parser.Parse(channelID) if err != nil { return err } diff --git a/handler/handler.go b/handler/handler.go index 8c95bf7..7a0a041 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -4,6 +4,7 @@ import ( "context" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db" + "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/lib/log" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/models" ) @@ -20,10 +21,15 @@ type Handler interface { } type handler struct { - log log.Logger - db db.DB + log log.Logger + db db.DB + parser feedparser.Parser } -func New(log log.Logger, db db.DB) *handler { - return &handler{log: log, db: db} +func New(log log.Logger, db db.DB, parser feedparser.Parser) *handler { + return &handler{ + log: log, + db: db, + parser: parser, + } } diff --git a/handler/handler_test.go b/handler/handler_test.go index c9bd147..53ef361 100644 --- a/handler/handler_test.go +++ b/handler/handler_test.go @@ -10,6 +10,7 @@ import ( "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/config" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/lib/log" db_mock "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/mocks/db" + parser_mock "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/mocks/feedparser" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/models" ) @@ -35,7 +36,7 @@ func TestGetNewVideos(t *testing.T) { }, }, nil }, - }) + }, &parser_mock.ParserMock{}) // Act resp, err := handler.GetNewVideos(context.TODO(), "username") diff --git a/handler/videos.go b/handler/videos.go index 142e597..6edde23 100644 --- a/handler/videos.go +++ b/handler/videos.go @@ -4,6 +4,7 @@ import ( "context" "errors" "strings" + "sync" "time" "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/db" @@ -37,7 +38,7 @@ func (h *handler) addVideoToAllSubscribers(ctx context.Context, channelID string return nil } -func (h *handler) fetchVideosForChannel(ctx context.Context, channelID string, parsedChannel *feedparser.Channel) { +func (h *handler) addVideosForChannel(ctx context.Context, parsedChannel *feedparser.Channel) { for _, parsedVideo := range parsedChannel.Videos { date, err := parsedVideo.Published.Parse() if err != nil { @@ -45,20 +46,20 @@ func (h *handler) fetchVideosForChannel(ctx context.Context, channelID string, p continue } - id := strings.Split(parsedVideo.ID, ":")[2] + videoID := strings.Split(parsedVideo.ID, ":")[2] video := models.Video{ - ID: id, + ID: videoID, Title: parsedVideo.Title, PublishedTime: date, } - err = h.db.AddVideo(ctx, video, channelID) + err = h.db.AddVideo(ctx, video, parsedChannel.ID) if err != nil { if !errors.Is(err, db.ErrVideoExists) { h.log.Log("level", "WARNING", "call", "db.AddVideo", "err", err) } continue } - err = h.addVideoToAllSubscribers(ctx, channelID, id) + err = h.addVideoToAllSubscribers(ctx, parsedChannel.ID, videoID) if err != nil { continue } @@ -66,18 +67,29 @@ func (h *handler) fetchVideosForChannel(ctx context.Context, channelID string, p } func (h *handler) FetchVideos(ctx context.Context) error { + h.log.Log("level", "INFO", "msg", "fetching new videos for all channels") + channels, err := h.db.ListChannels(ctx) if err != nil { return err } - + var parsedChannels = make(chan *feedparser.Channel, len(channels)) + var errors = make(chan error, len(channels)) + var wg sync.WaitGroup + var mu sync.Mutex for _, channel := range channels { - parsedChannel, err := feedparser.Parse(h.log, channel.ID) + wg.Add(1) + go h.parser.ParseThreadSafe(channel.ID, parsedChannels, errors, &mu, &wg) + } + wg.Wait() + + for range channels { + parsedChannel := <-parsedChannels + err = <-errors if err != nil { continue } - - h.fetchVideosForChannel(ctx, channel.ID, parsedChannel) + h.addVideosForChannel(ctx, parsedChannel) } return nil diff --git a/httpserver/ytrssil/api_setup_test.go b/httpserver/ytrssil/api_setup_test.go index 7284a41..925a7e5 100644 --- a/httpserver/ytrssil/api_setup_test.go +++ b/httpserver/ytrssil/api_setup_test.go @@ -28,7 +28,7 @@ func init() { func setupTestServer(t *testing.T) *http.Server { l := log.NewNopLogger() - handler := handler.New(l, nil) + handler := handler.New(l, nil, nil) gin.SetMode(gin.TestMode) router, err := ytrssil.SetupGinRouter( diff --git a/mocks/feedparser/feedparser.go b/mocks/feedparser/feedparser.go new file mode 100644 index 0000000..64502bf --- /dev/null +++ b/mocks/feedparser/feedparser.go @@ -0,0 +1,143 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package parser_mock + +import ( + "gitea.theedgeofrage.com/TheEdgeOfRage/ytrssil-api/feedparser" + "sync" +) + +// Ensure, that ParserMock does implement feedparser.Parser. +// If this is not the case, regenerate this file with moq. +var _ feedparser.Parser = &ParserMock{} + +// ParserMock is a mock implementation of feedparser.Parser. +// +// func TestSomethingThatUsesParser(t *testing.T) { +// +// // make and configure a mocked feedparser.Parser +// mockedParser := &ParserMock{ +// ParseFunc: func(channelID string) (*feedparser.Channel, error) { +// panic("mock out the Parse method") +// }, +// ParseThreadSafeFunc: func(channelID string, channelChan chan *feedparser.Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup) { +// panic("mock out the ParseThreadSafe method") +// }, +// } +// +// // use mockedParser in code that requires feedparser.Parser +// // and then make assertions. +// +// } +type ParserMock struct { + // ParseFunc mocks the Parse method. + ParseFunc func(channelID string) (*feedparser.Channel, error) + + // ParseThreadSafeFunc mocks the ParseThreadSafe method. + ParseThreadSafeFunc func(channelID string, channelChan chan *feedparser.Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup) + + // calls tracks calls to the methods. + calls struct { + // Parse holds details about calls to the Parse method. + Parse []struct { + // ChannelID is the channelID argument value. + ChannelID string + } + // ParseThreadSafe holds details about calls to the ParseThreadSafe method. + ParseThreadSafe []struct { + // ChannelID is the channelID argument value. + ChannelID string + // ChannelChan is the channelChan argument value. + ChannelChan chan *feedparser.Channel + // ErrChan is the errChan argument value. + ErrChan chan error + // Mu is the mu argument value. + Mu *sync.Mutex + // Wg is the wg argument value. + Wg *sync.WaitGroup + } + } + lockParse sync.RWMutex + lockParseThreadSafe sync.RWMutex +} + +// Parse calls ParseFunc. +func (mock *ParserMock) Parse(channelID string) (*feedparser.Channel, error) { + if mock.ParseFunc == nil { + panic("ParserMock.ParseFunc: method is nil but Parser.Parse was just called") + } + callInfo := struct { + ChannelID string + }{ + ChannelID: channelID, + } + mock.lockParse.Lock() + mock.calls.Parse = append(mock.calls.Parse, callInfo) + mock.lockParse.Unlock() + return mock.ParseFunc(channelID) +} + +// ParseCalls gets all the calls that were made to Parse. +// Check the length with: +// +// len(mockedParser.ParseCalls()) +func (mock *ParserMock) ParseCalls() []struct { + ChannelID string +} { + var calls []struct { + ChannelID string + } + mock.lockParse.RLock() + calls = mock.calls.Parse + mock.lockParse.RUnlock() + return calls +} + +// ParseThreadSafe calls ParseThreadSafeFunc. +func (mock *ParserMock) ParseThreadSafe(channelID string, channelChan chan *feedparser.Channel, errChan chan error, mu *sync.Mutex, wg *sync.WaitGroup) { + if mock.ParseThreadSafeFunc == nil { + panic("ParserMock.ParseThreadSafeFunc: method is nil but Parser.ParseThreadSafe was just called") + } + callInfo := struct { + ChannelID string + ChannelChan chan *feedparser.Channel + ErrChan chan error + Mu *sync.Mutex + Wg *sync.WaitGroup + }{ + ChannelID: channelID, + ChannelChan: channelChan, + ErrChan: errChan, + Mu: mu, + Wg: wg, + } + mock.lockParseThreadSafe.Lock() + mock.calls.ParseThreadSafe = append(mock.calls.ParseThreadSafe, callInfo) + mock.lockParseThreadSafe.Unlock() + mock.ParseThreadSafeFunc(channelID, channelChan, errChan, mu, wg) +} + +// ParseThreadSafeCalls gets all the calls that were made to ParseThreadSafe. +// Check the length with: +// +// len(mockedParser.ParseThreadSafeCalls()) +func (mock *ParserMock) ParseThreadSafeCalls() []struct { + ChannelID string + ChannelChan chan *feedparser.Channel + ErrChan chan error + Mu *sync.Mutex + Wg *sync.WaitGroup +} { + var calls []struct { + ChannelID string + ChannelChan chan *feedparser.Channel + ErrChan chan error + Mu *sync.Mutex + Wg *sync.WaitGroup + } + mock.lockParseThreadSafe.RLock() + calls = mock.calls.ParseThreadSafe + mock.lockParseThreadSafe.RUnlock() + return calls +}