1package webmail
2
3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
4
5import (
6 "compress/gzip"
7 "context"
8 cryptrand "crypto/rand"
9 "encoding/base64"
10 "encoding/json"
11 "errors"
12 "fmt"
13 "log/slog"
14 "net/http"
15 "path/filepath"
16 "reflect"
17 "runtime/debug"
18 "slices"
19 "strconv"
20 "strings"
21 "sync"
22 "time"
23
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
26
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxvar"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/store"
35)
36
// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
type Request struct {
	ID int64

	SSEID int64 // SSE connection.

	// To indicate a request is a continuation (more results) of the previous view.
	// Echoed in events, client checks if it is getting results for the latest request.
	ViewID int64

	// If set, this request and its view are canceled. A new view must be started.
	Cancel bool

	Query Query
	Page Page
}
54
// ThreadMode controls whether messages in query results are grouped into
// threads, per the values below.
type ThreadMode string

const (
	ThreadOff ThreadMode = "off" // No threading, messages returned individually.
	ThreadOn ThreadMode = "on" // Group messages into threads.
	ThreadUnread ThreadMode = "unread"
)
62
// Query is a request for messages that match filters, in a given order.
type Query struct {
	OrderAsc bool // Order by received ascending or descending.
	Threading ThreadMode
	Filter Filter
	NotFilter NotFilter
}
70
// AttachmentType is for filtering by attachment type.
type AttachmentType string

const (
	AttachmentIndifferent AttachmentType = "" // Not filtering on attachments.
	AttachmentNone AttachmentType = "none"
	AttachmentAny AttachmentType = "any"
	AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
	AttachmentPDF AttachmentType = "pdf"
	AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
	AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
	AttachmentDocument AttachmentType = "document" // odt, docx, ...
	AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
)
85
// Filter selects the messages to return. Fields that are set must all match;
// for slices, each element must match ("and").
type Filter struct {
	// If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
	MailboxID int64

	// If true, also submailboxes are included in the search.
	MailboxChildrenIncluded bool

	// In case client doesn't know mailboxes and their IDs yet. Only used during sse
	// connection setup, where it is turned into a MailboxID. Filtering only looks at
	// MailboxID.
	MailboxName string

	Words []string // Case insensitive substring match for each string.
	From []string
	To []string // Including Cc and Bcc.
	Oldest *time.Time
	Newest *time.Time
	Subject []string
	Attachments AttachmentType
	Labels []string
	Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
	SizeMin int64
	SizeMax int64
}
112
// NotFilter matches messages that don't match these fields. Fields have the
// same semantics as in Filter.
type NotFilter struct {
	Words []string
	From []string
	To []string
	Subject []string
	Attachments AttachmentType
	Labels []string
}
122
// Page holds pagination parameters for a request.
type Page struct {
	// Start returning messages after this ID, if > 0. For pagination, fetching the
	// next set of messages.
	AnchorMessageID int64

	// Number of messages to return, must be >= 1, we never return more than 10000 for
	// one request.
	Count int

	// If > 0, return messages until DestMessageID is found. More than Count messages
	// can be returned. For long-running searches, it may take a while before this
	// message is found.
	DestMessageID int64
}
138
// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.

// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
// included.
type MessageAddress struct {
	Name string // Free-form name for display in mail applications.
	User string // Localpart, encoded.
	Domain dns.Domain
}
148
// MessageEnvelope is like message.Envelope, as used in message.Part, but including
// unicode host names for IDNA names.
type MessageEnvelope struct {
	// todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
	Date time.Time
	Subject string
	From []MessageAddress
	Sender []MessageAddress
	ReplyTo []MessageAddress
	To []MessageAddress
	CC []MessageAddress
	BCC []MessageAddress
	InReplyTo string
	MessageID string
}
164
// MessageItem is sent by queries, it has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
type MessageItem struct {
	Message store.Message // Without ParsedBuf and MsgPrefix, for size. With Preview, even if it isn't stored yet in the database.
	Envelope MessageEnvelope
	Attachments []Attachment
	IsSigned bool
	IsEncrypted bool
	MatchQuery bool // If message does not match query, it can still be included because of threading.
	MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
}
177
// ParsedMessage has more parsed/derived information about a message, intended
// for rendering the (contents of the) message. Information from MessageItem is
// not duplicated.
type ParsedMessage struct {
	ID int64
	Part message.Part
	Headers map[string][]string
	ViewMode store.ViewMode

	Texts []string // Contents of text parts, can be empty.

	// Whether there is an HTML part. The webclient renders HTML message parts through
	// an iframe and a separate request with strict CSP headers to prevent script
	// execution and loading of external resources, which isn't possible when loading
	// in iframe with inline HTML because not all browsers support the iframe csp
	// attribute.
	HasHTML bool

	ListReplyAddress *MessageAddress // From List-Post.

	TextPaths [][]int // Paths to text parts.
	HTMLPath []int // Path to HTML part.

	// Information used by MessageItem, not exported in this type.
	envelope MessageEnvelope
	attachments []Attachment
	isSigned bool
	isEncrypted bool
}
207
// EventStart is the first message sent on an SSE connection, giving the client
// basic data to populate its UI. After this event, messages will follow quickly in
// an EventViewMsgs event.
type EventStart struct {
	SSEID int64
	LoginAddress MessageAddress
	Addresses []MessageAddress
	DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
	MailboxName string
	Mailboxes []store.Mailbox
	RejectsMailbox string
	Settings store.Settings
	AccountPath string // If nonempty, the path on same host to webaccount interface.
	Version string
}

// DomainAddressConfig has the address (localpart) configuration for a domain, so
// the webmail client can decide if an address matches the addresses of the
// account.
type DomainAddressConfig struct {
	LocalpartCatchallSeparators []string // Can be empty.
	LocalpartCaseSensitive bool
}
231
// EventViewMsgs contains messages for a view, possibly a continuation of an
// earlier list of messages.
type EventViewMsgs struct {
	ViewID int64
	RequestID int64

	// If empty, this was the last message for the request. If non-empty, a list of
	// thread messages. Each with the first message being the reason this thread is
	// included and can be used as AnchorID in followup requests. If the threading mode
	// is "off" in the query, there will always be only a single message. If a thread
	// is sent, all messages in the thread are sent, including those that don't match
	// the query (e.g. from another mailbox). Threads can be displayed based on the
	// ThreadParentIDs field, with possibly slightly different display based on field
	// ThreadMissingLink.
	MessageItems [][]MessageItem

	// If set, will match the target page.DestMessageID from the request.
	ParsedMessage *ParsedMessage

	// If set, there are no more messages in this view at this moment. Messages can be
	// added, typically via Change messages, e.g. for new deliveries.
	ViewEnd bool
}
255
// EventViewErr indicates an error during a query for messages. The request is
// aborted, no more request-related messages will be sent until the next request.
type EventViewErr struct {
	ViewID int64
	RequestID int64
	Err string // To be displayed in client.
	err error // Original error, for checking against context.Canceled.
}

// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
// The client should clear its list of messages. This can happen before
// EventViewMsgs events are sent.
type EventViewReset struct {
	ViewID int64
	RequestID int64
}
273
// EventViewChanges contain one or more changes relevant for the client, either
// with new mailbox total/unseen message counts, or messages added/removed/modified
// (flags) for the current view.
type EventViewChanges struct {
	ViewID int64
	Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
}

