diff --git a/concurrent/download.go b/concurrent/download.go index 22701dc..1039145 100644 --- a/concurrent/download.go +++ b/concurrent/download.go @@ -5,8 +5,6 @@ import ( "io/ioutil" "path" "strconv" - "sync" - "time" "github.com/dustin/go-humanize" "github.com/logrusorgru/aurora" @@ -18,45 +16,67 @@ import ( // been processed. It returns the number of successes, failures, and the total // amount of posts. func BeginDownload(posts *[]e621.Post, saveDirectory *string, maxConcurrents *int) (*int, *int, *int) { - var wg sync.WaitGroup + // Channel for worker goroutines to notify the main goroutine that it is done + // downloading a post + done := make(chan interface{}) + + // Channel for main goroutine to give workers a post when they are done downloading + // one + pc := make(chan *e621.Post) + var completed int var successes int var failures int total := len(*posts) + current := 0 - // Distribute the posts based on the number of workers - ppw := len(*posts) / *maxConcurrents // ppw: posts per worker - mod := len(*posts) % *maxConcurrents // mod: remainder of posts - - for i := 0; i < *maxConcurrents; i++ { - postsLower := i * ppw - postsUpper := i*ppw + ppw - - if i == *maxConcurrents-1 { - // Give the last worker the remaining posts - // TODO: compensate it for labor - postsUpper += mod - } - - wg.Add(1) - go work(i+1, (*posts)[postsLower:postsUpper], *saveDirectory, &completed, &successes, &failures, &total, &wg) - // Spawn workers with a little bit of a delay so as to not DDOS e621 - // but also make the initial numbers show up correctly - time.Sleep(50 * time.Millisecond) + // If we have more workers than posts, then we don't need all of them + if *maxConcurrents > total { + *maxConcurrents = total } - wg.Wait() + for i := 0; i < *maxConcurrents; i++ { + // Create our workers + go work(i+1, *saveDirectory, &completed, &total, &successes, &failures, done, pc) + + // Give them their initial posts + pc <- &(*posts)[current] + current++ + } + + for { + // Wait for a worker to be done + <-done + + // If we finished downloading all posts, break out of the loop + if successes+failures == total { + break + } + + // If there's no more posts to give, stop the worker + if current >= total-1 { + pc <- nil + continue + } + + // Give the worker the next post in the array + current++ + pc <- &(*posts)[current] + } return &successes, &failures, &total } -func work(wn int, posts []e621.Post, directory string, completed *int, successes *int, failures *int, total *int, wg *sync.WaitGroup) { - defer wg.Done() - - for _, post := range posts { +func work(wn int, directory string, completed *int, total *int, successes *int, failures *int, done chan interface{}, pc chan *e621.Post) { + for { *completed++ + post := <-pc + if post == nil { + return + } + progress := aurora.Sprintf(aurora.Green("[%d/%d]"), *completed, *total) workerText := aurora.Sprintf(aurora.Cyan("[w%d]"), wn) @@ -66,16 +86,18 @@ func work(wn int, posts []e621.Post, directory string, completed *int, successes workerText, post.ID, humanize.Bytes(uint64(post.FileSize)), - getSavePath(&post, &directory), + getSavePath(post, &directory), )) - err := downloadPost(&post, directory) + err := downloadPost(post, directory) if err != nil { fmt.Printf("[w%d] Failed to download post %d: %v\n", wn, post.ID, err) *failures++ } else { *successes++ } + + done <- nil } }