Golang worker pool: execute tasks in a specific order

I have records in a table that has a column called flow, which takes values from 1 to 10. The flow values cycle through 1 to 10 as the id increases, so ids 3, 13, 23, ... have flow 3, ids 5, 15, 25, ... have flow 5, and so on for the other flows:

+------+--------+
|  id  |  flow  |
+------+--------+
|   1  |    1   |
|   2  |    2   |
|   3  |    3   |
|  ... |   ...  |
|  10  |    10  |
|  ... |   ...  |
|  53  |    3   |
|  54  |    4   |
|  ... |   ...  |
|  77  |    7   |
+------+--------+

I select 10,000 rows from this table and want to process each row in a worker pool (the pool has 10 workers, one per flow value). I want each worker to process only the rows with the matching flow value: worker #1 processes rows with flow 1 (ids 1, 11, 21, 31, etc.), worker #2 processes rows with flow 2 (ids 2, 12, 22, etc.), and so on up to worker #10, which processes every 10th row. I tried checking the flow inside each worker, and it works partially: every row that gets processed is handled by the right worker, but only a small fraction of the 10,000 rows is processed before the program exits successfully.


type r struct {
    ID   int `json:"id"`
    Flow int `json:"flow"`
}

func main() {
    rs := repo.getData() // the selected rows, as []*r
    wg := new(sync.WaitGroup)
    jobs := make(chan *r)
    wg.Add(10)
    for i := 1; i <= 10; i++ {
        go worker(wg, i, jobs)
    }
    for i := range rs {
        jobs <- rs[i]
    }
    close(jobs)
    wg.Wait()
}

func worker(wg *sync.WaitGroup, worker int, jobs chan *r) {
    defer wg.Done()
    for job := range jobs {
        // matching worker with row's flow
        if worker == job.Flow {
            exe(worker, job)
        }
    }
}

func exe(worker int, job *r) {
    // some work is done here
    fmt.Printf("worker #%-3d | task #%-3d | worker id == task flow: %t\n", worker, job.ID, worker == job.Flow)
}

Output

worker #10  | task #10  | worker id == task flow: true
worker #5   | task #35  | worker id == task flow: true
worker #1   | task #21  | worker id == task flow: true
worker #2   | task #22  | worker id == task flow: true
worker #9   | task #59  | worker id == task flow: true
worker #7   | task #27  | worker id == task flow: true
worker #7   | task #87  | worker id == task flow: true
worker #7   | task #97  | worker id == task flow: true
worker #6   | task #46  | worker id == task flow: true
worker #1   | task #51  | worker id == task flow: true
worker #2   | task #62  | worker id == task flow: true
worker #3   | task #23  | worker id == task flow: true
worker #9   | task #69  | worker id == task flow: true
worker #8   | task #78  | worker id == task flow: true
worker #5   | task #85  | worker id == task flow: true
worker #10  | task #40  | worker id == task flow: true
worker #4   | task #14  | worker id == task flow: true

Program exited.

Is there a way to pass rows with flow n only to worker #n, so that all rows are processed by the worker pool?

  • If I understand correctly, see playground.
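
For context on why most rows are skipped in the question's code: all ten workers read from one shared channel, so each row is received by exactly one arbitrary worker, and if that worker's number does not match the row's flow, the row is silently dropped. One possible fix (not necessarily what the playground above shows) is to give each worker its own channel and have the producer pick the channel by flow. The sketch below assumes flow is always in 1..10; the names row, makeRows and process are made up for illustration, and makeRows is only a stand-in for repo.getData().

```go
package main

import (
	"fmt"
	"sync"
)

// row stands in for the question's r struct.
type row struct {
	ID   int
	Flow int
}

// makeRows builds n sample rows whose flow cycles 1..flows by id
// (a hypothetical stand-in for repo.getData()).
func makeRows(n, flows int) []*row {
	rows := make([]*row, 0, n)
	for id := 1; id <= n; id++ {
		rows = append(rows, &row{ID: id, Flow: (id-1)%flows + 1})
	}
	return rows
}

// process sends every row to the worker whose number equals the row's flow.
// It returns, indexed by worker number, the ids each worker handled.
func process(rows []*row, flows int) [][]int {
	chans := make([]chan *row, flows+1) // index 0 is unused
	handled := make([][]int, flows+1)
	var wg sync.WaitGroup

	for w := 1; w <= flows; w++ {
		chans[w] = make(chan *row)
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			// Only rows with Flow == w are ever sent on chans[w].
			for job := range chans[w] {
				// Real work would go here; only this goroutine writes handled[w].
				handled[w] = append(handled[w], job.ID)
			}
		}(w)
	}

	// The producer routes each row by its flow instead of letting workers filter.
	for _, r := range rows {
		if r.Flow < 1 || r.Flow > flows {
			continue // no worker for this flow; skipped in this sketch
		}
		chans[r.Flow] <- r
	}
	for w := 1; w <= flows; w++ {
		close(chans[w])
	}
	wg.Wait()
	return handled
}

func main() {
	handled := process(makeRows(10000, 10), 10)
	for w := 1; w <= 10; w++ {
		fmt.Printf("worker #%-3d handled %d rows\n", w, len(handled[w]))
	}
}
```

With the sample data, every worker reports 1000 rows, and rows of the same flow are handled in the order they were sent. Because the channels are unbuffered, a slow worker makes the producer wait before it can send to the others; giving each channel a buffer is one way to soften that, at the cost of some memory.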
