1// Package webops implements shared functionality between webapisrv and webmail.
2package webops
3
4import (
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "os"
11 "path/filepath"
12 "slices"
13 "sort"
14 "time"
15
16 "github.com/mjl-/bstore"
17
18 "github.com/mjl-/mox/junk"
19 "github.com/mjl-/mox/message"
20 "github.com/mjl-/mox/mlog"
21 "github.com/mjl-/mox/moxio"
22 "github.com/mjl-/mox/store"
23)
24
25var ErrMessageNotFound = errors.New("no such message")
26
27type XOps struct {
28 DBWrite func(ctx context.Context, acc *store.Account, fn func(tx *bstore.Tx))
29 Checkf func(ctx context.Context, err error, format string, args ...any)
30 Checkuserf func(ctx context.Context, err error, format string, args ...any)
31}
32
33func (x XOps) mailboxID(ctx context.Context, tx *bstore.Tx, mailboxID int64) store.Mailbox {
34 if mailboxID == 0 {
35 x.Checkuserf(ctx, errors.New("invalid zero mailbox ID"), "getting mailbox")
36 }
37 mb, err := store.MailboxID(tx, mailboxID)
38 if err == bstore.ErrAbsent || err == store.ErrMailboxExpunged {
39 x.Checkuserf(ctx, err, "getting mailbox")
40 }
41 x.Checkf(ctx, err, "getting mailbox")
42 return mb
43}
44
45// messageID returns a non-expunged message or panics with a sherpa error.
46func (x XOps) messageID(ctx context.Context, tx *bstore.Tx, messageID int64) store.Message {
47 if messageID == 0 {
48 x.Checkuserf(ctx, errors.New("invalid zero message id"), "getting message")
49 }
50 m := store.Message{ID: messageID}
51 err := tx.Get(&m)
52 if err == bstore.ErrAbsent {
53 x.Checkuserf(ctx, ErrMessageNotFound, "getting message")
54 } else if err == nil && m.Expunged {
55 x.Checkuserf(ctx, errors.New("message was removed"), "getting message")
56 }
57 x.Checkf(ctx, err, "getting message")
58 return m
59}
60
61func (x XOps) MessageDelete(ctx context.Context, log mlog.Log, acc *store.Account, messageIDs []int64) {
62 acc.WithWLock(func() {
63 var changes []store.Change
64
65 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
66 var modseq store.ModSeq
67 changes = x.MessageDeleteTx(ctx, log, tx, acc, messageIDs, &modseq)
68 })
69
70 store.BroadcastChanges(acc, changes)
71 })
72}
73
74func (x XOps) MessageDeleteTx(ctx context.Context, log mlog.Log, tx *bstore.Tx, acc *store.Account, messageIDs []int64, modseq *store.ModSeq) []store.Change {
75 changes := make([]store.Change, 0, 1+1) // 1 remove, 1 mailbox counts, optimistic that all messages are in 1 mailbox.
76
77 var jf *junk.Filter
78 defer func() {
79 if jf != nil {
80 err := jf.CloseDiscard()
81 log.Check(err, "close junk filter")
82 }
83 }()
84
85 conf, _ := acc.Conf()
86
87 var mb store.Mailbox
88 var changeRemoveUIDs store.ChangeRemoveUIDs
89 xflushMailbox := func() {
90 err := tx.Update(&mb)
91 x.Checkf(ctx, err, "updating mailbox counts")
92 slices.Sort(changeRemoveUIDs.UIDs)
93 changeRemoveUIDs.UIDNext = mb.UIDNext
94 changeRemoveUIDs.MessageCountIMAP = mb.MessageCountIMAP()
95 changeRemoveUIDs.Unseen = uint32(mb.MailboxCounts.Unseen)
96 changes = append(changes, mb.ChangeCounts(), changeRemoveUIDs)
97 }
98
99 for _, id := range messageIDs {
100 m := x.messageID(ctx, tx, id)
101
102 if *modseq == 0 {
103 var err error
104 *modseq, err = acc.NextModSeq(tx)
105 x.Checkf(ctx, err, "assigning next modseq")
106 }
107
108 if m.MailboxID != mb.ID {
109 if mb.ID != 0 {
110 xflushMailbox()
111 }
112 mb = x.mailboxID(ctx, tx, m.MailboxID)
113 mb.ModSeq = *modseq
114 changeRemoveUIDs = store.ChangeRemoveUIDs{MailboxID: mb.ID, ModSeq: *modseq}
115 }
116
117 if m.Junk != m.Notjunk && jf == nil && conf.JunkFilter != nil {
118 var err error
119 jf, _, err = acc.OpenJunkFilter(ctx, log)
120 x.Checkf(ctx, err, "open junk filter")
121 }
122
123 opts := store.RemoveOpts{JunkFilter: jf}
124 _, _, err := acc.MessageRemove(log, tx, *modseq, &mb, opts, m)
125 x.Checkf(ctx, err, "expunge message")
126
127 changeRemoveUIDs.UIDs = append(changeRemoveUIDs.UIDs, m.UID)
128 changeRemoveUIDs.MsgIDs = append(changeRemoveUIDs.MsgIDs, m.ID)
129 }
130
131 xflushMailbox()
132
133 if jf != nil {
134 err := jf.Close()
135 jf = nil
136 x.Checkf(ctx, err, "close junk filter")
137 }
138
139 return changes
140}
141
142func (x XOps) MessageFlagsAdd(ctx context.Context, log mlog.Log, acc *store.Account, messageIDs []int64, flaglist []string) {
143 flags, keywords, err := store.ParseFlagsKeywords(flaglist)
144 x.Checkuserf(ctx, err, "parsing flags")
145
146 acc.WithRLock(func() {
147 var changes []store.Change
148
149 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
150 var modseq store.ModSeq
151 var retrain []store.Message
152 var mb, origmb store.Mailbox
153
154 for _, mid := range messageIDs {
155 m := x.messageID(ctx, tx, mid)
156
157 if modseq == 0 {
158 modseq, err = acc.NextModSeq(tx)
159 x.Checkf(ctx, err, "assigning next modseq")
160 }
161
162 if mb.ID != m.MailboxID {
163 if mb.ID != 0 {
164 mb.ModSeq = modseq
165 err := tx.Update(&mb)
166 x.Checkf(ctx, err, "updating mailbox")
167 if mb.MailboxCounts != origmb.MailboxCounts {
168 changes = append(changes, mb.ChangeCounts())
169 }
170 if mb.KeywordsChanged(origmb) {
171 changes = append(changes, mb.ChangeKeywords())
172 }
173 }
174 mb = x.mailboxID(ctx, tx, m.MailboxID)
175 origmb = mb
176 }
177 mb.Keywords, _ = store.MergeKeywords(mb.Keywords, keywords)
178
179 mb.Sub(m.MailboxCounts())
180 oflags := m.Flags
181 m.Flags = m.Flags.Set(flags, flags)
182 var kwChanged bool
183 m.Keywords, kwChanged = store.MergeKeywords(m.Keywords, keywords)
184 mb.Add(m.MailboxCounts())
185
186 if m.Flags == oflags && !kwChanged {
187 continue
188 }
189
190 m.ModSeq = modseq
191 err = tx.Update(&m)
192 x.Checkf(ctx, err, "updating message")
193
194 changes = append(changes, m.ChangeFlags(oflags, mb))
195 retrain = append(retrain, m)
196 }
197
198 if mb.ID != 0 {
199 mb.ModSeq = modseq
200 err := tx.Update(&mb)
201 x.Checkf(ctx, err, "updating mailbox")
202 if mb.MailboxCounts != origmb.MailboxCounts {
203 changes = append(changes, mb.ChangeCounts())
204 }
205 if mb.KeywordsChanged(origmb) {
206 changes = append(changes, mb.ChangeKeywords())
207 }
208 }
209
210 err = acc.RetrainMessages(ctx, log, tx, retrain)
211 x.Checkf(ctx, err, "retraining messages")
212 })
213
214 store.BroadcastChanges(acc, changes)
215 })
216}
217
218func (x XOps) MessageFlagsClear(ctx context.Context, log mlog.Log, acc *store.Account, messageIDs []int64, flaglist []string) {
219 flags, keywords, err := store.ParseFlagsKeywords(flaglist)
220 x.Checkuserf(ctx, err, "parsing flags")
221
222 acc.WithRLock(func() {
223 var retrain []store.Message
224 var changes []store.Change
225
226 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
227 var modseq store.ModSeq
228 var mb, origmb store.Mailbox
229
230 for _, mid := range messageIDs {
231 m := x.messageID(ctx, tx, mid)
232
233 if modseq == 0 {
234 modseq, err = acc.NextModSeq(tx)
235 x.Checkf(ctx, err, "assigning next modseq")
236 }
237
238 if mb.ID != m.MailboxID {
239 if mb.ID != 0 {
240 mb.ModSeq = modseq
241 err := tx.Update(&mb)
242 x.Checkf(ctx, err, "updating counts for mailbox")
243 if mb.MailboxCounts != origmb.MailboxCounts {
244 changes = append(changes, mb.ChangeCounts())
245 }
246 // note: cannot remove keywords from mailbox by removing keywords from message.
247 }
248 mb = x.mailboxID(ctx, tx, m.MailboxID)
249 origmb = mb
250 }
251
252 oflags := m.Flags
253 mb.Sub(m.MailboxCounts())
254 m.Flags = m.Flags.Set(flags, store.Flags{})
255 var changed bool
256 m.Keywords, changed = store.RemoveKeywords(m.Keywords, keywords)
257 mb.Add(m.MailboxCounts())
258
259 if m.Flags == oflags && !changed {
260 continue
261 }
262
263 m.ModSeq = modseq
264 err = tx.Update(&m)
265 x.Checkf(ctx, err, "updating message")
266
267 changes = append(changes, m.ChangeFlags(oflags, mb))
268 retrain = append(retrain, m)
269 }
270
271 if mb.ID != 0 {
272 mb.ModSeq = modseq
273 err := tx.Update(&mb)
274 x.Checkf(ctx, err, "updating keywords in mailbox")
275 if mb.MailboxCounts != origmb.MailboxCounts {
276 changes = append(changes, mb.ChangeCounts())
277 }
278 // note: cannot remove keywords from mailbox by removing keywords from message.
279 }
280
281 err = acc.RetrainMessages(ctx, log, tx, retrain)
282 x.Checkf(ctx, err, "retraining messages")
283 })
284
285 store.BroadcastChanges(acc, changes)
286 })
287}
288
289// MailboxesMarkRead updates all messages in the referenced mailboxes as seen when
290// they aren't yet. The mailboxes are updated with their unread messages counts,
291// and the changes are propagated.
292func (x XOps) MailboxesMarkRead(ctx context.Context, log mlog.Log, acc *store.Account, mailboxIDs []int64) {
293 acc.WithRLock(func() {
294 var changes []store.Change
295
296 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
297 var modseq store.ModSeq
298
299 // Note: we don't need to retrain, changing the "seen" flag is not relevant.
300
301 for _, mbID := range mailboxIDs {
302 mb := x.mailboxID(ctx, tx, mbID)
303
304 // Find messages to update.
305 q := bstore.QueryTx[store.Message](tx)
306 q.FilterNonzero(store.Message{MailboxID: mb.ID})
307 q.FilterEqual("Seen", false)
308 q.FilterEqual("Expunged", false)
309 q.SortAsc("UID")
310 var have bool
311 err := q.ForEach(func(m store.Message) error {
312 have = true // We need to update mailbox.
313
314 oflags := m.Flags
315 mb.Sub(m.MailboxCounts())
316 m.Seen = true
317 mb.Add(m.MailboxCounts())
318
319 if modseq == 0 {
320 var err error
321 modseq, err = acc.NextModSeq(tx)
322 x.Checkf(ctx, err, "assigning next modseq")
323 }
324 m.ModSeq = modseq
325 err := tx.Update(&m)
326 x.Checkf(ctx, err, "updating message")
327
328 changes = append(changes, m.ChangeFlags(oflags, mb))
329 return nil
330 })
331 x.Checkf(ctx, err, "listing messages to mark as read")
332
333 if have {
334 mb.ModSeq = modseq
335 err := tx.Update(&mb)
336 x.Checkf(ctx, err, "updating mailbox")
337 changes = append(changes, mb.ChangeCounts())
338 }
339 }
340 })
341
342 store.BroadcastChanges(acc, changes)
343 })
344}
345
346// MessageMove moves messages to the mailbox represented by mailboxName, or to mailboxID if mailboxName is empty.
347func (x XOps) MessageMove(ctx context.Context, log mlog.Log, acc *store.Account, messageIDs []int64, mailboxName string, mailboxID int64) {
348 acc.WithWLock(func() {
349 var changes []store.Change
350
351 var newIDs []int64
352 defer func() {
353 for _, id := range newIDs {
354 p := acc.MessagePath(id)
355 err := os.Remove(p)
356 log.Check(err, "removing delivered message after failure", slog.String("path", p))
357 }
358 }()
359
360 x.DBWrite(ctx, acc, func(tx *bstore.Tx) {
361 if mailboxName != "" {
362 mb, err := acc.MailboxFind(tx, mailboxName)
363 x.Checkf(ctx, err, "looking up mailbox name")
364 if mb == nil {
365 x.Checkuserf(ctx, errors.New("not found"), "looking up mailbox name")
366 } else {
367 mailboxID = mb.ID
368 }
369 }
370
371 mbDst := x.mailboxID(ctx, tx, mailboxID)
372
373 if len(messageIDs) == 0 {
374 return
375 }
376
377 var modseq store.ModSeq
378 newIDs, changes = x.MessageMoveTx(ctx, log, acc, tx, messageIDs, mbDst, &modseq)
379 })
380 newIDs = nil
381
382 store.BroadcastChanges(acc, changes)
383 })
384}
385
386// MessageMoveTx moves message to a new mailbox, which must be different than their
387// current mailbox. Moving a message is done by changing the MailboxID and
388// assigning an appriorate new UID, and then inserting a replacement Message record
389// with new ID that is marked expunged in the original mailbox, along with a
390// MessageErase record so the message gets erased when all sessions stopped
391// referencing the message.
392func (x XOps) MessageMoveTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, messageIDs []int64, mbDst store.Mailbox, modseq *store.ModSeq) ([]int64, []store.Change) {
393 var newIDs []int64
394 var commit bool
395 defer func() {
396 if commit {
397 return
398 }
399 for _, id := range newIDs {
400 p := acc.MessagePath(id)
401 err := os.Remove(p)
402 log.Check(err, "removing delivered message after failure", slog.String("path", p))
403 }
404 newIDs = nil
405 }()
406
407 // n adds, 1 remove, 2 mailboxcounts, 1 mailboxkeywords, optimistic that messages are in a single source mailbox.
408 changes := make([]store.Change, 0, len(messageIDs)+4)
409
410 var err error
411 if *modseq == 0 {
412 *modseq, err = acc.NextModSeq(tx)
413 x.Checkf(ctx, err, "assigning next modseq")
414 }
415
416 mbDst.ModSeq = *modseq
417
418 // Get messages. group them by mailbox.
419 l := make([]store.Message, len(messageIDs))
420 for i, id := range messageIDs {
421 l[i] = x.messageID(ctx, tx, id)
422 if l[i].MailboxID == mbDst.ID {
423 // Client should filter out messages that are already in mailbox.
424 x.Checkuserf(ctx, fmt.Errorf("message %d already in destination mailbox", l[i].ID), "moving message")
425 }
426 }
427
428 // Sort (group) by mailbox, sort by UID.
429 sort.Slice(l, func(i, j int) bool {
430 if l[i].MailboxID != l[j].MailboxID {
431 return l[i].MailboxID < l[j].MailboxID
432 }
433 return l[i].UID < l[j].UID
434 })
435
436 var jf *junk.Filter
437 defer func() {
438 if jf != nil {
439 err := jf.CloseDiscard()
440 log.Check(err, "close junk filter")
441 }
442 }()
443
444 accConf, _ := acc.Conf()
445
446 var mbSrc store.Mailbox
447 var changeRemoveUIDs store.ChangeRemoveUIDs
448 xflushMailbox := func() {
449 changeRemoveUIDs.UIDNext = mbSrc.UIDNext
450 changeRemoveUIDs.MessageCountIMAP = mbSrc.MessageCountIMAP()
451 changeRemoveUIDs.Unseen = uint32(mbSrc.MailboxCounts.Unseen)
452 changes = append(changes, changeRemoveUIDs, mbSrc.ChangeCounts())
453
454 err = tx.Update(&mbSrc)
455 x.Checkf(ctx, err, "updating source mailbox counts")
456 }
457
458 nkeywords := len(mbDst.Keywords)
459 now := time.Now()
460
461 syncDirs := map[string]struct{}{}
462
463 for _, om := range l {
464 if om.MailboxID != mbSrc.ID {
465 if mbSrc.ID != 0 {
466 xflushMailbox()
467 }
468 mbSrc = x.mailboxID(ctx, tx, om.MailboxID)
469 mbSrc.ModSeq = *modseq
470 changeRemoveUIDs = store.ChangeRemoveUIDs{MailboxID: mbSrc.ID, ModSeq: *modseq}
471 }
472
473 nm := om
474 nm.MailboxID = mbDst.ID
475 nm.UID = mbDst.UIDNext
476 err := mbDst.UIDNextAdd(1)
477 x.Checkf(ctx, err, "adding uid")
478 nm.ModSeq = *modseq
479 nm.CreateSeq = *modseq
480 nm.SaveDate = &now
481 if nm.IsReject && nm.MailboxDestinedID != 0 {
482 // Incorrectly delivered to Rejects mailbox. Adjust MailboxOrigID so this message
483 // is used for reputation calculation during future deliveries.
484 nm.MailboxOrigID = nm.MailboxDestinedID
485 nm.IsReject = false
486 nm.Seen = false
487 }
488 if mbDst.Trash {
489 nm.Seen = true
490 }
491
492 nm.JunkFlagsForMailbox(mbDst, accConf)
493
494 err = tx.Update(&nm)
495 x.Checkf(ctx, err, "updating message with new mailbox")
496
497 mbDst.Add(nm.MailboxCounts())
498
499 mbSrc.Sub(om.MailboxCounts())
500 om.ID = 0
501 om.Expunged = true
502 om.ModSeq = *modseq
503 om.TrainedJunk = nil
504 err = tx.Insert(&om)
505 x.Checkf(ctx, err, "inserting expunged message in old mailbox")
506
507 dstPath := acc.MessagePath(om.ID)
508 dstDir := filepath.Dir(dstPath)
509 if _, ok := syncDirs[dstDir]; !ok {
510 os.MkdirAll(dstDir, 0770)
511 syncDirs[dstDir] = struct{}{}
512 }
513
514 err = moxio.LinkOrCopy(log, dstPath, acc.MessagePath(nm.ID), nil, false)
515 x.Checkf(ctx, err, "duplicating message in old mailbox for current sessions")
516 newIDs = append(newIDs, nm.ID)
517 // We don't sync the directory. In case of a crash and files disappearing, the
518 // eraser will simply not find the file at next startup.
519
520 err = tx.Insert(&store.MessageErase{ID: om.ID, SkipUpdateDiskUsage: true})
521 x.Checkf(ctx, err, "insert message erase")
522
523 mbDst.Keywords, _ = store.MergeKeywords(mbDst.Keywords, nm.Keywords)
524
525 if accConf.JunkFilter != nil && nm.NeedsTraining() {
526 // Lazily open junk filter.
527 if jf == nil {
528 jf, _, err = acc.OpenJunkFilter(ctx, log)
529 x.Checkf(ctx, err, "open junk filter")
530 }
531 err := acc.RetrainMessage(ctx, log, tx, jf, &nm)
532 x.Checkf(ctx, err, "retrain message after moving")
533 }
534
535 changeRemoveUIDs.UIDs = append(changeRemoveUIDs.UIDs, om.UID)
536 changeRemoveUIDs.MsgIDs = append(changeRemoveUIDs.MsgIDs, om.ID)
537 changes = append(changes, nm.ChangeAddUID(mbDst))
538 }
539
540 for dir := range syncDirs {
541 err := moxio.SyncDir(log, dir)
542 x.Checkf(ctx, err, "sync directory")
543 }
544
545 xflushMailbox()
546
547 changes = append(changes, mbDst.ChangeCounts())
548 if nkeywords > len(mbDst.Keywords) {
549 changes = append(changes, mbDst.ChangeKeywords())
550 }
551
552 err = tx.Update(&mbDst)
553 x.Checkf(ctx, err, "updating destination mailbox with uidnext and modseq")
554
555 if jf != nil {
556 err := jf.Close()
557 x.Checkf(ctx, err, "saving junk filter")
558 jf = nil
559 }
560
561 commit = true
562 return newIDs, changes
563}
564
565func isText(p message.Part) bool {
566 return p.MediaType == "" && p.MediaSubType == "" || p.MediaType == "TEXT" && p.MediaSubType == "PLAIN"
567}
568
569func isHTML(p message.Part) bool {
570 return p.MediaType == "" && p.MediaSubType == "" || p.MediaType == "TEXT" && p.MediaSubType == "HTML"
571}
572
573func isAlternative(p message.Part) bool {
574 return p.MediaType == "MULTIPART" && p.MediaSubType == "ALTERNATIVE"
575}
576
577func readPart(p message.Part, maxSize int64) (string, error) {
578 buf, err := io.ReadAll(io.LimitReader(p.ReaderUTF8OrBinary(), maxSize))
579 if err != nil {
580 return "", fmt.Errorf("reading part contents: %v", err)
581 }
582 return string(buf), nil
583}
584
585// ReadableParts returns the contents of the first text and/or html parts,
586// descending into multiparts, truncated to maxSize bytes if longer.
587func ReadableParts(p message.Part, maxSize int64) (text string, html string, found bool, err error) {
588 // todo: may want to merge this logic with webmail's message parsing.
589
590 // For non-multipart messages, top-level part.
591 if isText(p) {
592 data, err := readPart(p, maxSize)
593 return data, "", true, err
594 } else if isHTML(p) {
595 data, err := readPart(p, maxSize)
596 return "", data, true, err
597 }
598
599 // Look in sub-parts. Stop when we have a readable part, don't continue with other
600 // subparts unless we have a multipart/alternative.
601 // todo: we may have to look at disposition "inline".
602 var haveText, haveHTML bool
603 for _, pp := range p.Parts {
604 if isText(pp) {
605 haveText = true
606 text, err = readPart(pp, maxSize)
607 if !isAlternative(p) {
608 break
609 }
610 } else if isHTML(pp) {
611 haveHTML = true
612 html, err = readPart(pp, maxSize)
613 if !isAlternative(p) {
614 break
615 }
616 }
617 }
618 if haveText || haveHTML {
619 return text, html, true, err
620 }
621
622 // Descend into the subparts.
623 for _, pp := range p.Parts {
624 text, html, found, err = ReadableParts(pp, maxSize)
625 if found {
626 break
627 }
628 }
629 return
630}
631