TJ Horner 2019-03-29 09:17:46 -07:00
parent 52fbd5d6bd
commit 52c8d838a5
1 changed file with 51 additions and 29 deletions


@@ -5,8 +5,6 @@ import (
 	"io/ioutil"
 	"path"
 	"strconv"
-	"sync"
-	"time"
 
 	"github.com/dustin/go-humanize"
 	"github.com/logrusorgru/aurora"
@@ -18,45 +16,67 @@ import (
 // been processed. It returns the number of successes, failures, and the total
 // amount of posts.
 func BeginDownload(posts *[]e621.Post, saveDirectory *string, maxConcurrents *int) (*int, *int, *int) {
-	var wg sync.WaitGroup
+	// Channel for worker goroutines to notify the main goroutine that it is done
+	// downloading a post
+	done := make(chan interface{})
+
+	// Channel for main goroutine to give workers a post when they are done downloading
+	// one
+	pc := make(chan *e621.Post)
 
 	var completed int
 	var successes int
 	var failures int
 	total := len(*posts)
+	current := 0
 
-	// Distribute the posts based on the number of workers
-	ppw := len(*posts) / *maxConcurrents // ppw: posts per worker
-	mod := len(*posts) % *maxConcurrents // mod: remainder of posts
-
-	for i := 0; i < *maxConcurrents; i++ {
-		postsLower := i * ppw
-		postsUpper := i*ppw + ppw
-		if i == *maxConcurrents-1 {
-			// Give the last worker the remaining posts
-			// TODO: compensate it for labor
-			postsUpper += mod
-		}
-
-		wg.Add(1)
-		go work(i+1, (*posts)[postsLower:postsUpper], *saveDirectory, &completed, &successes, &failures, &total, &wg)
-
-		// Spawn workers with a little bit of a delay so as to not DDOS e621
-		// but also make the initial numbers show up correctly
-		time.Sleep(50 * time.Millisecond)
+	// If we have more workers than posts, then we don't need all of them
+	if *maxConcurrents > total {
+		*maxConcurrents = total
 	}
 
-	wg.Wait()
+	for i := 0; i < *maxConcurrents; i++ {
+		// Create our workers
+		go work(i+1, *saveDirectory, &completed, &total, &successes, &failures, done, pc)
+
+		// Give them their initial posts
+		pc <- &(*posts)[current]
+		current++
+	}
+
+	for {
+		// Wait for a worker to be done
+		<-done
+
+		// If we finished downloading all posts, break out of the loop
+		if successes+failures == total {
+			break
+		}
+
+		// If there's no more posts to give, stop the worker
+		if current >= total-1 {
+			pc <- nil
+			continue
+		}
+
+		// Give the worker the next post in the array
+		current++
+		pc <- &(*posts)[current]
+	}
 
 	return &successes, &failures, &total
 }
 
-func work(wn int, posts []e621.Post, directory string, completed *int, successes *int, failures *int, total *int, wg *sync.WaitGroup) {
-	defer wg.Done()
-	for _, post := range posts {
+func work(wn int, directory string, completed *int, total *int, successes *int, failures *int, done chan interface{}, pc chan *e621.Post) {
+	for {
 		*completed++
+
+		post := <-pc
+		if post == nil {
+			return
+		}
 
 		progress := aurora.Sprintf(aurora.Green("[%d/%d]"), *completed, *total)
 		workerText := aurora.Sprintf(aurora.Cyan("[w%d]"), wn)
@@ -66,16 +86,18 @@ func work(wn int, posts []e621.Post, directory string, completed *int, successes
 			workerText,
 			post.ID,
 			humanize.Bytes(uint64(post.FileSize)),
-			getSavePath(&post, &directory),
+			getSavePath(post, &directory),
 		))
 
-		err := downloadPost(&post, directory)
+		err := downloadPost(post, directory)
 		if err != nil {
 			fmt.Printf("[w%d] Failed to download post %d: %v\n", wn, post.ID, err)
 			*failures++
 		} else {
 			*successes++
 		}
+
+		done <- nil
 	}
 }
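
For reference, below is a minimal, self-contained sketch of the dispatch pattern the new BeginDownload/work pair uses: a shared post channel feeds the workers, a done channel reports each finished attempt back to the coordinator, and a nil post tells a worker to stop. The post type, the worker count, and the Printf stand-in for downloadPost are illustrative assumptions for the sketch, not the project's actual types or API.

package main

import "fmt"

// post is a stand-in for e621.Post; only an ID is needed for this sketch.
type post struct{ ID int }

// worker pulls posts from pc until it receives nil, then exits.
// After every attempt it signals the coordinator on done.
func worker(wn int, pc chan *post, done chan struct{}) {
	for {
		p := <-pc
		if p == nil {
			// nil is the coordinator's "no more work" signal
			return
		}
		fmt.Printf("[w%d] downloading post %d\n", wn, p.ID) // stand-in for the real download
		done <- struct{}{}
	}
}

func main() {
	posts := []post{{1}, {2}, {3}, {4}, {5}}
	workers := 2

	// No point spawning more workers than there are posts.
	if workers > len(posts) {
		workers = len(posts)
	}

	pc := make(chan *post)
	done := make(chan struct{})

	// Start each worker and hand it an initial post.
	next := 0
	for i := 0; i < workers; i++ {
		go worker(i+1, pc, done)
		pc <- &posts[next]
		next++
	}

	// Each time a worker reports back, either hand it the next post
	// or send nil so it shuts down; stop once every post is accounted for.
	finished := 0
	for finished < len(posts) {
		<-done
		finished++
		if next < len(posts) {
			pc <- &posts[next]
			next++
		} else {
			pc <- nil
		}
	}
}

Compared with the old approach of slicing the post list up front and sleeping between worker launches, this style of hand-off keeps every worker busy until the queue is empty, at which point each one is given a nil and exits.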