I have records in a table that has a column called flow, which takes values from 1 to 10. The flow repeats every 10 rows: rows 1, 11, 21, ... have flow 1; rows 3, 13, 23, ... have flow 3; rows 5, 15, 25, ... have flow 5; and so on for the other flows:
+------+--------+
| id   | flow   |
+------+--------+
| 1    | 1      |
| 2    | 2      |
| 3    | 3      |
| ...  | ...    |
| 10   | 10     |
| ...  | ...    |
| 53   | 3      |
| 54   | 4      |
| ...  | ...    |
| 77   | 7      |
+------+--------+
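The sample rows suggest that the flow is the last digit of the id, with 0 mapped to 10, i.e. flow = ((id - 1) mod 10) + 1. A minimal sketch of that rule, assuming the pattern holds for every row; flowFor and makeRows are hypothetical helpers, and makeRows is only a stand-in for repo.getData() when reproducing the problem without a database:

```go
package main

import "fmt"

// r mirrors the row type used in the question.
type r struct {
	ID   int `json:"id"`
	Flow int `json:"flow"`
}

// flowFor is a hypothetical helper: it maps an id to its flow (1..10),
// repeating every 10 ids, as inferred from the sample rows (53 -> 3, 77 -> 7).
func flowFor(id int) int {
	return (id-1)%10 + 1
}

// makeRows is a hypothetical stand-in for repo.getData(): it builds n rows
// with ids 1..n and flows assigned by flowFor.
func makeRows(n int) []*r {
	rows := make([]*r, 0, n)
	for id := 1; id <= n; id++ {
		rows = append(rows, &r{ID: id, Flow: flowFor(id)})
	}
	return rows
}

func main() {
	for _, row := range makeRows(12) {
		fmt.Printf("%d -> %d\n", row.ID, row.Flow)
	}
}
```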
I select 10,000 rows from this table and want to process them in a worker pool of 10 workers (one per flow value). Each worker should process only the rows whose flow matches its number: worker #1 processes rows with flow 1 (ids 1, 11, 21, 31, etc.), worker #2 processes rows with flow 2 (ids 2, 12, 22, etc.), and so on up to worker #10, which processes every 10th row. I tried filtering the rows inside each worker, and it works partially: each worker handles only rows with its own flow, but only a small fraction of the 10,000 rows is processed, and then the program exits without any error.
package main

import (
	"fmt"
	"sync"
)

type r struct {
	ID   int `json:"id"`
	Flow int `json:"flow"`
}

func main() {
	rs := repo.getData() // rows from the table; assumed to be []*r
	wg := new(sync.WaitGroup)
	jobs := make(chan *r)
	wg.Add(10)
	for i := 1; i <= 10; i++ {
		go worker(wg, i, jobs)
	}
	for i := range rs {
		jobs <- rs[i]
	}
	close(jobs)
	wg.Wait()
}

func worker(wg *sync.WaitGroup, worker int, jobs chan *r) {
	defer wg.Done()
	for job := range jobs {
		// matching worker with row's flow
		if worker == job.Flow {
			exe(worker, job)
		}
	}
}

func exe(worker int, job *r) {
	// some work is done here
	fmt.Printf("worker #%-3d | task #%-3d | worker id == task flow: %t\n", worker, job.ID, worker == job.Flow)
}
Output
worker #10 | task #10 | worker id == task flow: true
worker #5 | task #35 | worker id == task flow: true
worker #1 | task #21 | worker id == task flow: true
worker #2 | task #22 | worker id == task flow: true
worker #9 | task #59 | worker id == task flow: true
worker #7 | task #27 | worker id == task flow: true
worker #7 | task #87 | worker id == task flow: true
worker #7 | task #97 | worker id == task flow: true
worker #6 | task #46 | worker id == task flow: true
worker #1 | task #51 | worker id == task flow: true
worker #2 | task #62 | worker id == task flow: true
worker #3 | task #23 | worker id == task flow: true
worker #9 | task #69 | worker id == task flow: true
worker #8 | task #78 | worker id == task flow: true
worker #5 | task #85 | worker id == task flow: true
worker #10 | task #40 | worker id == task flow: true
worker #4 | task #14 | worker id == task flow: true
Program exited.
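A likely explanation: all ten workers range over the same channel, and each value sent on a channel is received by exactly one goroutine, whichever happens to be ready. If that worker's number does not match the row's flow, the if statement skips the row, and since it has already been taken off the channel, no other worker ever sees it. Which worker receives a given row is up to the scheduler, so only the rows that happen to land on the matching worker are printed. The sketch below reproduces the question's pattern with counters instead of printing; countShared is a hypothetical name, and the rows are generated in place of repo.getData():

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type r struct {
	ID   int
	Flow int
}

// countShared runs the question's pattern (one shared channel, workers filter
// by flow) and reports how many rows were processed versus silently dropped.
func countShared(n int) (processed, dropped int64) {
	jobs := make(chan *r)
	var wg sync.WaitGroup
	for w := 1; w <= 10; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for job := range jobs {
				if worker == job.Flow {
					atomic.AddInt64(&processed, 1)
				} else {
					// Received by the wrong worker: the row is consumed
					// from the channel and never reaches its own worker.
					atomic.AddInt64(&dropped, 1)
				}
			}
		}(w)
	}
	for id := 1; id <= n; id++ {
		jobs <- &r{ID: id, Flow: (id-1)%10 + 1}
	}
	close(jobs)
	wg.Wait() // all counter updates happen before Wait returns
	return processed, dropped
}

func main() {
	p, d := countShared(10000)
	fmt.Printf("processed=%d dropped=%d\n", p, d)
}
```

The exact split varies from run to run, but processed + dropped always equals the number of rows sent, which shows that the missing rows were received and discarded rather than left in the channel.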
Is there a way to pass each row with flow n to worker #n, so that all rows are processed by the worker pool?
If I understand correctly, see playground.
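One way to get this routing (a sketch, not necessarily what the linked playground does) is to give each worker its own channel and let the dispatcher send every row to the channel indexed by its flow. Each row then reaches exactly one worker, the one whose number equals its flow, so nothing has to be filtered or dropped. The sketch assumes flows are always in 1..10; the choice to skip rows with any other flow, as well as the names process and numWorkers, are hypothetical:

```go
package main

import (
	"fmt"
	"sync"
)

type r struct {
	ID   int `json:"id"`
	Flow int `json:"flow"`
}

const numWorkers = 10

// exe is a placeholder for the real per-row work from the question.
func exe(worker int, job *r) {}

// process routes each row to the worker whose number equals row.Flow and
// returns how many rows each worker handled (index 0 = worker #1).
func process(rows []*r) []int {
	var wg sync.WaitGroup
	chans := make([]chan *r, numWorkers)
	handled := make([]int, numWorkers)
	for i := range chans {
		chans[i] = make(chan *r)
		wg.Add(1)
		go func(worker int, jobs <-chan *r) {
			defer wg.Done()
			for job := range jobs {
				exe(worker, job)
				// Each goroutine writes only its own slot, so no lock is needed.
				handled[worker-1]++
			}
		}(i+1, chans[i])
	}
	for _, row := range rows {
		if row.Flow < 1 || row.Flow > numWorkers {
			continue // hypothetical policy: skip rows with an unexpected flow
		}
		chans[row.Flow-1] <- row // route by flow instead of filtering in the worker
	}
	for _, ch := range chans {
		close(ch)
	}
	wg.Wait()
	return handled
}

func main() {
	rows := make([]*r, 0, 10000)
	for id := 1; id <= 10000; id++ {
		rows = append(rows, &r{ID: id, Flow: (id-1)%10 + 1})
	}
	fmt.Println(process(rows)) // prints [1000 1000 1000 1000 1000 1000 1000 1000 1000 1000]
}
```

With unbuffered channels the dispatcher waits whenever the target worker is still busy, even if other workers are idle; giving each channel a buffer (for example make(chan *r, 100)) lets the dispatcher run ahead, at the cost of holding more rows in memory.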