// ChangeMsgAdd adds a new message and possibly its thread to the view.
type ChangeMsgAdd struct {
	store.ChangeAddUID
	MessageItems []MessageItem
}

// ChangeMsgRemove removes one or more messages from the view.
type ChangeMsgRemove struct {
	store.ChangeRemoveUIDs
}

// ChangeMsgFlags updates flags for one message.
type ChangeMsgFlags struct {
	store.ChangeFlags
}

// ChangeMsgThread updates muted/collapsed fields for one message.
type ChangeMsgThread struct {
	store.ChangeThread
}

// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
type ChangeMailboxRemove struct {
	store.ChangeRemoveMailbox
}

// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
type ChangeMailboxAdd struct {
	Mailbox store.Mailbox
}

// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
// It could be under a new parent.
type ChangeMailboxRename struct {
	store.ChangeRenameMailbox
}

// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
type ChangeMailboxCounts struct {
	store.ChangeMailboxCounts
}

// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
type ChangeMailboxSpecialUse struct {
	store.ChangeMailboxSpecialUse
}

// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
// a message was added with a keyword that wasn't in the mailbox yet.
type ChangeMailboxKeywords struct {
	store.ChangeMailboxKeywords
}
334
// view holds the information about the returned data for a query. It is used to
// determine whether mailbox changes should be sent to the client, we only send
// addition/removal/flag-changes of messages that are in view, or would extend it
// if the view is at the end of the results.
type view struct {
	Request Request

	// Received of last message we sent to the client. We use it to decide if a newly
	// delivered message is within the view and the client should get a notification.
	LastMessageReceived time.Time

	// If set, the last message in the query view has been sent. There is no need to do
	// another query, it will not return more data. Used to decide if an event for a
	// new message should be sent.
	End bool

	// Whether message must or must not match mailboxIDs.
	matchMailboxIDs bool
	// Mailboxes to match, can be multiple, for matching children. If empty, there is
	// no filter on mailboxes.
	mailboxIDs map[int64]bool

	// Threads sent to client. New messages for this thread are also sent, regardless
	// of regular query matching, so also for other mailboxes. If the user (re)moved
	// all messages of a thread, they may still receive events for the thread. Only
	// filled when query with threading not off.
	threadIDs map[int64]struct{}
}
363
// sses tracks all sse connections, and access to them. The mutex must be held
// when touching gen or m.
var sses = struct {
	sync.Mutex
	gen int64 // Generator for unique SSE connection IDs.
	m map[int64]sse // Active connections by ID.
}{m: map[int64]sse{}}

// sse represents an sse connection.
type sse struct {
	ID int64 // Also returned in EventStart and used in Request to identify the request.
	AccountName string // Used to check the authenticated user has access to the SSE connection.
	Request chan Request // Goroutine will receive requests from here, coming from API calls.
}
377
378// called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
380 sses.Lock()
381 defer sses.Unlock()
382 delete(sses.m, sse.ID)
383
384 // Drain any pending requests, preventing blocked goroutines from API calls.
385 for {
386 select {
387 case <-sse.Request:
388 default:
389 return
390 }
391 }
392}
393
394func sseRegister(accountName string) sse {
395 sses.Lock()
396 defer sses.Unlock()
397 sses.gen++
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
399 sses.m[v.ID] = v
400 return v
401}
402
403// sseGet returns a reference to an existing connection if it exists and user
404// has access.
405func sseGet(id int64, accountName string) (sse, bool) {
406 sses.Lock()
407 defer sses.Unlock()
408 s := sses.m[id]
409 if s.AccountName != accountName {
410 return sse{}, false
411 }
412 return s, true
413}
414
// ssetoken is a temporary token that has not yet been used to start an SSE
// connection. Created by Token, consumed by a new SSE connection.
type ssetoken struct {
	token string // Uniquely generated.
	accName string
	address string // Address used to authenticate in call that created the token.
	sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
	validUntil time.Time
}

// ssetokens maintains unused tokens. We have just one, but it's a type so we
// can define methods. The mutex must be held when accessing the maps.
type ssetokens struct {
	sync.Mutex
	accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
	tokens map[string]ssetoken // Token to details, for finding account for a token.
}

