package webmail

// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.

import (
	"compress/gzip"
	"context"
	cryptrand "crypto/rand"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"path/filepath"
	"reflect"
	"runtime/debug"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/mjl-/bstore"
	"github.com/mjl-/sherpa"

	"github.com/mjl-/mox/dns"
	"github.com/mjl-/mox/message"
	"github.com/mjl-/mox/metrics"
	"github.com/mjl-/mox/mlog"
	"github.com/mjl-/mox/mox-"
	"github.com/mjl-/mox/moxio"
	"github.com/mjl-/mox/moxvar"
	"github.com/mjl-/mox/smtp"
	"github.com/mjl-/mox/store"
)

// Request is a request to an SSE connection to send messages, either for a new
// view, to continue with an existing view, or to cancel an ongoing request.
type Request struct {
	ID int64

	SSEID int64 // SSE connection.

	// To indicate a request is a continuation (more results) of the previous view.
	// Echoed in events, client checks if it is getting results for the latest request.
	ViewID int64

	// If set, this request and its view are canceled. A new view must be started.
	Cancel bool

	Query Query
	Page  Page
}
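
// An illustrative continuation request (field values are hypothetical), asking
// for the next 50 messages of an existing view after message 123:
//
//	Request{
//		ID:     2,
//		SSEID:  1,
//		ViewID: 1,
//		Query:  Query{Threading: ThreadOn, Filter: Filter{MailboxID: 1}},
//		Page:   Page{AnchorMessageID: 123, Count: 50},
//	}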

type ThreadMode string

const (
	ThreadOff    ThreadMode = "off"
	ThreadOn     ThreadMode = "on"
	ThreadUnread ThreadMode = "unread"
)

// Query is a request for messages that match filters, in a given order.
type Query struct {
	OrderAsc  bool // Order by received ascending or descending.
	Threading ThreadMode
	Filter    Filter
	NotFilter NotFilter
}

// AttachmentType is for filtering by attachment type.
type AttachmentType string

const (
	AttachmentIndifferent  AttachmentType = ""
	AttachmentNone         AttachmentType = "none"
	AttachmentAny          AttachmentType = "any"
	AttachmentImage        AttachmentType = "image" // png, jpg, gif, ...
	AttachmentPDF          AttachmentType = "pdf"
	AttachmentArchive      AttachmentType = "archive"      // zip files, tgz, ...
	AttachmentSpreadsheet  AttachmentType = "spreadsheet"  // ods, xlsx, ...
	AttachmentDocument     AttachmentType = "document"     // odt, docx, ...
	AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
)

// Filter selects the messages to return. Fields that are set must all match,
// and for slices each element must match ("and").
type Filter struct {
	// If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
	MailboxID int64

	// If true, submailboxes are also included in the search.
	MailboxChildrenIncluded bool

	// In case the client doesn't know mailboxes and their IDs yet. Only used during sse
	// connection setup, where it is turned into a MailboxID. Filtering only looks at
	// MailboxID.
	MailboxName string

	Words       []string // Case insensitive substring match for each string.
	From        []string
	To          []string // Including Cc and Bcc.
	Oldest      *time.Time
	Newest      *time.Time
	Subject     []string
	Attachments AttachmentType
	Labels      []string
	Headers     [][2]string // Header values can be empty, in which case only the presence of the header is checked, regardless of value.
	SizeMin     int64
	SizeMax     int64
}

// NotFilter matches messages that don't match these fields.
type NotFilter struct {
	Words       []string
	From        []string
	To          []string
	Subject     []string
	Attachments AttachmentType
	Labels      []string
}

// Page holds pagination parameters for a request.
type Page struct {
	// Start returning messages after this ID, if > 0. For pagination, fetching the
	// next set of messages.
	AnchorMessageID int64

	// Number of messages to return, must be >= 1, we never return more than 10000 for
	// one request.
	Count int

	// If > 0, return messages until DestMessageID is found. More than Count messages
	// can be returned. For long-running searches, it may take a while before this
	// message is found.
	DestMessageID int64
}

// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.

// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
// included.
type MessageAddress struct {
	Name   string // Free-form name for display in mail applications.
	User   string // Localpart, encoded.
	Domain dns.Domain
}

// MessageEnvelope is like message.Envelope, as used in message.Part, but including
// unicode host names for IDNA names.
type MessageEnvelope struct {
	// todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
	Date      time.Time
	Subject   string
	From      []MessageAddress
	Sender    []MessageAddress
	ReplyTo   []MessageAddress
	To        []MessageAddress
	CC        []MessageAddress
	BCC       []MessageAddress
	InReplyTo string
	MessageID string
}

// MessageItem is sent by queries, it has derived information analyzed from
// message.Part, made for the needs of the message items in the message list.
type MessageItem struct {
	Message     store.Message // Without ParsedBuf and MsgPrefix, for size.
	Envelope    MessageEnvelope
	Attachments []Attachment
	IsSigned    bool
	IsEncrypted bool
	FirstLine   string // Of message body, for showing as preview.
	MatchQuery  bool   // If message does not match query, it can still be included because of threading.
}

// ParsedMessage has more parsed/derived information about a message, intended
// for rendering the (contents of the) message. Information from MessageItem is
// not duplicated.
type ParsedMessage struct {
	ID      int64
	Part    message.Part
	Headers map[string][]string

	// Text parts, can be empty.
	Texts []string

	// Whether there is an HTML part. The webclient renders HTML message parts through
	// an iframe and a separate request with strict CSP headers to prevent script
	// execution and loading of external resources, which isn't possible when loading
	// in iframe with inline HTML because not all browsers support the iframe csp
	// attribute.
	HasHTML bool

	ListReplyAddress *MessageAddress // From List-Post.

	// Information used by MessageItem, not exported in this type.
	envelope    MessageEnvelope
	attachments []Attachment
	isSigned    bool
	isEncrypted bool
	firstLine   string
}

// EventStart is the first message sent on an SSE connection, giving the client
// basic data to populate its UI. After this event, messages will follow quickly in
// an EventViewMsgs event.
type EventStart struct {
	SSEID                int64
	LoginAddress         MessageAddress
	Addresses            []MessageAddress
	DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
	MailboxName          string
	Mailboxes            []store.Mailbox
	RejectsMailbox       string
	Version              string
}

// DomainAddressConfig has the address (localpart) configuration for a domain, so
// the webmail client can decide if an address matches the addresses of the
// account.
type DomainAddressConfig struct {
	LocalpartCatchallSeparator string // Can be empty.
	LocalpartCaseSensitive     bool
}

// EventViewMsgs contains messages for a view, possibly a continuation of an
// earlier list of messages.
type EventViewMsgs struct {
	ViewID    int64
	RequestID int64

	// If empty, this was the last message for the request. If non-empty, a list of
	// thread messages. Each with the first message being the reason this thread is
	// included and can be used as AnchorID in followup requests. If the threading mode
	// is "off" in the query, there will always be only a single message. If a thread
	// is sent, all messages in the thread are sent, including those that don't match
	// the query (e.g. from another mailbox). Threads can be displayed based on the
	// ThreadParentIDs field, with possibly slightly different display based on field
	// ThreadMissingLink.
	MessageItems [][]MessageItem

	// If set, will match the target page.DestMessageID from the request.
	ParsedMessage *ParsedMessage

	// If set, there are no more messages in this view at this moment. Messages can be
	// added, typically via Change messages, e.g. for new deliveries.
	ViewEnd bool
}

// EventViewErr indicates an error during a query for messages. The request is
// aborted, no more request-related messages will be sent until the next request.
type EventViewErr struct {
	ViewID    int64
	RequestID int64
	Err       string // To be displayed in client.
	err       error  // Original message, for checking against context.Canceled.
}

// EventViewReset indicates that a request for the next set of messages in a view
// could not be fulfilled, e.g. because the anchor message does not exist anymore.
// The client should clear its list of messages. This can happen before
// EventViewMsgs events are sent.
type EventViewReset struct {
	ViewID    int64
	RequestID int64
}

// EventViewChanges contain one or more changes relevant for the client, either
// with new mailbox total/unseen message counts, or messages added/removed/modified
// (flags) for the current view.
type EventViewChanges struct {
	ViewID  int64
	Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
}
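
// For example, a single flag update could be encoded in Changes roughly like
// this on the wire (JSON, field values hypothetical, remaining fields elided):
//
//	[["ChangeMsgFlags", {"MailboxID": 1, "UID": 42, ...}]]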

// ChangeMsgAdd adds a new message and possibly its thread to the view.
type ChangeMsgAdd struct {
	store.ChangeAddUID
	MessageItems []MessageItem
}

// ChangeMsgRemove removes one or more messages from the view.
type ChangeMsgRemove struct {
	store.ChangeRemoveUIDs
}

// ChangeMsgFlags updates flags for one message.
type ChangeMsgFlags struct {
	store.ChangeFlags
}

// ChangeMsgThread updates muted/collapsed fields for one message.
type ChangeMsgThread struct {
	store.ChangeThread
}

// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
type ChangeMailboxRemove struct {
	store.ChangeRemoveMailbox
}

// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
type ChangeMailboxAdd struct {
	Mailbox store.Mailbox
}

// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
// It could be under a new parent.
type ChangeMailboxRename struct {
	store.ChangeRenameMailbox
}

// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
type ChangeMailboxCounts struct {
	store.ChangeMailboxCounts
}

// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
type ChangeMailboxSpecialUse struct {
	store.ChangeMailboxSpecialUse
}

// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
// a message was added with a keyword that wasn't in the mailbox yet.
type ChangeMailboxKeywords struct {
	store.ChangeMailboxKeywords
}

// view holds the information about the returned data for a query. It is used to
// determine whether mailbox changes should be sent to the client: we only send
// addition/removal/flag-changes of messages that are in view, or would extend it
// if the view is at the end of the results.
type view struct {
	Request Request

	// Received of last message we sent to the client. We use it to decide if a newly
	// delivered message is within the view and the client should get a notification.
	LastMessageReceived time.Time

	// If set, the last message in the query view has been sent. There is no need to do
	// another query, it will not return more data. Used to decide if an event for a
	// new message should be sent.
	End bool

	// Whether message must or must not match mailboxIDs.
	matchMailboxIDs bool
	// Mailboxes to match, can be multiple, for matching children. If empty, there is
	// no filter on mailboxes.
	mailboxIDs map[int64]bool

	// Threads sent to client. New messages for this thread are also sent, regardless
	// of regular query matching, so also for other mailboxes. If the user (re)moved
	// all messages of a thread, they may still receive events for the thread. Only
	// filled when query with threading not off.
	threadIDs map[int64]struct{}
}

// sses tracks all sse connections, and access to them.
var sses = struct {
	sync.Mutex
	gen int64
	m   map[int64]sse
}{m: map[int64]sse{}}

// sse represents an sse connection.
type sse struct {
	ID          int64        // Also returned in EventStart and used in Request to identify the request.
	AccountName string       // Used to check the authenticated user has access to the SSE connection.
	Request     chan Request // Goroutine will receive requests from here, coming from API calls.
}

// unregister is called by the goroutine when the connection is closed or breaks.
func (sse sse) unregister() {
	sses.Lock()
	defer sses.Unlock()
	delete(sses.m, sse.ID)

	// Drain any pending requests, preventing blocked goroutines from API calls.
	for {
		select {
		case <-sse.Request:
		default:
			return
		}
	}
}

func sseRegister(accountName string) sse {
	sses.Lock()
	defer sses.Unlock()
	sses.gen++
	v := sse{sses.gen, accountName, make(chan Request, 1)}
	sses.m[v.ID] = v
	return v
}

// sseGet returns a reference to an existing connection if it exists and the user
// has access.
func sseGet(id int64, accountName string) (sse, bool) {
	sses.Lock()
	defer sses.Unlock()
	s := sses.m[id]
	if s.AccountName != accountName {
		return sse{}, false
	}
	return s, true
}

// ssetoken is a temporary token that has not yet been used to start an SSE
// connection. Created by Token, consumed by a new SSE connection.
type ssetoken struct {
	token        string // Uniquely generated.
	accName      string
	address      string             // Address used to authenticate in call that created the token.
	sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
	validUntil   time.Time
}

// ssetokens maintains unused tokens. We have just one, but it's a type so we
// can define methods.
type ssetokens struct {
	sync.Mutex
	accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
	tokens        map[string]ssetoken   // Token to details, for finding account for a token.
}

var sseTokens = ssetokens{
	accountTokens: map[string][]ssetoken{},
	tokens:        map[string]ssetoken{},
}
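
// Illustrative flow: the Token API call generates a token with xgenerate and
// returns it to the client; the client then opens the SSE connection with the
// token in the "token" query string parameter, and serveEvents consumes it
// through check.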

// xgenerate creates and saves a new token. It ensures no more than 10 tokens
// per account exist, removing old ones if needed.
func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
	buf := make([]byte, 16)
	_, err := cryptrand.Read(buf)
	xcheckf(ctx, err, "generating token")
	st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}

	x.Lock()
	defer x.Unlock()
	n := len(x.accountTokens[accName])
	if n >= 10 {
		for _, ost := range x.accountTokens[accName][:n-9] {
			delete(x.tokens, ost.token)
		}
		copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
		x.accountTokens[accName] = x.accountTokens[accName][:9]
	}
	x.accountTokens[accName] = append(x.accountTokens[accName], st)
	x.tokens[st.token] = st
	return st.token
}

// check verifies a token, and consumes it if valid.
func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
	x.Lock()
	defer x.Unlock()

	st, ok := x.tokens[token]
	if !ok {
		return "", "", "", false, nil
	}
	delete(x.tokens, token)
	if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
		return "", "", "", false, errors.New("internal error, could not find token in account")
	} else {
		copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
		x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
		if len(x.accountTokens[st.accName]) == 0 {
			delete(x.accountTokens, st.accName)
		}
	}
	if time.Now().After(st.validUntil) {
		return "", "", "", false, nil
	}
	return st.accName, st.address, st.sessionToken, true, nil
}

// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
type ioErr struct {
	err error
}

// serveEvents serves an SSE connection. Authentication is done through a query
// string parameter "token", a one-time-use token returned by the Token API call.
func serveEvents(ctx context.Context, log mlog.Log, w http.ResponseWriter, r *http.Request) {
	if r.Method != "GET" {
		http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
		return
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Error("internal error: ResponseWriter not a http.Flusher")
		http.Error(w, "500 - internal error - cannot sync to http connection", 500)
		return
	}

	q := r.URL.Query()
	token := q.Get("token")
	if token == "" {
		http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
		return
	}
	accName, address, sessionToken, ok, err := sseTokens.check(token)
	if err != nil {
		http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
		return
	}
	if !ok {
		http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
		return
	}
	if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
		http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
		return
	}

	// We can simulate a slow SSE connection. It seems firefox doesn't slow down
	// incoming responses with its slow-network simulation.
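	// For example (hypothetical values), appending these parameters to the events URL:
	//
	//	?token=...&waitMinMsec=100&waitMaxMsec=500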
	var waitMin, waitMax time.Duration
	waitMinMsec := q.Get("waitMinMsec")
	waitMaxMsec := q.Get("waitMaxMsec")
	if waitMinMsec != "" && waitMaxMsec != "" {
		if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
			http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
			return
		} else {
			waitMin = time.Duration(v) * time.Millisecond
		}

		if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
			http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
			return
		} else {
			waitMax = time.Duration(v) * time.Millisecond
		}
	}

	// Parse the request with initial mailbox/search criteria.
	var req Request
	dec := json.NewDecoder(strings.NewReader(q.Get("request")))
	dec.DisallowUnknownFields()
	if err := dec.Decode(&req); err != nil {
		http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
		return
	} else if req.Page.Count <= 0 {
		http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
		return
	}
	if req.Query.Threading == "" {
		req.Query.Threading = ThreadOff
	}

	var writer *eventWriter

	metricSSEConnections.Inc()
	defer metricSSEConnections.Dec()

	// Below here, error handling happens through xcheckf, which panics with a
	// *sherpa.Error, after which we send an error event to the client. We can also get
	// an ioErr when the connection is broken.
	defer func() {
		x := recover()
		if x == nil {
			return
		}
		if err, ok := x.(*sherpa.Error); ok {
			writer.xsendEvent(ctx, log, "fatalErr", err.Message)
		} else if _, ok := x.(ioErr); ok {
			return
		} else {
			log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
			debug.PrintStack()
			metrics.PanicInc(metrics.Webmail)
			panic(x)
		}
	}()

	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")

	// We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
	// keys), so should be quite compressible.
	var out writeFlusher
	gz := mox.AcceptsGzip(r)
	if gz {
		h.Set("Content-Encoding", "gzip")
		out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
	} else {
		out = nopFlusher{w}
	}
	out = httpFlusher{out, flusher}

	// We'll be writing outgoing SSE events through writer.
	writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
	defer writer.close()

	// Fetch initial data.
	acc, err := store.OpenAccount(log, accName)
	xcheckf(ctx, err, "open account")
	defer func() {
		err := acc.Close()
		log.Check(err, "closing account")
	}()
	comm := store.RegisterComm(acc)
	defer comm.Unregister()

	// List addresses that the client can use to send email from.
	accConf, _ := acc.Conf()
	loginAddr, err := smtp.ParseAddress(address)
	xcheckf(ctx, err, "parsing login address")
	_, _, dest, err := mox.FindAccount(loginAddr.Localpart, loginAddr.Domain, false)
	xcheckf(ctx, err, "looking up destination for login address")
	loginName := accConf.FullName
	if dest.FullName != "" {
		loginName = dest.FullName
	}
	loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
	var addresses []MessageAddress
	for a, dest := range accConf.Destinations {
		name := dest.FullName
		if name == "" {
			name = accConf.FullName
		}
		var ma MessageAddress
		if strings.HasPrefix(a, "@") {
			dom, err := dns.ParseDomain(a[1:])
			xcheckf(ctx, err, "parsing destination address for account")
			ma = MessageAddress{Domain: dom}
		} else {
			addr, err := smtp.ParseAddress(a)
			xcheckf(ctx, err, "parsing destination address for account")
			ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
		}
		addresses = append(addresses, ma)
	}

	// We implicitly start a query. We use the reqctx for the transaction, because the
	// transaction is passed to the query, which can be canceled.
	reqctx, reqctxcancel := context.WithCancel(ctx)
	defer func() {
		// We also cancel in cancelDrain later on, but there is a brief window where the
		// context wouldn't be canceled.
		if reqctxcancel != nil {
			reqctxcancel()
			reqctxcancel = nil
		}
	}()

	// qtx is kept around during connection initialization, until we pass it off to the
	// goroutine that starts querying for messages.
	var qtx *bstore.Tx
	defer func() {
		if qtx != nil {
			err := qtx.Rollback()
			log.Check(err, "rolling back")
		}
	}()

	var mbl []store.Mailbox

	// We only take the rlock when getting the tx.
	acc.WithRLock(func() {
		// Now a read-only transaction we'll use during the query.
		qtx, err = acc.DB.Begin(reqctx, false)
		xcheckf(ctx, err, "begin transaction")

		mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
		xcheckf(ctx, err, "list mailboxes")
	})

	// Find the designated mailbox if a mailbox name is set, or there are no filters at all.
	var zerofilter Filter
	var zeronotfilter NotFilter
	var mailbox store.Mailbox
	var mailboxPrefixes []string
	var matchMailboxes bool
	mailboxIDs := map[int64]bool{}
	mailboxName := req.Query.Filter.MailboxName
	if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
		if mailboxName == "" {
			mailboxName = "Inbox"
		}

		var inbox store.Mailbox
		for _, e := range mbl {
			if e.Name == mailboxName {
				mailbox = e
			}
			if e.Name == "Inbox" {
				inbox = e
			}
		}
		if mailbox.ID == 0 {
			mailbox = inbox
		}
		if mailbox.ID == 0 {
			xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
		}
		req.Query.Filter.MailboxID = mailbox.ID
		req.Query.Filter.MailboxName = ""
		mailboxPrefixes = []string{mailbox.Name + "/"}
		matchMailboxes = true
		mailboxIDs[mailbox.ID] = true
	} else {
		matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
	}
	if req.Query.Filter.MailboxChildrenIncluded {
		xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
	}

	// todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.

	sse := sseRegister(acc.Name)
	defer sse.unregister()

	// Per-domain localpart config so the webclient can decide if an address belongs to the account.
	domainAddressConfigs := map[string]DomainAddressConfig{}
	for _, a := range addresses {
		dom, _ := mox.Conf.Domain(a.Domain)
		domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
	}

	// Write first event, allowing client to fill its UI with mailboxes.
	start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, moxvar.Version}
	writer.xsendEvent(ctx, log, "start", start)

	// The goroutine doing the querying will send messages on these channels, which
	// result in an event being written on the SSE connection.
	viewMsgsc := make(chan EventViewMsgs)
	viewErrc := make(chan EventViewErr)
	viewResetc := make(chan EventViewReset)
	donec := make(chan int64) // When request is done.

	// Start a view, it determines if we send a change to the client. And start an
	// implicit query for messages; we'll send the messages to the client, which can
	// fill its UI with messages.
	v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
	go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
	qtx = nil // viewRequestTx closes qtx

	// When canceling a query, we must drain its messages until it says it is done.
	// Otherwise the sending goroutine would hang indefinitely on a channel send.
	cancelDrain := func() {
		if reqctxcancel != nil {
			// Cancel the goroutine doing the querying.
			reqctxcancel()
			reqctx = nil
			reqctxcancel = nil
		} else {
			return
		}

		// Drain events until done.
		for {
			select {
			case <-viewMsgsc:
			case <-viewErrc:
			case <-viewResetc:
			case <-donec:
				return
			}
		}
	}

	// If we stop and a query is in progress, we must drain the channel it will send on.
	defer cancelDrain()

	// Changes broadcast by other connections on this account. If applicable for the
	// connection/view, we send events.
	xprocessChanges := func(changes []store.Change) {
		taggedChanges := [][2]any{}

		// We get a transaction the first time we need it.
		var xtx *bstore.Tx
		defer func() {
			if xtx != nil {
				err := xtx.Rollback()
				log.Check(err, "rolling back transaction")
			}
		}()
		ensureTx := func() error {
			if xtx != nil {
				return nil
			}
			acc.RLock()
			defer acc.RUnlock()
			var err error
			xtx, err = acc.DB.Begin(ctx, false)
			return err
		}
		// This getmsg will now only be called with mailboxID+UID, not with messageID set.
		// todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
		getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
			if err := ensureTx(); err != nil {
				return store.Message{}, fmt.Errorf("transaction: %v", err)
			}
			return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
		}

		// Return UIDs that are within the range of the view, because the end has been
		// reached, or because the UID is not after the last message.
		xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
			uidsAny := make([]any, len(uids))
			for i, uid := range uids {
				uidsAny[i] = uid
			}
			err := ensureTx()
			xcheckf(ctx, err, "transaction")
			q := bstore.QueryTx[store.Message](xtx)
			q.FilterNonzero(store.Message{MailboxID: mailboxID})
			q.FilterEqual("UID", uidsAny...)
			mbOK := v.matchesMailbox(mailboxID)
			err = q.ForEach(func(m store.Message) error {
				_, thread := v.threadIDs[m.ThreadID]
				if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
					changedUIDs = append(changedUIDs, m.UID)
				}
				return nil
			})
			xcheckf(ctx, err, "fetching messages for change")
			return changedUIDs
		}

		// Forward changes that are relevant to the current view.
		for _, change := range changes {
			switch c := change.(type) {
			case store.ChangeAddUID:
				ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
				xcheckf(ctx, err, "matching new message against view")
				m, err := getmsg(0, c.MailboxID, c.UID)
				xcheckf(ctx, err, "get message")
				_, thread := v.threadIDs[m.ThreadID]
				if !ok && !thread {
					continue
				}
				state := msgState{acc: acc}
				mi, err := messageItem(log, m, &state)
				state.clear()
				xcheckf(ctx, err, "make messageitem")
				mi.MatchQuery = ok

				mil := []MessageItem{mi}
				if !thread && req.Query.Threading != ThreadOff {
					err := ensureTx()
					xcheckf(ctx, err, "transaction")
					more, _, err := gatherThread(log, xtx, acc, v, m, 0, false)
					xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
					mil = append(mil, more...)
					v.threadIDs[m.ThreadID] = struct{}{}
				}

				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})

				// If message extends the view, store it as such.
				if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
					v.LastMessageReceived = m.Received
				}

			case store.ChangeRemoveUIDs:
				// We may send changes for uids the client doesn't know, that's fine.
				changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
				if len(changedUIDs) == 0 {
					continue
				}
				ch := ChangeMsgRemove{c}
				ch.UIDs = changedUIDs
				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})

			case store.ChangeFlags:
				// We may send changes for uids the client doesn't know, that's fine.
				changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
				if len(changedUIDs) == 0 {
					continue
				}
				ch := ChangeMsgFlags{c}
				ch.UID = changedUIDs[0]
				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})

			case store.ChangeThread:
				// Change in muted/collapsed state, just always ship it.
				taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})

			case store.ChangeRemoveMailbox:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})

			case store.ChangeAddMailbox:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})

			case store.ChangeRenameMailbox:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})

			case store.ChangeMailboxCounts:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})

			case store.ChangeMailboxSpecialUse:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})

			case store.ChangeMailboxKeywords:
				taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})

			case store.ChangeAddSubscription:
				// Webmail does not care about subscriptions.

			default:
				panic(fmt.Sprintf("missing case for change %T", c))
			}
		}

		if len(taggedChanges) > 0 {
			viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
			writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
		}
	}

	timer := time.NewTimer(5 * time.Minute) // For keepalives.
	defer timer.Stop()
	for {
		if writer.wrote {
			timer.Reset(5 * time.Minute)
			writer.wrote = false
		}

		pending := comm.Pending
		if reqctx != nil {
			pending = nil
		}

		select {
		case <-mox.Shutdown.Done():
			writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
			// Work around go vet, it doesn't see defer cancelDrain.
			if reqctxcancel != nil {
				reqctxcancel()
			}
			return

		case <-timer.C:
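			// Lines starting with a colon are comments in the SSE protocol and are ignored
			// by clients; we send one periodically as a keepalive.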
			_, err := fmt.Fprintf(out, ": keepalive\n\n")
			if err != nil {
				log.Errorx("write keepalive", err)
				// Work around go vet, it doesn't see defer cancelDrain.
				if reqctxcancel != nil {
					reqctxcancel()
				}
				return
			}
			out.Flush()
			writer.wrote = true

		case vm := <-viewMsgsc:
			if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
				panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
			}
			if vm.ViewEnd {
				v.End = true
			}
			if len(vm.MessageItems) > 0 {
				v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
			}
			writer.xsendEvent(ctx, log, "viewMsgs", vm)

		case ve := <-viewErrc:
			if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
				panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
			}
			if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
				// Work around go vet, it doesn't see defer cancelDrain.
				if reqctxcancel != nil {
					reqctxcancel()
				}
				return
			}
			writer.xsendEvent(ctx, log, "viewErr", ve)

		case vr := <-viewResetc:
			if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
				panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
			}
			writer.xsendEvent(ctx, log, "viewReset", vr)

		case id := <-donec:
			if id != v.Request.ID {
				panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
			}
			if reqctxcancel != nil {
				reqctxcancel()
			}
			reqctx = nil
			reqctxcancel = nil

		case req := <-sse.Request:
			if reqctx != nil {
				cancelDrain()
			}
			if req.Cancel {
				v = view{req, time.Time{}, false, false, nil, nil}
				continue
			}

			reqctx, reqctxcancel = context.WithCancel(ctx)

			stop := func() (stop bool) {
				// rtx is handed off to viewRequestTx below, but we must clean it up in case of errors.
				var rtx *bstore.Tx
				var err error
				defer func() {
					if rtx != nil {
						err = rtx.Rollback()
						log.Check(err, "rolling back transaction")
					}
				}()
				acc.WithRLock(func() {
					rtx, err = acc.DB.Begin(reqctx, false)
				})
				if err != nil {
					reqctxcancel()
					reqctx = nil
					reqctxcancel = nil

					if errors.Is(err, context.Canceled) {
						return true
					}
					err := fmt.Errorf("begin transaction: %v", err)
					viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
					writer.xsendEvent(ctx, log, "viewErr", viewErr)
					return false
				}

				// Reset view state for new query.
				if req.ViewID != v.Request.ViewID {
					matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
					if req.Query.Filter.MailboxChildrenIncluded {
						xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
					}
					v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
				} else {
					v.Request = req
				}
				go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
				rtx = nil
				return false
			}()
			if stop {
				return
			}

		case <-pending:
			xprocessChanges(comm.Get())

		case <-ctx.Done():
			// Work around go vet, it doesn't see defer cancelDrain.
			if reqctxcancel != nil {
				reqctxcancel()
			}
			return
		}
	}
}

// xprepareMailboxIDs prepares the first half of the filters for mailboxes, based on
// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
// mailboxIDs must or must not match. mailboxPrefixes is for use with
// xgatherMailboxIDs to gather children of the mailboxIDs.
func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
	matchMailboxes = true
	mailboxIDs = map[int64]bool{}
	if f.MailboxID == -1 {
		matchMailboxes = false
		// Add the trash, junk and account rejects mailboxes.
		err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
			if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
				mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
				mailboxIDs[mb.ID] = true
			}
			return nil
		})
		xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
	} else if f.MailboxID > 0 {
		mb := store.Mailbox{ID: f.MailboxID}
		err := tx.Get(&mb)
		xcheckf(ctx, err, "get mailbox")
		mailboxIDs[f.MailboxID] = true
		mailboxPrefixes = []string{mb.Name + "/"}
	}
	return
}

// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
	// Gather more mailboxes to filter on, based on mailboxPrefixes.
	if len(mailboxPrefixes) == 0 {
		return
	}
	err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
		for _, p := range mailboxPrefixes {
			if strings.HasPrefix(mb.Name, p) {
				mailboxIDs[mb.ID] = true
				break
			}
		}
		return nil
	})
	xcheckf(ctx, err, "gathering mailboxes")
}

// matchesMailbox returns whether a mailbox matches the view.
func (v view) matchesMailbox(mailboxID int64) bool {
	return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
}
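
// For example, for the "all mailboxes except Trash/Junk/Rejects" filter
// (Filter.MailboxID -1), matchMailboxIDs is false and mailboxIDs holds those
// excluded mailboxes, so any other mailbox matches.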

// inRange returns whether m is within the range for the view, i.e. whether a
// change for this message should be sent to the client so it can update its state.
func (v view) inRange(m store.Message) bool {
	return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
}

// matches checks if the message, identified by either messageID or mailboxID+UID,
// is in the current "view" (i.e. passing the filters, and if checkRange is set
// also if within the range of sent messages based on sort order and the last seen
// message). getmsg retrieves the message, which may be necessary depending on the
// active filters. Used to determine if a store.Change with a new message should be
// sent, and for the destination and anchor messages in view requests.
func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
	var m store.Message
	ensureMessage := func() bool {
		if m.ID == 0 && rerr == nil {
			m, rerr = getmsg(messageID, mailboxID, uid)
		}
		return rerr == nil
	}

	q := v.Request.Query

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	// Check filters.
	if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
		return false, rerr
	}
	// note: anchorMessageID is not relevant for matching.
	flagfilter := q.flagFilterFn()
	if flagfilter != nil && !flagfilter(flags, keywords) {
		return false, rerr
	}

	if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
		return false, rerr
	}
	if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
		return false, rerr
	}

	if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
		return false, rerr
	}
	if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
		return false, rerr
	}

	state := msgState{acc: acc}
	defer func() {
		if rerr == nil && state.err != nil {
			rerr = state.err
		}
		state.clear()
	}()

	attachmentFilter := q.attachmentFilterFn(log, acc, &state)
	if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
		return false, rerr
	}

	envFilter := q.envFilterFn(log, &state)
	if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
		return false, rerr
	}

	headerFilter := q.headerFilterFn(log, &state)
	if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
		return false, rerr
	}

	wordsFilter := q.wordsFilterFn(log, &state)
	if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
		return false, rerr
	}

	// Now check that we are either within the sorting order, or "last" was sent.
	if !checkRange || v.End || ensureMessage() && v.inRange(m) {
		return true, rerr
	}
	return false, rerr
}

type msgResp struct {
	err     error          // If set, an error happened and fields below are not set.
	reset   bool           // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
	viewEnd bool           // If set, the last message for the view was seen, no more should be requested, fields below not set.
	mil     []MessageItem  // If none of the cases above apply, the messages that were found matching the query. The first message is the reason the thread is returned, for use as AnchorID in a followup request.
	pm      *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
}

// viewRequestTx executes a request (query with filters, pagination) by
// launching a new goroutine with queryMessages, receiving results as msgResp,
// and sending Event* to the SSE connection.
//
// It always closes tx.
func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
	defer func() {
		err := tx.Rollback()
		log.Check(err, "rolling back query transaction")

		donec <- v.Request.ID

		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
			debug.PrintStack()
			metrics.PanicInc(metrics.Webmailrequest)
		}
	}()

	var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
	var parsedMessage *ParsedMessage
	var viewEnd bool

	var immediate bool // No waiting, flush immediately.
	t := time.NewTimer(300 * time.Millisecond)
	defer t.Stop()

	sendViewMsgs := func(force bool) {
		if len(msgitems) == 0 && !force {
			return
		}

		immediate = false
		msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
		msgitems = nil
		parsedMessage = nil
		t.Reset(300 * time.Millisecond)
	}

	// todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.

	mrc := make(chan msgResp, 1)
	go queryMessages(ctx, log, acc, tx, v, mrc)

	for {
		select {
		case mr, ok := <-mrc:
			if !ok {
				sendViewMsgs(false)
				// Empty message list signals this query is done.
				msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
				return
			}
			if mr.err != nil {
				sendViewMsgs(false)
				errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
				return
			}
			if mr.reset {
				resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
				continue
			}
			if mr.viewEnd {
				viewEnd = true
				sendViewMsgs(true)
				return
			}

			msgitems = append(msgitems, mr.mil)
			if mr.pm != nil {
				parsedMessage = mr.pm
			}
			if immediate {
				sendViewMsgs(true)
			}

		case <-t.C:
			if len(msgitems) == 0 {
				// Nothing to send yet. We'll send immediately when the next message comes in.
				immediate = true
			} else {
				sendViewMsgs(false)
			}
		}
	}
}

// queryMessages executes a query, with filter, pagination, destination message id
// to fetch (the message that the client had in view and wants to display again).
// It sends on mrc, with several types of messages: errors, whether the view is
// reset due to a missing AnchorMessageID, and when the end of the view was reached
// and/or for a message.
func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
	defer func() {
		x := recover() // Should not happen, but don't take program down if it does.
		if x != nil {
			log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
			debug.PrintStack()
			mrc <- msgResp{err: fmt.Errorf("query failed")}
			metrics.PanicInc(metrics.Webmailquery)
		}

		close(mrc)
	}()

	query := v.Request.Query
	page := v.Request.Page

	// Warning: Filters must be kept in sync between queryMessage and view.matches.

	checkMessage := func(id int64) (valid bool, rerr error) {
		m := store.Message{ID: id}
		err := tx.Get(&m)
		if err == bstore.ErrAbsent || err == nil && m.Expunged {
			return false, nil
		} else if err != nil {
			return false, err
		} else {
			return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
				return m, nil
			})
		}
	}

	// Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
	if page.AnchorMessageID > 0 {
		// Check if message exists and (still) matches the filter.
		// todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
		if valid, err := checkMessage(page.AnchorMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
			return
		} else if !valid {
			mrc <- msgResp{reset: true}
			page.AnchorMessageID = 0
		}
	}

	// Check if page.DestMessageID exists and matches filter. If not, we will ignore
	// it instead of continuing to send messages until the end of the view.
	if page.DestMessageID > 0 {
		if valid, err := checkMessage(page.DestMessageID); err != nil {
			mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
			return
		} else if !valid {
			page.DestMessageID = 0
		}
	}

	// todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.

	q := bstore.QueryTx[store.Message](tx)
	q.FilterEqual("Expunged", false)
	if len(v.mailboxIDs) > 0 {
		if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
			// Should result in fast indexed query.
			for mbID := range v.mailboxIDs {
				q.FilterNonzero(store.Message{MailboxID: mbID})
			}
		} else {
			idsAny := make([]any, 0, len(v.mailboxIDs))
			for mbID := range v.mailboxIDs {
				idsAny = append(idsAny, mbID)
			}
			if v.matchMailboxIDs {
				q.FilterEqual("MailboxID", idsAny...)
			} else {
				q.FilterNotEqual("MailboxID", idsAny...)
			}
		}
	}

	// If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
	if page.AnchorMessageID > 0 {
		var seen = false
		q.FilterFn(func(m store.Message) bool {
			if seen {
				return true
			}
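			// Note that the anchor message itself is also skipped: when we see it, seen
			// becomes true but we still return false, so only messages after it pass.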
1390 seen = m.ID == page.AnchorMessageID
1391 return false
1392 })
1393 }
1394
1395 // We may be added filters the the query below. The FilterFn signature does not
1396 // implement reporting errors, or anything else, just a bool. So when making the
1397 // filter functions, we give them a place to store parsed message state, and an
1398 // error. We check the error during and after query execution.
1399 state := msgState{acc: acc}
1400 defer state.clear()
1401
1402 flagfilter := query.flagFilterFn()
1403 if flagfilter != nil {
1404 q.FilterFn(func(m store.Message) bool {
1405 return flagfilter(m.Flags, m.Keywords)
1406 })
1407 }
1408
1409 if query.Filter.Oldest != nil {
1410 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1411 }
1412 if query.Filter.Newest != nil {
1413 q.FilterLessEqual("Received", *query.Filter.Newest)
1414 }
1415
1416 if query.Filter.SizeMin > 0 {
1417 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1418 }
1419 if query.Filter.SizeMax > 0 {
1420 q.FilterLessEqual("Size", query.Filter.SizeMax)
1421 }
1422
1423 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1424 if attachmentFilter != nil {
1425 q.FilterFn(attachmentFilter)
1426 }
1427
1428 envFilter := query.envFilterFn(log, &state)
1429 if envFilter != nil {
1430 q.FilterFn(envFilter)
1431 }
1432
1433 headerFilter := query.headerFilterFn(log, &state)
1434 if headerFilter != nil {
1435 q.FilterFn(headerFilter)
1436 }
1437
1438 wordsFilter := query.wordsFilterFn(log, &state)
1439 if wordsFilter != nil {
1440 q.FilterFn(wordsFilter)
1441 }
1442
1443 if query.OrderAsc {
1444 q.SortAsc("Received")
1445 } else {
1446 q.SortDesc("Received")
1447 }
1448 found := page.DestMessageID <= 0
1449 end := true
1450 have := 0
1451 err := q.ForEach(func(m store.Message) error {
1452 // Check for an error in one of the filters, propagate it.
1453 if state.err != nil {
1454 return state.err
1455 }
1456
1457 if have >= page.Count && found || have > 10000 {
1458 end = false
1459 return bstore.StopForEach
1460 }
1461
1462 if _, ok := v.threadIDs[m.ThreadID]; ok {
1463 // Message was already returned as part of a thread.
1464 return nil
1465 }
1466
1467 var pm *ParsedMessage
1468 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1469 // For threads, if there was not DestMessageID, we may be getting the newest
1470 // message. For an initial view, this isn't necessarily the first the user is
1471 // expected to read first, that would be the first unread, which we'll get below
1472 // when gathering the thread.
1473 found = true
1474 xpm, err := parsedMessage(log, m, &state, true, false)
1475 if err != nil {
1476 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1477 }
1478 pm = &xpm
1479 }
1480
1481 mi, err := messageItem(log, m, &state)
1482 if err != nil {
1483 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1484 }
1485 mil := []MessageItem{mi}
1486 if query.Threading != ThreadOff {
1487 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0)
1488 if err != nil {
1489 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1490 }
1491 if xpm != nil {
1492 pm = xpm
1493 found = true
1494 }
1495 mil = append(mil, more...)
1496 v.threadIDs[m.ThreadID] = struct{}{}
1497
1498 // Calculate how many messages the frontend is going to show, and only count those as returned.
1499 collapsed := map[int64]bool{}
1500 for _, mi := range mil {
1501 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1502 }
1503 unread := map[int64]bool{} // Propagated to thread root.
1504 if query.Threading == ThreadUnread {
1505 for _, mi := range mil {
1506 m := mi.Message
1507 if m.Seen {
1508 continue
1509 }
1510 unread[m.ID] = true
1511 for _, id := range m.ThreadParentIDs {
1512 unread[id] = true
1513 }
1514 }
1515 }
1516 for _, mi := range mil {
1517 m := mi.Message
1518 threadRoot := true
1519 rootID := m.ID
1520 for _, id := range m.ThreadParentIDs {
1521 if _, ok := collapsed[id]; ok {
1522 threadRoot = false
1523 rootID = id
1524 }
1525 }
1526 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1527 have++
1528 }
1529 }
1530 } else {
1531 have++
1532 }
1533 mrc <- msgResp{mil: mil, pm: pm}
1534 return nil
1535 })
1536 // Check for an error in one of the filters again. Check in ForEach would not
1537 // trigger if the last message has the error.
1538 if err == nil && state.err != nil {
1539 err = state.err
1540 }
1541 if err != nil {
1542 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1543 return
1544 }
1545 if end {
1546 mrc <- msgResp{viewEnd: true}
1547 }
1548}
1549
1550func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool) ([]MessageItem, *ParsedMessage, error) {
1551 if m.ThreadID == 0 {
1552 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1553 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1554 }
1555
1556 // Fetch other messages for this thread.
1557 qt := bstore.QueryTx[store.Message](tx)
1558 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1559 qt.FilterEqual("Expunged", false)
1560 qt.FilterNotEqual("ID", m.ID)
1561 qt.SortAsc("ID")
1562 tml, err := qt.List()
1563 if err != nil {
1564 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1565 }
1566
1567 var mil []MessageItem
1568 var pm *ParsedMessage
1569 var firstUnread bool
1570 for _, tm := range tml {
1571 err := func() error {
1572 xstate := msgState{acc: acc}
1573 defer xstate.clear()
1574
1575 mi, err := messageItem(log, tm, &xstate)
1576 if err != nil {
1577 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1578 }
1579 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1580 return tm, nil
1581 })
1582 if err != nil {
1583 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1584 }
1585 mil = append(mil, mi)
1586
1587 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1588 firstUnread = !tm.Seen
1589 xpm, err := parsedMessage(log, tm, &xstate, true, false)
1590 if err != nil {
1591 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1592 }
1593 pm = &xpm
1594 }
1595 return nil
1596 }()
1597 if err != nil {
1598 return nil, nil, err
1599 }
1600 }
1601
1602 // Finally, the message that caused us to gather this thread (which is likely the
1603 // most recent message in the thread) could be the only unread message.
1604 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1605 xstate := msgState{acc: acc}
1606 defer xstate.clear()
1607 xpm, err := parsedMessage(log, m, &xstate, true, false)
1608 if err != nil {
1609 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1610 }
1611 pm = &xpm
1612 }
1613
1614 return mil, pm, nil
1615}
1616
1617// While checking the filters on a message, we may need to get more message
1618// details as each filter passes. We check the filters that need the basic
1619// information first, and load and cache more details for the next filters.
1620// msgState holds parsed details for a message, it is updated while filtering,
1621// with more information or reset for a next message.
type msgState struct {
	acc  *store.Account // Never changes during lifetime.
	err  error          // Once set, doesn't get cleared.
	m    store.Message
	part *message.Part // Will be without Reader when msgr is nil.
	msgr *store.MsgReader
}

func (ms *msgState) clear() {
	if ms.msgr != nil {
		ms.msgr.Close()
		ms.msgr = nil
	}
	*ms = msgState{acc: ms.acc, err: ms.err}
}

func (ms *msgState) ensureMsg(m store.Message) {
	if m.ID != ms.m.ID {
		ms.clear()
	}
	ms.m = m
}

func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
	ms.ensureMsg(m)

	if ms.err == nil {
		if ms.part == nil {
			if m.ParsedBuf == nil {
				ms.err = fmt.Errorf("message %d not parsed", m.ID)
				return false
			}
			var p message.Part
			if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
				ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
				return false
			}
			ms.part = &p
		}
		if withMsgReader && ms.msgr == nil {
			ms.msgr = ms.acc.MessageReader(m)
			ms.part.SetReaderAt(ms.msgr)
		}
	}
	return ms.part != nil
}
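
// Illustrative sketch, not used by this package: the intended calling pattern for
// msgState. A caller creates a state, lazily loads the parsed part while applying
// filters, and resets the state when done. The function name is hypothetical.
func exampleMsgStateUsage(acc *store.Account, m store.Message) bool {
	state := msgState{acc: acc}
	defer state.clear()
	if !state.ensurePart(m, false) {
		// Message not parsed yet or loading failed; state.err (if set) holds the cause.
		return false
	}
	// The parsed part is now cached on state; no message reader was needed.
	return state.part.Envelope != nil
}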

// flagFilterFn returns a function that applies the flag/keyword/"label"-related
// filters for a query. A nil function is returned if there are no flags to filter
// on.
func (q Query) flagFilterFn() func(store.Flags, []string) bool {
	labels := map[string]bool{}
	for _, k := range q.Filter.Labels {
		labels[k] = true
	}
	for _, k := range q.NotFilter.Labels {
		labels[k] = false
	}

	if len(labels) == 0 {
		return nil
	}

	var mask, flags store.Flags
	systemflags := map[string][]*bool{
		`\answered`:  {&mask.Answered, &flags.Answered},
		`\flagged`:   {&mask.Flagged, &flags.Flagged},
		`\deleted`:   {&mask.Deleted, &flags.Deleted},
		`\seen`:      {&mask.Seen, &flags.Seen},
		`\draft`:     {&mask.Draft, &flags.Draft},
		`$junk`:      {&mask.Junk, &flags.Junk},
		`$notjunk`:   {&mask.Notjunk, &flags.Notjunk},
		`$forwarded`: {&mask.Forwarded, &flags.Forwarded},
		`$phishing`:  {&mask.Phishing, &flags.Phishing},
		`$mdnsent`:   {&mask.MDNSent, &flags.MDNSent},
	}
	keywords := map[string]bool{}
	for k, v := range labels {
		k = strings.ToLower(k)
		if mf, ok := systemflags[k]; ok {
			*mf[0] = true
			*mf[1] = v
		} else {
			keywords[k] = v
		}
	}
	return func(msgFlags store.Flags, msgKeywords []string) bool {
		var f store.Flags
		if f.Set(mask, msgFlags) != flags {
			return false
		}
		for k, v := range keywords {
			if slices.Contains(msgKeywords, k) != v {
				return false
			}
		}
		return true
	}
}
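
// Illustrative sketch, not used by this package: applying flagFilterFn to a single
// message. The constructor returns nil when the query has no label filters, so the
// caller must handle that case. The function name is hypothetical.
func exampleFlagFilter(q Query, m store.Message) bool {
	ff := q.flagFilterFn()
	if ff == nil {
		return true // No label filters in the query, nothing to exclude.
	}
	return ff(m.Flags, m.Keywords)
}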

// attachmentFilterFn returns a function that filters for the attachment-related
// filter from the query. A nil function is returned if there are no attachment
// filters.
func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
	if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
		return nil
	}

	return func(m store.Message) bool {
		if !state.ensurePart(m, false) {
			return false
		}
		types, err := attachmentTypes(log, m, state)
		if err != nil {
			state.err = err
			return false
		}
		return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
	}
}

var attachmentMimetypes = map[string]AttachmentType{
	"application/pdf":              AttachmentPDF,
	"application/zip":              AttachmentArchive,
	"application/x-rar-compressed": AttachmentArchive,
	"application/vnd.oasis.opendocument.spreadsheet":                    AttachmentSpreadsheet,
	"application/vnd.ms-excel":                                          AttachmentSpreadsheet,
	"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
	"application/vnd.oasis.opendocument.text":                           AttachmentDocument,
	"application/vnd.oasis.opendocument.presentation":                   AttachmentPresentation,
	"application/vnd.ms-powerpoint":                                     AttachmentPresentation,
	"application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
}
var attachmentExtensions = map[string]AttachmentType{
	".pdf":     AttachmentPDF,
	".zip":     AttachmentArchive,
	".tar":     AttachmentArchive,
	".tgz":     AttachmentArchive,
	".tar.gz":  AttachmentArchive,
	".tbz2":    AttachmentArchive,
	".tar.bz2": AttachmentArchive,
	".tar.lz":  AttachmentArchive,
	".tlz":     AttachmentArchive,
	".tar.xz":  AttachmentArchive,
	".txz":     AttachmentArchive,
	".tar.zst": AttachmentArchive,
	".tar.lz4": AttachmentArchive,
	".7z":      AttachmentArchive,
	".rar":     AttachmentArchive,
	".ods":     AttachmentSpreadsheet,
	".xls":     AttachmentSpreadsheet,
	".xlsx":    AttachmentSpreadsheet,
	".odt":     AttachmentDocument,
	".doc":     AttachmentDocument,
	".docx":    AttachmentDocument,
	".odp":     AttachmentPresentation,
	".ppt":     AttachmentPresentation,
	".pptx":    AttachmentPresentation,
}
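
// Illustrative sketch, not part of the original code: looking up an AttachmentType
// for a hypothetical filename through the extension table above, the same lookup
// attachmentTypes performs. Note that filepath.Ext returns only the final
// extension (".xlsx" here).
var exampleXlsxType = attachmentExtensions[strings.ToLower(filepath.Ext("report.xlsx"))] // AttachmentSpreadsheet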

func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
	types := map[AttachmentType]bool{}

	pm, err := parsedMessage(log, m, state, false, false)
	if err != nil {
		return nil, fmt.Errorf("parsing message for attachments: %w", err)
	}
	for _, a := range pm.attachments {
		if a.Part.MediaType == "IMAGE" {
			types[AttachmentImage] = true
			continue
		}
		mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
		if t, ok := attachmentMimetypes[mt]; ok {
			types[t] = true
		} else if ext := filepath.Ext(tryDecodeParam(log, a.Part.ContentTypeParams["name"])); ext != "" {
			if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
				types[t] = true
			} else {
				continue
			}
		}
	}

	if len(types) == 0 {
		types[AttachmentNone] = true
	} else {
		types[AttachmentAny] = true
	}
	return types, nil
}
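
// Illustrative sketch, not used by this package: checking whether a message has a
// PDF attachment via attachmentTypes. The msgState is assumed to be set up by the
// caller, as in attachmentFilterFn above. The function name is hypothetical.
func exampleHasPDF(log mlog.Log, m store.Message, state *msgState) (bool, error) {
	if !state.ensurePart(m, false) {
		return false, state.err
	}
	types, err := attachmentTypes(log, m, state)
	if err != nil {
		return false, err
	}
	return types[AttachmentPDF], nil
}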

// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
// clash with SMTP envelope) for the query. A nil function is returned if no
// filtering is needed.
func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
		return nil
	}

	lower := func(l []string) []string {
		if len(l) == 0 {
			return nil
		}
		r := make([]string, len(l))
		for i, s := range l {
			r[i] = strings.ToLower(s)
		}
		return r
	}

	filterSubject := lower(q.Filter.Subject)
	notFilterSubject := lower(q.NotFilter.Subject)
	filterFrom := lower(q.Filter.From)
	notFilterFrom := lower(q.NotFilter.From)
	filterTo := lower(q.Filter.To)
	notFilterTo := lower(q.NotFilter.To)

	return func(m store.Message) bool {
		if !state.ensurePart(m, false) {
			return false
		}

		var env message.Envelope
		if state.part.Envelope != nil {
			env = *state.part.Envelope
		}

		if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
			subject := strings.ToLower(env.Subject)
			for _, s := range filterSubject {
				if !strings.Contains(subject, s) {
					return false
				}
			}
			for _, s := range notFilterSubject {
				if strings.Contains(subject, s) {
					return false
				}
			}
		}

		contains := func(textLower []string, l []message.Address, all bool) bool {
		next:
			for _, s := range textLower {
				for _, a := range l {
					name := strings.ToLower(a.Name)
					addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
					if strings.Contains(name, s) || strings.Contains(addr, s) {
						if !all {
							return true
						}
						continue next
					}
				}
				if all {
					return false
				}
			}
			return all
		}

		if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
			return false
		}
		if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
			return false
		}
		if len(filterTo) > 0 || len(notFilterTo) > 0 {
			to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
			if len(filterTo) > 0 && !contains(filterTo, to, true) {
				return false
			}
			if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
				return false
			}
		}
		return true
	}
}
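
// Illustrative sketch with hypothetical values, not used by this package: a query
// that envFilterFn turns into a filter matching messages from "alice" with
// "invoice" in the subject, while excluding senders containing "mailer-daemon".
var exampleEnvQuery = Query{
	Filter:    Filter{From: []string{"alice"}, Subject: []string{"invoice"}},
	NotFilter: NotFilter{From: []string{"mailer-daemon"}},
}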

// headerFilterFn returns a function that filters for the header filters in the
// query. A nil function is returned if there are no header filters.
func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.Headers) == 0 {
		return nil
	}

	lowerValues := make([]string, len(q.Filter.Headers))
	for i, t := range q.Filter.Headers {
		lowerValues[i] = strings.ToLower(t[1])
	}

	return func(m store.Message) bool {
		if !state.ensurePart(m, true) {
			return false
		}
		hdr, err := state.part.Header()
		if err != nil {
			state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
			return false
		}

	next:
		for i, t := range q.Filter.Headers {
			k := t[0]
			v := lowerValues[i]
			l := hdr.Values(k)
			if v == "" && len(l) > 0 {
				continue
			}
			for _, e := range l {
				if strings.Contains(strings.ToLower(e), v) {
					continue next
				}
			}
			return false
		}
		return true
	}
}
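
// Illustrative sketch with hypothetical values, not used by this package: header
// filters as interpreted by headerFilterFn above. An empty value only requires the
// header to be present; a non-empty value is a case-insensitive substring match.
var exampleHeaderQuery = Query{
	Filter: Filter{
		Headers: [][2]string{
			{"List-Id", ""},          // Present, any value.
			{"Auto-Submitted", "no"}, // Value must contain "no".
		},
	},
}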

// wordsFilterFn returns a function that applies the word filters of the query. A
// nil function is returned when the query does not contain a word filter.
func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
	if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
		return nil
	}

	ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)

	return func(m store.Message) bool {
		if !state.ensurePart(m, true) {
			return false
		}

		if ok, err := ws.MatchPart(log, state.part, true); err != nil {
			state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)
			return false
		} else {
			return ok
		}
	}
}

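// Illustrative sketch, not the actual view-matching code of this package: one way
// the per-query filter constructors above could be combined for a single message.
// Each constructor returns nil when its part of the query is empty, so nil checks
// skip inactive filters. The function name is hypothetical.
func exampleApplyFilters(log mlog.Log, acc *store.Account, q Query, m store.Message) bool {
	state := msgState{acc: acc}
	defer state.clear()

	if ff := q.flagFilterFn(); ff != nil && !ff(m.Flags, m.Keywords) {
		return false
	}
	for _, fn := range []func(store.Message) bool{
		q.attachmentFilterFn(log, acc, &state),
		q.envFilterFn(log, &state),
		q.headerFilterFn(log, &state),
		q.wordsFilterFn(log, &state),
	} {
		if fn != nil && !fn(m) {
			return false
		}
	}
	// Any filter error was recorded on state.err and already caused a false return.
	return state.err == nil
}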