1package store
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "runtime/debug"
9
10 "github.com/mjl-/bstore"
11
12 "github.com/mjl-/mox/message"
13 "github.com/mjl-/mox/metrics"
14 "github.com/mjl-/mox/mlog"
15)
16
// reparseMessageBatchSize is the maximum number of messages ReparseMessages
// handles in a single database transaction.
//
// We process messages in database transactions in batches. Otherwise, for accounts
// with many messages, we would get slowdown with many unwritten blocks in memory.
var reparseMessageBatchSize = 1000
20
21// ReparseMessages reparses all messages, updating the MIME structure in
22// Message.ParsedBuf.
23//
24// Typically called during automatic account upgrade, or manually.
25//
26// Returns total number of messages, all of which were reparsed.
27func (a *Account) ReparseMessages(ctx context.Context, log mlog.Log) (int, error) {
28 type Result struct {
29 Message *Message
30 Buf []byte
31 Err error
32 }
33
34 // We'll have multiple goroutines that pick up messages to parse. The assumption is
35 // that reads of messages from disk are the bottleneck.
36 nprog := 10
37 work := make(chan *Message, nprog)
38 results := make(chan Result, nprog)
39
40 processMessage := func(m *Message) {
41 r := Result{Message: m}
42
43 defer func() {
44 x := recover()
45 if x != nil {
46 r.Err = fmt.Errorf("unhandled panic parsing message: %v", x)
47 log.Error("processMessage panic", slog.Any("err", x))
48 debug.PrintStack()
49 metrics.PanicInc(metrics.Store)
50 }
51
52 results <- r
53 }()
54
55 mr := a.MessageReader(*m)
56 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
57 if err != nil {
58 // note: p is still set to a usable part
59 log.Debugx("reparsing message", err, slog.Int64("msgid", m.ID))
60 }
61 r.Buf, r.Err = json.Marshal(p)
62 }
63
64 // Start goroutines that parse messages.
65 for range nprog {
66 go func() {
67 for {
68 m, ok := <-work
69 if !ok {
70 return
71 }
72
73 processMessage(m)
74 }
75 }()
76 }
77 defer close(work) // Stop goroutines when done.
78
79 total := 0
80 var lastID int64 // Each db transaction starts after lastID.
81 for {
82 var n int
83 err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
84 var busy int
85
86 q := bstore.QueryTx[Message](tx)
87 q.FilterEqual("Expunged", false)
88 q.FilterGreater("ID", lastID)
89 q.Limit(reparseMessageBatchSize)
90 q.SortAsc("ID")
91 err := q.ForEach(func(m Message) error {
92 lastID = m.ID
93 n++
94
95 for {
96 select {
97 case work <- &m:
98 busy++
99 return nil
100
101 case r := <-results:
102 busy--
103 if r.Err != nil {
104 log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID))
105 } else {
106 r.Message.ParsedBuf = r.Buf
107 if err := tx.Update(r.Message); err != nil {
108 return fmt.Errorf("update message: %w", err)
109 }
110 }
111 }
112 }
113 })
114 if err != nil {
115 return fmt.Errorf("reparsing messages: %w", err)
116 }
117
118 // Drain remaining reparses.
119 for ; busy > 0; busy-- {
120 r := <-results
121 if r.Err != nil {
122 log.Errorx("marshal parsed form of message", r.Err, slog.Int64("msgid", r.Message.ID))
123 } else {
124 r.Message.ParsedBuf = r.Buf
125 if err := tx.Update(r.Message); err != nil {
126 return fmt.Errorf("update message with id %d: %w", r.Message.ID, err)
127 }
128 }
129 }
130
131 return nil
132 })
133 total += n
134 if err != nil {
135 return total, fmt.Errorf("update messages with parsed mime structure: %w", err)
136 }
137 log.Debug("reparse message progress", slog.Int("total", total))
138 if n < reparseMessageBatchSize {
139 break
140 }
141 }
142
143 return total, nil
144}
145