// sseTokens is the single global instance holding all unused SSE tokens.
var sseTokens = ssetokens{
	accountTokens: map[string][]ssetoken{},
	tokens: map[string]ssetoken{},
}
437
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 buf := make([]byte, 16)
442 _, err := cryptrand.Read(buf)
443 xcheckf(ctx, err, "generating token")
444 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
445
446 x.Lock()
447 defer x.Unlock()
448 n := len(x.accountTokens[accName])
449 if n >= 10 {
450 for _, ost := range x.accountTokens[accName][:n-9] {
451 delete(x.tokens, ost.token)
452 }
453 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
454 x.accountTokens[accName] = x.accountTokens[accName][:9]
455 }
456 x.accountTokens[accName] = append(x.accountTokens[accName], st)
457 x.tokens[st.token] = st
458 return st.token
459}
460
461// check verifies a token, and consumes it if valid.
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
463 x.Lock()
464 defer x.Unlock()
465
466 st, ok := x.tokens[token]
467 if !ok {
468 return "", "", "", false, nil
469 }
470 delete(x.tokens, token)
471 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
472 return "", "", "", false, errors.New("internal error, could not find token in account")
473 } else {
474 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
475 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
476 if len(x.accountTokens[st.accName]) == 0 {
477 delete(x.accountTokens, st.accName)
478 }
479 }
480 if time.Now().After(st.validUntil) {
481 return "", "", "", false, nil
482 }
483 return st.accName, st.address, st.sessionToken, true, nil
484}
485
// ioErr is panicked on i/o errors in serveEvents and handled in a defer,
// distinguishing broken connections from other panics.
type ioErr struct {
	err error
}
490
491// ensure we have a non-nil moreHeaders, taking it from Settings.
492func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
493 if moreHeaders != nil {
494 return moreHeaders, nil
495 }
496
497 s := store.Settings{ID: 1}
498 if err := tx.Get(&s); err != nil {
499 return nil, fmt.Errorf("get settings: %v", err)
500 }
501 moreHeaders = s.ShowHeaders
502 if moreHeaders == nil {
503 moreHeaders = []string{} // Ensure we won't get Settings again next call.
504 }
505 return moreHeaders, nil
506}
507
508// serveEvents serves an SSE connection. Authentication is done through a query
509// string parameter "singleUseToken", a one-time-use token returned by the Token
510// API call.
511func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
512 if r.Method != "GET" {
513 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
514 return
515 }
516
517 flusher, ok := w.(http.Flusher)
518 if !ok {
519 log.Error("internal error: ResponseWriter not a http.Flusher")
520 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
521 return
522 }
523
524 q := r.URL.Query()
525 token := q.Get("singleUseToken")
526 if token == "" {
527 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
528 return
529 }
530 accName, address, sessionToken, ok, err := sseTokens.check(token)
531 if err != nil {
532 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
533 return
534 }
535 if !ok {
536 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
537 return
538 }
539 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
540 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
541 return
542 }
543
544 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
545 // incoming responses with its slow-network similation.
546 var waitMin, waitMax time.Duration
547 waitMinMsec := q.Get("waitMinMsec")
548 waitMaxMsec := q.Get("waitMaxMsec")
549 if waitMinMsec != "" && waitMaxMsec != "" {
550 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
551 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
552 return
553 } else {
554 waitMin = time.Duration(v) * time.Millisecond
555 }
556
557 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
558 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
559 return
560 } else {
561 waitMax = time.Duration(v) * time.Millisecond
562 }
563 }
564
565 // Parse the request with initial mailbox/search criteria.
566 var req Request
567 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
568 dec.DisallowUnknownFields()
569 if err := dec.Decode(&req); err != nil {
570 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
571 return
572 } else if req.Page.Count <= 0 {
573 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
574 return
575 }
576 if req.Query.Threading == "" {
577 req.Query.Threading = ThreadOff
578 }
579
580 var writer *eventWriter
581
582 metricSSEConnections.Inc()
583 defer metricSSEConnections.Dec()
584
585 // Below here, error handling cause through xcheckf, which panics with
586 // *sherpa.Error, after which we send an error event to the client. We can also get
587 // an *ioErr when the connection is broken.
588 defer func() {
589 x := recover()
590 if x == nil {
591 return
592 }
593 if err, ok := x.(*sherpa.Error); ok {
594 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
595 } else if _, ok := x.(ioErr); ok {
596 return
597 } else {
598 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
599 debug.PrintStack()
600 metrics.PanicInc(metrics.Webmail)
601 panic(x)
602 }
603 }()
604
605 h := w.Header()
606 h.Set("Content-Type", "text/event-stream")
607 h.Set("Cache-Control", "no-cache")
608
609 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
610 // keys), so should be quite compressible.
611 var out writeFlusher
612 gz := mox.AcceptsGzip(r)
613 if gz {
614 h.Set("Content-Encoding", "gzip")
615 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
616 } else {
617 out = nopFlusher{w}
618 }
619 out = httpFlusher{out, flusher}
620
621 // We'll be writing outgoing SSE events through writer.
622 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
623 defer writer.close()
624
625 // Fetch initial data.
626 acc, err := store.OpenAccount(log, accName, true)
627 xcheckf(ctx, err, "open account")
628 defer func() {
629 err := acc.Close()
630 log.Check(err, "closing account")
631 }()
632 comm := store.RegisterComm(acc)
633 defer comm.Unregister()
634
635 // List addresses that the client can use to send email from.
636 accConf, _ := acc.Conf()
637 loginAddr, err := smtp.ParseAddress(address)
638 xcheckf(ctx, err, "parsing login address")
639 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
640 xcheckf(ctx, err, "looking up destination for login address")
641 loginName := accConf.FullName
642 if dest.FullName != "" {
643 loginName = dest.FullName
644 }
645 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
646 var addresses []MessageAddress
647 for a, dest := range accConf.Destinations {
648 name := dest.FullName
649 if name == "" {
650 name = accConf.FullName
651 }
652 var ma MessageAddress
653 if strings.HasPrefix(a, "@") {
654 dom, err := dns.ParseDomain(a[1:])
655 xcheckf(ctx, err, "parsing destination address for account")
656 ma = MessageAddress{Domain: dom}
657 } else {
658 addr, err := smtp.ParseAddress(a)
659 xcheckf(ctx, err, "parsing destination address for account")
660 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
661 }
662 addresses = append(addresses, ma)
663 }
664 // User is allowed to send using alias address as message From address. Webmail
665 // will choose it when replying to a message sent to that address.
666 aliasAddrs := map[MessageAddress]bool{}
667 for _, a := range accConf.Aliases {
668 if a.Alias.AllowMsgFrom {
669 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
670 if !aliasAddrs[ma] {
671 addresses = append(addresses, ma)
672 }
673 aliasAddrs[ma] = true
674 }
675 }
676
677 // We implicitly start a query. We use the reqctx for the transaction, because the
678 // transaction is passed to the query, which can be canceled.
679 reqctx, reqctxcancel := context.WithCancel(ctx)
680 defer func() {
681 // We also cancel in cancelDrain later on, but there is a brief window where the
682 // context wouldn't be canceled.
683 if reqctxcancel != nil {
684 reqctxcancel()
685 reqctxcancel = nil
686 }
687 }()
688
689 // qtx is kept around during connection initialization, until we pass it off to the
690 // goroutine that starts querying for messages.
691 var qtx *bstore.Tx
692 defer func() {
693 if qtx != nil {
694 err := qtx.Rollback()
695 log.Check(err, "rolling back")
696 }
697 }()
698
699 var mbl []store.Mailbox
700 settings := store.Settings{ID: 1}
701
702 // We only take the rlock when getting the tx.
703 acc.WithRLock(func() {
704 // Now a read-only transaction we'll use during the query.
705 qtx, err = acc.DB.Begin(reqctx, false)
706 xcheckf(ctx, err, "begin transaction")
707
708 mbl, err = bstore.QueryTx[store.Mailbox](qtx).FilterEqual("Expunged", false).List()
709 xcheckf(ctx, err, "list mailboxes")
710
711 err = qtx.Get(&settings)
712 xcheckf(ctx, err, "get settings")
713 })
714
715 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
716 var zerofilter Filter
717 var zeronotfilter NotFilter
718 var mailbox store.Mailbox
719 var mailboxPrefixes []string
720 var matchMailboxes bool
721 mailboxIDs := map[int64]bool{}
722 mailboxName := req.Query.Filter.MailboxName
723 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
724 if mailboxName == "" {
725 mailboxName = "Inbox"
726 }
727
728 var inbox store.Mailbox
729 for _, e := range mbl {
730 if e.Name == mailboxName {
731 mailbox = e
732 }
733 if e.Name == "Inbox" {
734 inbox = e
735 }
736 }
737 if mailbox.ID == 0 {
738 mailbox = inbox
739 }
740 if mailbox.ID == 0 {
741 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
742 }
743 req.Query.Filter.MailboxID = mailbox.ID
744 req.Query.Filter.MailboxName = ""
745 mailboxPrefixes = []string{mailbox.Name + "/"}
746 matchMailboxes = true
747 mailboxIDs[mailbox.ID] = true
748 } else {
749 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
750 }
751 if req.Query.Filter.MailboxChildrenIncluded {
752 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
753 }
754
755 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
756
757 sse := sseRegister(acc.Name)
758 defer sse.unregister()
759
760 // Per-domain localpart config so webclient can decide if an address belongs to the account.
761 domainAddressConfigs := map[string]DomainAddressConfig{}
762 for _, a := range addresses {
763 dom, _ := mox.Conf.Domain(a.Domain)
764 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparatorsEffective, dom.LocalpartCaseSensitive}
765 }
766
767 // Write first event, allowing client to fill its UI with mailboxes.
768 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
769 writer.xsendEvent(ctx, log, "start", start)
770
771 // The goroutine doing the querying will send messages on these channels, which
772 // result in an event being written on the SSE connection.
773 viewMsgsc := make(chan EventViewMsgs)
774 viewErrc := make(chan EventViewErr)
775 viewResetc := make(chan EventViewReset)
776 donec := make(chan int64) // When request is done.
777
778 // Start a view, it determines if we send a change to the client. And start an
779 // implicit query for messages, we'll send the messages to the client which can
780 // fill its ui with messages.
781 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
782 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
783 qtx = nil // viewRequestTx closes qtx
784
785 // When canceling a query, we must drain its messages until it says it is done.
786 // Otherwise the sending goroutine would hang indefinitely on a channel send.
787 cancelDrain := func() {
788 if reqctxcancel != nil {
789 // Cancel the goroutine doing the querying.
790 reqctxcancel()
791 reqctx = nil
792 reqctxcancel = nil
793 } else {
794 return
795 }
796
797 // Drain events until done.
798 for {
799 select {
800 case <-viewMsgsc:
801 case <-viewErrc:
802 case <-viewResetc:
803 case <-donec:
804 return
805 }
806 }
807 }
808
809 // If we stop and a query is in progress, we must drain the channel it will send on.
810 defer cancelDrain()
811
812 // Changes broadcasted by other connections on this account. If applicable for the
813 // connection/view, we send events.
814 xprocessChanges := func(changes []store.Change) {
815 taggedChanges := [][2]any{}
816
817 newPreviews := map[int64]string{}
818 defer storeNewPreviews(ctx, log, acc, newPreviews)
819
820 // We get a transaction first time we need it.
821 var xtx *bstore.Tx
822 defer func() {
823 if xtx != nil {
824 err := xtx.Rollback()
825 log.Check(err, "rolling back transaction")
826 }
827 }()
828 ensureTx := func() error {
829 if xtx != nil {
830 return nil
831 }
832 acc.RLock()
833 defer acc.RUnlock()
834 var err error
835 xtx, err = acc.DB.Begin(ctx, false)
836 return err
837 }
838 // This getmsg will now only be called mailboxID+UID, not with messageID set.
839 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
840 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
841 if err := ensureTx(); err != nil {
842 return store.Message{}, fmt.Errorf("transaction: %v", err)
843 }
844 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
845 }
846
847 // Additional headers from settings to add to MessageItems.
848 var moreHeaders []string
849 xmoreHeaders := func() []string {
850 err := ensureTx()
851 xcheckf(ctx, err, "transaction")
852
853 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
854 xcheckf(ctx, err, "ensuring more headers")
855 return moreHeaders
856 }
857
858 // Return uids that are within range in view. Because the end has been reached, or
859 // because the UID is not after the last message.
860 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
861 uidsAny := make([]any, len(uids))
862 for i, uid := range uids {
863 uidsAny[i] = uid
864 }
865 err := ensureTx()
866 xcheckf(ctx, err, "transaction")
867 q := bstore.QueryTx[store.Message](xtx)
868 q.FilterNonzero(store.Message{MailboxID: mailboxID})
869 q.FilterEqual("UID", uidsAny...)
870 mbOK := v.matchesMailbox(mailboxID)
871 err = q.ForEach(func(m store.Message) error {
872 _, thread := v.threadIDs[m.ThreadID]
873 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
874 changedUIDs = append(changedUIDs, m.UID)
875 }
876 return nil
877 })
878 xcheckf(ctx, err, "fetching messages for change")
879 return changedUIDs
880 }
881
882 // Forward changes that are relevant to the current view.
883 for _, change := range changes {
884 switch c := change.(type) {
885 case store.ChangeAddUID:
886 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
887 xcheckf(ctx, err, "matching new message against view")
888 m, err := getmsg(0, c.MailboxID, c.UID)
889 xcheckf(ctx, err, "get message")
890 _, thread := v.threadIDs[m.ThreadID]
891 if !ok && !thread {
892 continue
893 }
894
895 state := msgState{acc: acc, log: log, newPreviews: newPreviews}
896 mi, err := messageItem(log, m, &state, xmoreHeaders())
897 state.clear()
898 xcheckf(ctx, err, "make messageitem")
899 mi.MatchQuery = ok
900
901 mil := []MessageItem{mi}
902 if !thread && req.Query.Threading != ThreadOff {
903 err := ensureTx()
904 xcheckf(ctx, err, "transaction")
905 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders(), newPreviews)
906 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
907 mil = append(mil, more...)
908 v.threadIDs[m.ThreadID] = struct{}{}
909 }
910
911 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
912
913 // If message extends the view, store it as such.
914 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
915 v.LastMessageReceived = m.Received
916 }
917
918 case store.ChangeRemoveUIDs:
919 comm.RemovalSeen(c)
920
921 // We may send changes for uids the client doesn't know, that's fine.
922 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
923 if len(changedUIDs) == 0 {
924 continue
925 }
926 ch := ChangeMsgRemove{c}
927 ch.UIDs = changedUIDs
928 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
929
930 case store.ChangeFlags:
931 // We may send changes for uids the client doesn't know, that's fine.
932 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
933 if len(changedUIDs) == 0 {
934 continue
935 }
936 ch := ChangeMsgFlags{c}
937 ch.UID = changedUIDs[0]
938 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
939
940 case store.ChangeThread:
941 // Change in muted/collaped state, just always ship it.
942 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
943
944 case store.ChangeRemoveMailbox:
945 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
946
947 case store.ChangeAddMailbox:
948 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
949
950 case store.ChangeRenameMailbox:
951 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
952
953 case store.ChangeMailboxCounts:
954 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
955
956 case store.ChangeMailboxSpecialUse:
957 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
958
959 case store.ChangeMailboxKeywords:
960 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
961
962 case store.ChangeAddSubscription, store.ChangeRemoveSubscription:
963 // Webmail does not care about subscriptions.
964
965 case store.ChangeAnnotation:
966 // Nothing.
967
968 default:
969 panic(fmt.Sprintf("missing case for change %T", c))
970 }
971 }
972
973 if len(taggedChanges) > 0 {
974 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
975 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
976 }
977 }
978
979 timer := time.NewTimer(5 * time.Minute) // For keepalives.
980 defer timer.Stop()
981 for {
982 if writer.wrote {
983 timer.Reset(5 * time.Minute)
984 writer.wrote = false
985 }
986
987 pending := comm.Pending
988 if reqctx != nil {
989 pending = nil
990 }
991
992 select {
993 case <-mox.Shutdown.Done():
994 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
995 // Work around go vet, it doesn't see defer cancelDrain.
996 if reqctxcancel != nil {
997 reqctxcancel()
998 }
999 return
1000
1001 case <-timer.C:
1002 _, err := fmt.Fprintf(out, ": keepalive\n\n")
1003 if err == nil {
1004 err = out.Flush()
1005 }
1006 if err != nil {
1007 log.Errorx("write keepalive", err)
1008 // Work around go vet, it doesn't see defer cancelDrain.
1009 if reqctxcancel != nil {
1010 reqctxcancel()
1011 }
1012 return
1013 }
1014 writer.wrote = true
1015
1016 case vm := <-viewMsgsc:
1017 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1018 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1019 }
1020 if vm.ViewEnd {
1021 v.End = true
1022 }
1023 if len(vm.MessageItems) > 0 {
1024 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1025 }
1026 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1027
1028 case ve := <-viewErrc:
1029 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1030 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1031 }
1032 if errors.Is(ve.err, context.Canceled) || mlog.IsClosed(ve.err) {
1033 // Work around go vet, it doesn't see defer cancelDrain.
1034 if reqctxcancel != nil {
1035 reqctxcancel()
1036 }
1037 return
1038 }
1039 writer.xsendEvent(ctx, log, "viewErr", ve)
1040
1041 case vr := <-viewResetc:
1042 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1043 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1044 }
1045 writer.xsendEvent(ctx, log, "viewReset", vr)
1046
1047 case id := <-donec:
1048 if id != v.Request.ID {
1049 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1050 }
1051 if reqctxcancel != nil {
1052 reqctxcancel()
1053 }
1054 reqctx = nil
1055 reqctxcancel = nil
1056
1057 case req := <-sse.Request:
1058 if reqctx != nil {
1059 cancelDrain()
1060 }
1061 if req.Cancel {
1062 v = view{req, time.Time{}, false, false, nil, nil}
1063 continue
1064 }
1065
1066 reqctx, reqctxcancel = context.WithCancel(ctx)
1067
1068 stop := func() (stop bool) {
1069 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1070 var rtx *bstore.Tx
1071 var err error
1072 defer func() {
1073 if rtx != nil {
1074 err = rtx.Rollback()
1075 log.Check(err, "rolling back transaction")
1076 }
1077 }()
1078 acc.WithRLock(func() {
1079 rtx, err = acc.DB.Begin(reqctx, false)
1080 })
1081 if err != nil {
1082 reqctxcancel()
1083 reqctx = nil
1084 reqctxcancel = nil
1085
1086 if errors.Is(err, context.Canceled) {
1087 return true
1088 }
1089 err := fmt.Errorf("begin transaction: %v", err)
1090 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1091 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1092 return false
1093 }
1094
1095 // Reset view state for new query.
1096 if req.ViewID != v.Request.ViewID {
1097 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1098 if req.Query.Filter.MailboxChildrenIncluded {
1099 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1100 }
1101 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1102 } else {
1103 v.Request = req
1104 }
1105 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1106 rtx = nil
1107 return false
1108 }()
1109 if stop {
1110 return
1111 }
1112
1113 case <-pending:
1114 overflow, changes := comm.Get()
1115 if overflow {
1116 writer.xsendEvent(ctx, log, "fatalErr", "out of sync, too many pending changes")
1117 return
1118 }
1119 xprocessChanges(changes)
1120
1121 case <-ctx.Done():
1122 // Work around go vet, it doesn't see defer cancelDrain.
1123 if reqctxcancel != nil {
1124 reqctxcancel()
1125 }
1126 return
1127 }
1128 }
1129}
1130
1131// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1132// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1133// mailboxIDs must or must not match. mailboxPrefixes is for use with
1134// xgatherMailboxIDs to gather children of the mailboxIDs.
1135func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1136 matchMailboxes = true
1137 mailboxIDs = map[int64]bool{}
1138 if f.MailboxID == -1 {
1139 matchMailboxes = false
1140 // Add the trash, junk and account rejects mailbox.
1141 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1142 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1143 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1144 mailboxIDs[mb.ID] = true
1145 }
1146 return nil
1147 })
1148 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1149 } else if f.MailboxID > 0 {
1150 mb, err := store.MailboxID(tx, f.MailboxID)
1151 xcheckf(ctx, err, "get mailbox")
1152 mailboxIDs[f.MailboxID] = true
1153 mailboxPrefixes = []string{mb.Name + "/"}
1154 }
1155 return
1156}
1157
1158// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1159// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1160func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1161 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1162 if len(mailboxPrefixes) == 0 {
1163 return
1164 }
1165 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1166 for _, p := range mailboxPrefixes {
1167 if strings.HasPrefix(mb.Name, p) {
1168 mailboxIDs[mb.ID] = true
1169 break
1170 }
1171 }
1172 return nil
1173 })
1174 xcheckf(ctx, err, "gathering mailboxes")
1175}
1176
1177// matchesMailbox returns whether a mailbox matches the view.
1178func (v view) matchesMailbox(mailboxID int64) bool {
1179 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1180}
1181
1182// inRange returns whether m is within the range for the view, whether a change for
1183// this message should be sent to the client so it can update its state.
1184func (v view) inRange(m store.Message) bool {
1185 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1186}
1187
// matches checks if the message, identified by either messageID or mailboxID+UID,
// is in the current "view" (i.e. passing the filters, and if checkRange is set
// also if within the range of sent messages based on sort order and the last seen
// message). getmsg retrieves the message, which may be necessary depending on the
// active filters. Used to determine if a store.Change with a new message should be
// sent, and for the destination and anchor messages in view requests.
func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
	var m store.Message
	// Lazily fetch the full message through getmsg, only when a filter below needs
	// it. Returns false when fetching failed, with the error stored in rerr.
	ensureMessage := func() bool {
		if m.ID == 0 && rerr == nil {
			m, rerr = getmsg(messageID, mailboxID, uid)
		}
		return rerr == nil
	}

	q := v.Request.Query

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// Check filters.
	if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
		return false, rerr
	}
	// note: anchorMessageID is not relevant for matching.
	flagfilter := q.flagFilterFn()
	if flagfilter != nil && !flagfilter(flags, keywords) {
		return false, rerr
	}

	if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
		return false, rerr
	}
	if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
		return false, rerr
	}

	if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
		return false, rerr
	}
	if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
		return false, rerr
	}

	// The filters below may need to parse the message; msgState caches that work.
	state := msgState{acc: acc, log: log}
	defer func() {
		// Filter functions report their errors through the state, propagate to caller.
		if rerr == nil && state.err != nil {
			rerr = state.err
		}
		state.clear()
	}()

	attachmentFilter := q.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
		return false, rerr
	}

	envFilter := q.envFilterFn(log, &state)
	if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
		return false, rerr
	}

	headerFilter := q.headerFilterFn(log, &state)
	if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
		return false, rerr
	}

	wordsFilter := q.wordsFilterFn(log, &state)
	if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
		return false, rerr
	}

	// Now check that we are either within the sorting order, or "last" was sent.
	if !checkRange || v.End || ensureMessage() && v.inRange(m) {
		return true, rerr
	}
	return false, rerr
}
1265
// msgResp is a progress/result message from queryMessages to viewRequestTx.
// Exactly one of the cases applies per value.
type msgResp struct {
	err     error          // If set, an error happened and fields below are not set.
	reset   bool           // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
	viewEnd bool           // If set, the last message for the view was seen, no more should be requested, fields below not set.
	mil     []MessageItem  // If none of the cases above apply, the messages that were found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
	pm      *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
}
1273
1274func storeNewPreviews(ctx context.Context, log mlog.Log, acc *store.Account, newPreviews map[int64]string) {
1275 if len(newPreviews) == 0 {
1276 return
1277 }
1278
1279 defer func() {
1280 x := recover()
1281 if x != nil {
1282 log.Error("unhandled panic in storeNewPreviews", slog.Any("err", x))
1283 debug.PrintStack()
1284 metrics.PanicInc(metrics.Store)
1285 }
1286 }()
1287
1288 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1289 for id, preview := range newPreviews {
1290 m := store.Message{ID: id}
1291 if err := tx.Get(&m); err != nil {
1292 return fmt.Errorf("get message with id %d to store preview: %w", id, err)
1293 } else if !m.Expunged {
1294 m.Preview = &preview
1295 if err := tx.Update(&m); err != nil {
1296 return fmt.Errorf("updating message with id %d: %v", m.ID, err)
1297 }
1298 }
1299 }
1300 return nil
1301 })
1302 log.Check(err, "saving new previews with messages")
1303}
1304
// viewRequestTx executes a request (query with filters, pagination) by
// launching a new goroutine with queryMessages, receiving results as msgResp,
// and sending Event* to the SSE connection.
//
// It always closes tx.
func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
	// Newly generated previews which we'll save when the operation is done.
	newPreviews := map[int64]string{}

	defer func() {
		err := tx.Rollback()
		log.Check(err, "rolling back query transaction")

		// Tell the SSE connection loop this request is done so it can accept a new one.
		donec <- v.Request.ID

		// ctx can be canceled, we still want to store the previews.
		storeNewPreviews(context.Background(), log, acc, newPreviews)

		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
			debug.PrintStack()
			metrics.PanicInc(metrics.Webmailrequest)
		}
	}()

	var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
	var parsedMessage *ParsedMessage
	var viewEnd bool

	var immediate bool // No waiting, flush immediate.
	t := time.NewTimer(300 * time.Millisecond)
	defer t.Stop()

	// sendViewMsgs flushes the messages gathered so far to the SSE connection and
	// restarts the flush timer. With no messages gathered, it only sends when force
	// is set (used to deliver the viewEnd flag).
	sendViewMsgs := func(force bool) {
		if len(msgitems) == 0 && !force {
			return
		}

		immediate = false
		msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
		msgitems = nil
		parsedMessage = nil
		t.Reset(300 * time.Millisecond)
	}

	// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.

	mrc := make(chan msgResp, 1)
	go queryMessages(ctx, log, acc, tx, v, mrc, newPreviews)

	for {
		select {
		case mr, ok := <-mrc:
			if !ok {
				// Channel closed, the query goroutine is done.
				sendViewMsgs(false)
				// Empty message list signals this query is done.
				msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
				return
			}
			if mr.err != nil {
				// Flush what we have before reporting the error.
				sendViewMsgs(false)
				errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
				return
			}
			if mr.reset {
				resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
				continue
			}
			if mr.viewEnd {
				viewEnd = true
				sendViewMsgs(true)
				return
			}

			msgitems = append(msgitems, mr.mil)
			if mr.pm != nil {
				parsedMessage = mr.pm
			}
			if immediate {
				sendViewMsgs(true)
			}

		case <-t.C:
			if len(msgitems) == 0 {
				// Nothing to send yet. We'll send immediately when the next message comes in.
				immediate = true
			} else {
				sendViewMsgs(false)
			}
		}
	}
}
1398
// queryMessages executes a query, with filter, pagination, destination message id
// to fetch (the message that the client had in view and wants to display again).
// It sends on msgc, with several types of messages: errors, whether the view is
// reset due to missing AnchorMessageID, and when the end of the view was reached
// and/or for a message.
// newPreviews is filled with previews, the caller must save them.
func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp, newPreviews map[int64]string) {
	defer func() {
		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
			debug.PrintStack()
			mrc <- msgResp{err: fmt.Errorf("query failed")}
			metrics.PanicInc(metrics.Webmailquery)
		}

		close(mrc)
	}()

	query := v.Request.Query
	page := v.Request.Page

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// checkMessage reports whether the message with the given id still exists and
	// (still) matches the view filters.
	checkMessage := func(id int64) (valid bool, rerr error) {
		m := store.Message{ID: id}
		err := tx.Get(&m)
		if err == bstore.ErrAbsent || err == nil && m.Expunged {
			return false, nil
		} else if err != nil {
			return false, err
		} else {
			return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
				return m, nil
			})
		}
	}

	// Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
	if page.AnchorMessageID > 0 {
		// Check if message exists and (still) matches the filter.
		// todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
		if valid, err := checkMessage(page.AnchorMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
			return
		} else if !valid {
			mrc <- msgResp{reset: true}
			page.AnchorMessageID = 0
		}
	}

	// Check if page.DestMessageID exists and matches filter. If not, we will ignore
	// it instead of continuing to send message till the end of the view.
	if page.DestMessageID > 0 {
		if valid, err := checkMessage(page.DestMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
			return
		} else if !valid {
			page.DestMessageID = 0
		}
	}

	// todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.

	q := bstore.QueryTx[store.Message](tx)
	q.FilterEqual("Expunged", false)
	if len(v.mailboxIDs) > 0 {
		if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
			// Should result in fast indexed query.
			for mbID := range v.mailboxIDs {
				q.FilterNonzero(store.Message{MailboxID: mbID})
			}
		} else {
			idsAny := make([]any, 0, len(v.mailboxIDs))
			for mbID := range v.mailboxIDs {
				idsAny = append(idsAny, mbID)
			}
			if v.matchMailboxIDs {
				q.FilterEqual("MailboxID", idsAny...)
			} else {
				q.FilterNotEqual("MailboxID", idsAny...)
			}
		}
	}

	// If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
	if page.AnchorMessageID > 0 {
		var seen = false
		q.FilterFn(func(m store.Message) bool {
			if seen {
				return true
			}
			seen = m.ID == page.AnchorMessageID
			// The anchor itself is not returned again, only messages after it.
			return false
		})
	}

	// We may be adding filters to the query below. The FilterFn signature does not
	// implement reporting errors, or anything else, just a bool. So when making the
	// filter functions, we give them a place to store parsed message state, and an
	// error. We check the error during and after query execution.
	state := msgState{acc: acc, log: log, newPreviews: newPreviews}
	defer state.clear()

	flagfilter := query.flagFilterFn()
	if flagfilter != nil {
		q.FilterFn(func(m store.Message) bool {
			return flagfilter(m.Flags, m.Keywords)
		})
	}

	if query.Filter.Oldest != nil {
		q.FilterGreaterEqual("Received", *query.Filter.Oldest)
	}
	if query.Filter.Newest != nil {
		q.FilterLessEqual("Received", *query.Filter.Newest)
	}

	if query.Filter.SizeMin > 0 {
		q.FilterGreaterEqual("Size", query.Filter.SizeMin)
	}
	if query.Filter.SizeMax > 0 {
		q.FilterLessEqual("Size", query.Filter.SizeMax)
	}

	attachmentFilter := query.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil {
		q.FilterFn(attachmentFilter)
	}

	envFilter := query.envFilterFn(log, &state)
	if envFilter != nil {
		q.FilterFn(envFilter)
	}

	headerFilter := query.headerFilterFn(log, &state)
	if headerFilter != nil {
		q.FilterFn(headerFilter)
	}

	wordsFilter := query.wordsFilterFn(log, &state)
	if wordsFilter != nil {
		q.FilterFn(wordsFilter)
	}

	var moreHeaders []string // From store.Settings.ShowHeaders

	if query.OrderAsc {
		q.SortAsc("Received")
	} else {
		q.SortDesc("Received")
	}
	found := page.DestMessageID <= 0
	end := true
	have := 0
	err := q.ForEach(func(m store.Message) error {
		// Check for an error in one of the filters, propagate it.
		if state.err != nil {
			return state.err
		}

		// Stop after the requested page size once the destination message was found; the
		// hard limit prevents returning unbounded numbers of messages.
		if have >= page.Count && found || have > 10000 {
			end = false
			return bstore.StopForEach
		}

		if _, ok := v.threadIDs[m.ThreadID]; ok {
			// Message was already returned as part of a thread.
			return nil
		}

		var pm *ParsedMessage
		if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
			// For threads, if there was no DestMessageID, we may be getting the newest
			// message. For an initial view, this isn't necessarily the first the user is
			// expected to read first, that would be the first unread, which we'll get below
			// when gathering the thread.
			found = true
			xpm, err := parsedMessage(log, &m, &state, true, false, false)
			if err != nil && errors.Is(err, message.ErrHeader) {
				log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
			} else if err != nil {
				return fmt.Errorf("parsing message %d: %v", m.ID, err)
			} else {
				pm = &xpm
			}
		}

		var err error
		moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
		if err != nil {
			return fmt.Errorf("ensuring more headers: %v", err)
		}

		mi, err := messageItem(log, m, &state, moreHeaders)
		if err != nil {
			return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
		}
		mil := []MessageItem{mi}
		if query.Threading != ThreadOff {
			more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders, state.newPreviews)
			if err != nil {
				return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
			}
			if xpm != nil {
				pm = xpm
				found = true
			}
			mil = append(mil, more...)
			v.threadIDs[m.ThreadID] = struct{}{}

			// Calculate how many messages the frontend is going to show, and only count those as returned.
			collapsed := map[int64]bool{}
			for _, mi := range mil {
				collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
			}
			unread := map[int64]bool{} // Propagated to thread root.
			if query.Threading == ThreadUnread {
				for _, mi := range mil {
					mm := mi.Message
					if mm.Seen {
						continue
					}
					unread[mm.ID] = true
					for _, id := range mm.ThreadParentIDs {
						unread[id] = true
					}
				}
			}
			for _, mi := range mil {
				mm := mi.Message
				threadRoot := true
				rootID := mm.ID
				for _, id := range mm.ThreadParentIDs {
					if _, ok := collapsed[id]; ok {
						threadRoot = false
						rootID = id
					}
				}
				if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
					have++
				}
			}
		} else {
			have++
		}
		if pm != nil && len(pm.envelope.From) == 1 {
			// Look up the view mode settings for the message sender.
			pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
			if err != nil {
				return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
			}
		}
		mrc <- msgResp{mil: mil, pm: pm}
		return nil
	})
	// Check for an error in one of the filters again. Check in ForEach would not
	// trigger if the last message has the error.
	if err == nil && state.err != nil {
		err = state.err
	}
	if err != nil {
		mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
		return
	}
	if end {
		mrc <- msgResp{viewEnd: true}
	}
}
1667
// gatherThread fetches the other messages of the thread that m is part of,
// returning them as MessageItems. A ParsedMessage is returned for the thread
// message matching destMessageID, or, when destMessageID is 0 and first is set,
// for the first unread thread message (falling back to the first message).
func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string, newPreviews map[int64]string) ([]MessageItem, *ParsedMessage, error) {
	if m.ThreadID == 0 {
		// If we would continue, FilterNonzero would fail because there are no non-zero fields.
		return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
	}

	// Fetch other messages for this thread.
	qt := bstore.QueryTx[store.Message](tx)
	qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
	qt.FilterEqual("Expunged", false)
	qt.FilterNotEqual("ID", m.ID)
	qt.SortAsc("ID")
	tml, err := qt.List()
	if err != nil {
		return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
	}

	var mil []MessageItem
	var pm *ParsedMessage
	var firstUnread bool
	for _, tm := range tml {
		// Handle each thread message in a closure so its msgState is cleaned up
		// promptly through defer.
		err := func() error {
			xstate := msgState{acc: acc, log: log, newPreviews: newPreviews}
			defer xstate.clear()

			mi, err := messageItem(log, tm, &xstate, moreHeaders)
			if err != nil {
				return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
			}
			// Record whether this thread message also matches the view filters itself.
			mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
				return tm, nil
			})
			if err != nil {
				return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
			}
			mil = append(mil, mi)

			// Parse this message when it is the destination, or (when looking for the first)
			// when we have nothing yet or this is the first unread message encountered.
			if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
				firstUnread = !tm.Seen
				xpm, err := parsedMessage(log, &tm, &xstate, true, false, false)
				if err != nil && errors.Is(err, message.ErrHeader) {
					log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
				} else if err != nil {
					return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
				} else {
					pm = &xpm
				}
			}
			return nil
		}()
		if err != nil {
			return nil, nil, err
		}
	}

	// Finally, the message that caused us to gather this thread (which is likely the
	// most recent message in the thread) could be the only unread message.
	if destMessageID == 0 && first && !m.Seen && !firstUnread {
		xstate := msgState{acc: acc, log: log}
		defer xstate.clear()
		xpm, err := parsedMessage(log, &m, &xstate, true, false, false)
		if err != nil && errors.Is(err, message.ErrHeader) {
			log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
		} else if err != nil {
			return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
		} else {
			pm = &xpm
		}
	}

	return mil, pm, nil
}
1740
// While checking the filters on a message, we may need to get more message
// details as each filter passes. We check the filters that need the basic
// information first, and load and cache more details for the next filters.
// msgState holds parsed details for a message, it is updated while filtering,
// with more information or reset for a next message.
type msgState struct {
	acc  *store.Account   // Never changes during lifetime.
	err  error            // Once set, doesn't get cleared.
	m    store.Message    // Message the cached part/msgr below belong to.
	part *message.Part    // Will be without Reader when msgr is nil.
	msgr *store.MsgReader // Reader for the raw message file, opened on demand.
	log  mlog.Log

	// If not nil, messages will get their Preview field filled when nil, and message
	// id and preview added to newPreviews, and saved in a separate write transaction
	// when the operation is done.
	newPreviews map[int64]string
}
1759
1760func (ms *msgState) clear() {
1761 if ms.msgr != nil {
1762 err := ms.msgr.Close()
1763 ms.log.Check(err, "closing message reader from state")
1764 ms.msgr = nil
1765 }
1766 *ms = msgState{acc: ms.acc, err: ms.err, log: ms.log, newPreviews: ms.newPreviews}
1767}
1768
1769func (ms *msgState) ensureMsg(m store.Message) {
1770 if m.ID != ms.m.ID {
1771 ms.clear()
1772 }
1773 ms.m = m
1774}
1775
1776func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1777 ms.ensureMsg(m)
1778
1779 if ms.err == nil {
1780 if ms.part == nil {
1781 if m.ParsedBuf == nil {
1782 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1783 return false
1784 }
1785 var p message.Part
1786 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1787 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1788 return false
1789 }
1790 ms.part = &p
1791 }
1792 if withMsgReader && ms.msgr == nil {
1793 ms.msgr = ms.acc.MessageReader(m)
1794 ms.part.SetReaderAt(ms.msgr)
1795 }
1796 }
1797 return ms.part != nil
1798}
1799
1800// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1801// filters for a query. A nil function is returned if there are no flags to filter
1802// on.
1803func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1804 labels := map[string]bool{}
1805 for _, k := range q.Filter.Labels {
1806 labels[k] = true
1807 }
1808 for _, k := range q.NotFilter.Labels {
1809 labels[k] = false
1810 }
1811
1812 if len(labels) == 0 {
1813 return nil
1814 }
1815
1816 var mask, flags store.Flags
1817 systemflags := map[string][]*bool{
1818 `\answered`: {&mask.Answered, &flags.Answered},
1819 `\flagged`: {&mask.Flagged, &flags.Flagged},
1820 `\deleted`: {&mask.Deleted, &flags.Deleted},
1821 `\seen`: {&mask.Seen, &flags.Seen},
1822 `\draft`: {&mask.Draft, &flags.Draft},
1823 `$junk`: {&mask.Junk, &flags.Junk},
1824 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1825 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1826 `$phishing`: {&mask.Phishing, &flags.Phishing},
1827 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1828 }
1829 keywords := map[string]bool{}
1830 for k, v := range labels {
1831 k = strings.ToLower(k)
1832 if mf, ok := systemflags[k]; ok {
1833 *mf[0] = true
1834 *mf[1] = v
1835 } else {
1836 keywords[k] = v
1837 }
1838 }
1839 return func(msgFlags store.Flags, msgKeywords []string) bool {
1840 var f store.Flags
1841 if f.Set(mask, msgFlags) != flags {
1842 return false
1843 }
1844 for k, v := range keywords {
1845 if slices.Contains(msgKeywords, k) != v {
1846 return false
1847 }
1848 }
1849 return true
1850 }
1851}
1852
1853// attachmentFilterFn returns a function that filters for the attachment-related
1854// filter from the query. A nil function is returned if there are attachment
1855// filters.
1856func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1857 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1858 return nil
1859 }
1860
1861 return func(m store.Message) bool {
1862 if !state.ensurePart(m, true) {
1863 return false
1864 }
1865 types, err := attachmentTypes(log, m, state)
1866 if err != nil {
1867 state.err = err
1868 return false
1869 }
1870 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1871 }
1872}
1873
// attachmentMimetypes maps lower-cased "type/subtype" media types to the
// attachment category used for filtering. Image types are not listed here:
// they are matched on the "IMAGE" media type directly in attachmentTypes.
var attachmentMimetypes = map[string]AttachmentType{
	"application/pdf":                AttachmentPDF,
	"application/zip":                AttachmentArchive,
	"application/x-rar-compressed":   AttachmentArchive,
	"application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
	"application/vnd.ms-excel":       AttachmentSpreadsheet,
	"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
	"application/vnd.oasis.opendocument.text":         AttachmentDocument,
	"application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
	"application/vnd.ms-powerpoint":                   AttachmentPresentation,
	"application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
}
// attachmentExtensions maps lower-cased filename extensions to attachment
// categories, used as fallback when the media type is not recognized.
// NOTE(review): filepath.Ext returns only the final extension (".gz" for
// "x.tar.gz"), so the multi-part entries below (".tar.gz", ".tar.bz2", ...)
// only match if the lookup also tries the compound suffix — verify the caller
// does.
var attachmentExtensions = map[string]AttachmentType{
	".pdf":     AttachmentPDF,
	".zip":     AttachmentArchive,
	".tar":     AttachmentArchive,
	".tgz":     AttachmentArchive,
	".tar.gz":  AttachmentArchive,
	".tbz2":    AttachmentArchive,
	".tar.bz2": AttachmentArchive,
	".tar.lz":  AttachmentArchive,
	".tlz":     AttachmentArchive,
	".tar.xz":  AttachmentArchive,
	".txz":     AttachmentArchive,
	".tar.zst": AttachmentArchive,
	".tar.lz4": AttachmentArchive,
	".7z":      AttachmentArchive,
	".rar":     AttachmentArchive,
	".ods":     AttachmentSpreadsheet,
	".xls":     AttachmentSpreadsheet,
	".xlsx":    AttachmentSpreadsheet,
	".odt":     AttachmentDocument,
	".doc":     AttachmentDocument,
	".docx":    AttachmentDocument,
	".odp":     AttachmentPresentation,
	".ppt":     AttachmentPresentation,
	".pptx":    AttachmentPresentation,
}
1912
1913func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1914 types := map[AttachmentType]bool{}
1915
1916 pm, err := parsedMessage(log, &m, state, false, false, false)
1917 if err != nil {
1918 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1919 }
1920 for _, a := range pm.attachments {
1921 if a.Part.MediaType == "IMAGE" {
1922 types[AttachmentImage] = true
1923 continue
1924 }
1925 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1926 if t, ok := attachmentMimetypes[mt]; ok {
1927 types[t] = true
1928 continue
1929 }
1930 _, filename, err := a.Part.DispositionFilename()
1931 if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
1932 log.Debugx("parsing disposition/filename", err)
1933 } else if err != nil {
1934 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1935 }
1936 if ext := filepath.Ext(filename); ext != "" {
1937 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1938 types[t] = true
1939 }
1940 }
1941 }
1942
1943 if len(types) == 0 {
1944 types[AttachmentNone] = true
1945 } else {
1946 types[AttachmentAny] = true
1947 }
1948 return types, nil
1949}
1950
1951// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1952// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1953// clash with SMTP envelope) for the query. A nil function is returned if no
1954// filtering is needed.
1955func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1956 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1957 return nil
1958 }
1959
1960 lower := func(l []string) []string {
1961 if len(l) == 0 {
1962 return nil
1963 }
1964 r := make([]string, len(l))
1965 for i, s := range l {
1966 r[i] = strings.ToLower(s)
1967 }
1968 return r
1969 }
1970
1971 filterSubject := lower(q.Filter.Subject)
1972 notFilterSubject := lower(q.NotFilter.Subject)
1973 filterFrom := lower(q.Filter.From)
1974 notFilterFrom := lower(q.NotFilter.From)
1975 filterTo := lower(q.Filter.To)
1976 notFilterTo := lower(q.NotFilter.To)
1977
1978 return func(m store.Message) bool {
1979 if !state.ensurePart(m, false) {
1980 return false
1981 }
1982
1983 var env message.Envelope
1984 if state.part.Envelope != nil {
1985 env = *state.part.Envelope
1986 }
1987
1988 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1989 subject := strings.ToLower(env.Subject)
1990 for _, s := range filterSubject {
1991 if !strings.Contains(subject, s) {
1992 return false
1993 }
1994 }
1995 for _, s := range notFilterSubject {
1996 if strings.Contains(subject, s) {
1997 return false
1998 }
1999 }
2000 }
2001
2002 contains := func(textLower []string, l []message.Address, all bool) bool {
2003 next:
2004 for _, s := range textLower {
2005 for _, a := range l {
2006 name := strings.ToLower(a.Name)
2007 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
2008 if strings.Contains(name, s) || strings.Contains(addr, s) {
2009 if !all {
2010 return true
2011 }
2012 continue next
2013 }
2014 }
2015 if all {
2016 return false
2017 }
2018 }
2019 return all
2020 }
2021
2022 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
2023 return false
2024 }
2025 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
2026 return false
2027 }
2028 if len(filterTo) > 0 || len(notFilterTo) > 0 {
2029 to := slices.Concat(env.To, env.CC, env.BCC)
2030 if len(filterTo) > 0 && !contains(filterTo, to, true) {
2031 return false
2032 }
2033 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
2034 return false
2035 }
2036 }
2037 return true
2038 }
2039}
2040
2041// headerFilterFn returns a function that filters for the header filters in the
2042// query. A nil function is returned if there are no header filters.
2043func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2044 if len(q.Filter.Headers) == 0 {
2045 return nil
2046 }
2047
2048 lowerValues := make([]string, len(q.Filter.Headers))
2049 for i, t := range q.Filter.Headers {
2050 lowerValues[i] = strings.ToLower(t[1])
2051 }
2052
2053 return func(m store.Message) bool {
2054 if !state.ensurePart(m, true) {
2055 return false
2056 }
2057 hdr, err := state.part.Header()
2058 if err != nil {
2059 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2060 return false
2061 }
2062
2063 next:
2064 for i, t := range q.Filter.Headers {
2065 k := t[0]
2066 v := lowerValues[i]
2067 l := hdr.Values(k)
2068 if v == "" && len(l) > 0 {
2069 continue
2070 }
2071 for _, e := range l {
2072 if strings.Contains(strings.ToLower(e), v) {
2073 continue next
2074 }
2075 }
2076 return false
2077 }
2078 return true
2079 }
2080}
2081
2082// wordFiltersFn returns a function that applies the word filters of the query. A
2083// nil function is returned when query does not contain a word filter.
2084func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2085 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2086 return nil
2087 }
2088
2089 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2090
2091 return func(m store.Message) bool {
2092 if !state.ensurePart(m, true) {
2093 return false
2094 }
2095
2096 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2097 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
2098 return false
2099 } else {
2100 return ok
2101 }
2102 }
2103}
2104