I have a large table in a MySQL database that I'm trying to read from as efficiently as I can. I was thinking about speeding the code up by adding multiple workers however when I do that I get marshaling errors at the start of running it (and only at the start) It looks something like this:
{"caller":"mysql.go:repository.(*MySQLRepo).GetNextBatch#428","error":"DBGetRecordException: could not marshal episode comments: sql: Scan error on column index 4, name "created_at": unsupported Scan, storing driver.Value type []uint8 into type *time.Time","level":"error","ts":"2020-07-13T20:42:03.9621 658Z"}
I don't get this is error if I remove the worker code from ImportLegacyComments and just loop over it normally. Is sqlx.next and sqlx.StructScan safe to multithread and if not is there an alternative way to do this safely?
import (
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
type BatchResult struct {
rows *sqlx.Rows
}
func (m *MySQLRepo) GetNextBatch(b *BatchResult) ([]model.EpisodeComment, error) {
var episodeComments []model.EpisodeComment
for i := 0; i < 1000 && b.rows.Next(); i++ {
var episodeComment model.EpisodeComment
err := b.rows.StructScan(&episodeComment)
if err != nil {
return nil, err
}
episodeComments = append(episodeComments, episodeComment)
}
return episodeComments, nil
}
func (m *MySQLRepo) FetchAllEpisodeComments() (*BatchResult, error) {
rows, err := m.db.Queryx("SELECT * FROM episode_comment")
if err != nil {
return nil, err
}
return &BatchResult{
rows: rows,
}, nil
}
func (svc *ImportService) ImportLegacyComments(ctx context.Context) error {
batchResult, err := svc.legacyCommentsRepo.FetchAllEpisodeComments()
var wg sync.WaitGroup
processor := func() {
comments, err := svc.legacyCommentsRepo.GetNextBatch(batchResult)
if err != nil {
svc.logger.Error(err)
}
for len(comments) > 0 {
comments, err = svc.legacyCommentsRepo.GetNextBatch(batchResult)
if err != nil {
svc.logger.Error(err)
}
svc.logger.Info("batch", "completed 1000")
}
wg.Done()
}
for i := 0; i < 20; i++ {
go processor()
wg.Add(1)
}
wg.Wait()
return err
}