10 "github.com/mjl-/bstore"
12 "github.com/mjl-/mox/message"
13 "github.com/mjl-/mox/metrics"
14 "github.com/mjl-/mox/mlog"
17// We process messages in database transactions in batches. Otherwise, for accounts
18// with many messages, we would get slowdown with many unwritten blocks in memory.
19var reparseMessageBatchSize = 1000
21// ReparseMessages reparses all messages, updating the MIME structure in
24// Typically called during automatic account upgrade, or manually.
26// Returns total number of messages, all of which were reparsed.
27func (a *Account) ReparseMessages(ctx context.Context, log mlog.Log) (int, error) {
34 // We'll have multiple goroutines that pick up messages to parse. The assumption is
35 // that reads of messages from disk are the bottleneck.
37 work := make(chan *Message, nprog)
38 results := make(chan Result, nprog)
40 processMessage := func(m *Message) {
41 r := Result{Message: m}
46 r.Err = fmt.Errorf("unhandled panic parsing message: %v", x)
47 log.Error("processMessage panic", slog.Any("err", x))
49 metrics.PanicInc(metrics.Store)
55 mr := a.MessageReader(*m)
56 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
58 // note: p is still set to a usable part
59 log.Debugx("reparsing message", err, slog.Int64("msgid", m.ID))
61 r.Buf, r.Err = json.Marshal(p)
64 // Start goroutines that parse messages.
77 defer close(work) // Stop goroutines when done.
80 var lastID int64 // Each db transaction starts after lastID.
83 err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
86 q := bstore.QueryTx[Message](tx)
87 q.FilterEqual("Expunged", false)
88 q.FilterGreater("ID", lastID)
89 q.Limit(reparseMessageBatchSize)
91 err := q.ForEach(func(m Message) error {
104 log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID))
106 r.Message.ParsedBuf = r.Buf
107 if err := tx.Update(r.Message); err != nil {
108 return fmt.Errorf("update message: %w", err)
115 return fmt.Errorf("reparsing messages: %w", err)
118 // Drain remaining reparses.
119 for ; busy > 0; busy-- {
122 log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID))
124 r.Message.ParsedBuf = r.Buf
125 if err := tx.Update(r.Message); err != nil {
126 return fmt.Errorf("update message with id %d: %w", r.Message.ID, err)
135 return total, fmt.Errorf("update messages with parsed mime structure: %w", err)
137 log.Debug("reparse message progress", slog.Int("total", total))
138 if n < reparseMessageBatchSize {