3// todo: may want to add some json omitempty tags to MessageItem, or Message to reduce json size, or just have smaller types that send only the fields that are needed.
8 cryptrand "crypto/rand"
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxio"
33 "github.com/mjl-/mox/moxvar"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
38// Request is a request to an SSE connection to send messages, either for a new
39// view, to continue with an existing view, or to a cancel an ongoing request.
43 SSEID int64 // SSE connection.
45 // To indicate a request is a continuation (more results) of the previous view.
46 // Echoed in events, client checks if it is getting results for the latest request.
49 // If set, this request and its view are canceled. A new view must be started.
59 ThreadOff ThreadMode = "off"
60 ThreadOn ThreadMode = "on"
61 ThreadUnread ThreadMode = "unread"
64// Query is a request for messages that match filters, in a given order.
66 OrderAsc bool // Order by received ascending or desending.
72// AttachmentType is for filtering by attachment type.
73type AttachmentType string
76 AttachmentIndifferent AttachmentType = ""
77 AttachmentNone AttachmentType = "none"
78 AttachmentAny AttachmentType = "any"
79 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
80 AttachmentPDF AttachmentType = "pdf"
81 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
82 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
83 AttachmentDocument AttachmentType = "document" // odt, docx, ...
84 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
87// Filter selects the messages to return. Fields that are set must all match,
88// for slices each element by match ("and").
90 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
93 // If true, also submailboxes are included in the search.
94 MailboxChildrenIncluded bool
96 // In case client doesn't know mailboxes and their IDs yet. Only used during sse
97 // connection setup, where it is turned into a MailboxID. Filtering only looks at
101 Words []string // Case insensitive substring match for each string.
103 To []string // Including Cc and Bcc.
107 Attachments AttachmentType
109 Headers [][2]string // Header values can be empty, it's a check if the header is present, regardless of value.
114// NotFilter matches messages that don't match these fields.
115type NotFilter struct {
120 Attachments AttachmentType
124// Page holds pagination parameters for a request.
126 // Start returning messages after this ID, if > 0. For pagination, fetching the
127 // next set of messages.
128 AnchorMessageID int64
130 // Number of messages to return, must be >= 1, we never return more than 10000 for
134 // If > 0, return messages until DestMessageID is found. More than Count messages
135 // can be returned. For long-running searches, it may take a while before this
140// todo: MessageAddress and MessageEnvelope into message.Address and message.Envelope.
142// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
144type MessageAddress struct {
145 Name string // Free-form name for display in mail applications.
146 User string // Localpart, encoded.
150// MessageEnvelope is like message.Envelope, as used in message.Part, but including
151// unicode host names for IDNA names.
152type MessageEnvelope struct {
153 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
156 From []MessageAddress
157 Sender []MessageAddress
158 ReplyTo []MessageAddress
166// MessageItem is sent by queries, it has derived information analyzed from
167// message.Part, made for the needs of the message items in the message list.
169type MessageItem struct {
170 Message store.Message // Without ParsedBuf and MsgPrefix, for size.
171 Envelope MessageEnvelope
172 Attachments []Attachment
175 FirstLine string // Of message body, for showing as preview.
176 MatchQuery bool // If message does not match query, it can still be included because of threading.
179// ParsedMessage has more parsed/derived information about a message, intended
180// for rendering the (contents of the) message. Information from MessageItem is
182type ParsedMessage struct {
185 Headers map[string][]string
187 // Text parts, can be empty.
190 // Whether there is an HTML part. The webclient renders HTML message parts through
191 // an iframe and a separate request with strict CSP headers to prevent script
192 // execution and loading of external resources, which isn't possible when loading
193 // in iframe with inline HTML because not all browsers support the iframe csp
197 ListReplyAddress *MessageAddress // From List-Post.
199 // Information used by MessageItem, not exported in this type.
200 envelope MessageEnvelope
201 attachments []Attachment
207// EventStart is the first message sent on an SSE connection, giving the client
208// basic data to populate its UI. After this event, messages will follow quickly in
209// an EventViewMsgs event.
210type EventStart struct {
212 LoginAddress MessageAddress
213 Addresses []MessageAddress
214 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
216 Mailboxes []store.Mailbox
217 RejectsMailbox string
221// DomainAddressConfig has the address (localpart) configuration for a domain, so
222// the webmail client can decide if an address matches the addresses of the
224type DomainAddressConfig struct {
225 LocalpartCatchallSeparator string // Can be empty.
226 LocalpartCaseSensitive bool
229// EventViewMsgs contains messages for a view, possibly a continuation of an
230// earlier list of messages.
231type EventViewMsgs struct {
235 // If empty, this was the last message for the request. If non-empty, a list of
236 // thread messages. Each with the first message being the reason this thread is
237 // included and can be used as AnchorID in followup requests. If the threading mode
238 // is "off" in the query, there will always be only a single message. If a thread
239 // is sent, all messages in the thread are sent, including those that don't match
240 // the query (e.g. from another mailbox). Threads can be displayed based on the
241 // ThreadParentIDs field, with possibly slightly different display based on field
242 // ThreadMissingLink.
243 MessageItems [][]MessageItem
245 // If set, will match the target page.DestMessageID from the request.
246 ParsedMessage *ParsedMessage
248 // If set, there are no more messages in this view at this moment. Messages can be
249 // added, typically via Change messages, e.g. for new deliveries.
253// EventViewErr indicates an error during a query for messages. The request is
254// aborted, no more request-related messages will be sent until the next request.
255type EventViewErr struct {
258 Err string // To be displayed in client.
259 err error // Original message, for checking against context.Canceled.
262// EventViewReset indicates that a request for the next set of messages in a few
263// could not be fulfilled, e.g. because the anchor message does not exist anymore.
264// The client should clear its list of messages. This can happen before
265// EventViewMsgs events are sent.
266type EventViewReset struct {
271// EventViewChanges contain one or more changes relevant for the client, either
272// with new mailbox total/unseen message counts, or messages added/removed/modified
273// (flags) for the current view.
274type EventViewChanges struct {
276 Changes [][2]any // The first field of [2]any is a string, the second of the Change types below.
279// ChangeMsgAdd adds a new message and possibly its thread to the view.
280type ChangeMsgAdd struct {
282 MessageItems []MessageItem
285// ChangeMsgRemove removes one or more messages from the view.
286type ChangeMsgRemove struct {
287 store.ChangeRemoveUIDs
290// ChangeMsgFlags updates flags for one message.
291type ChangeMsgFlags struct {
295// ChangeMsgThread updates muted/collapsed fields for one message.
296type ChangeMsgThread struct {
300// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
301type ChangeMailboxRemove struct {
302 store.ChangeRemoveMailbox
305// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
306type ChangeMailboxAdd struct {
307 Mailbox store.Mailbox
310// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
311// It could be under a new parent.
312type ChangeMailboxRename struct {
313 store.ChangeRenameMailbox
316// ChangeMailboxCounts set new total and unseen message counts for a mailbox.
317type ChangeMailboxCounts struct {
318 store.ChangeMailboxCounts
321// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
322type ChangeMailboxSpecialUse struct {
323 store.ChangeMailboxSpecialUse
326// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
327// a message was added with a keyword that wasn't in the mailbox yet.
328type ChangeMailboxKeywords struct {
329 store.ChangeMailboxKeywords
332// View holds the information about the returned data for a query. It is used to
333// determine whether mailbox changes should be sent to the client, we only send
334// addition/removal/flag-changes of messages that are in view, or would extend it
335// if the view is at the end of the results.
339 // Received of last message we sent to the client. We use it to decide if a newly
340 // delivered message is within the view and the client should get a notification.
341 LastMessageReceived time.Time
343 // If set, the last message in the query view has been sent. There is no need to do
344 // another query, it will not return more data. Used to decide if an event for a
345 // new message should be sent.
348 // Whether message must or must not match mailboxIDs.
350 // Mailboxes to match, can be multiple, for matching children. If empty, there is
351 // no filter on mailboxes.
352 mailboxIDs map[int64]bool
354 // Threads sent to client. New messages for this thread are also sent, regardless
355 // of regular query matching, so also for other mailboxes. If the user (re)moved
356 // all messages of a thread, they may still receive events for the thread. Only
357 // filled when query with threading not off.
358 threadIDs map[int64]struct{}
361// sses tracks all sse connections, and access to them.
368// sse represents an sse connection.
370 ID int64 // Also returned in EventStart and used in Request to identify the request.
371 AccountName string // Used to check the authenticated user has access to the SSE connection.
372 Request chan Request // Goroutine will receive requests from here, coming from API calls.
375// called by the goroutine when the connection is closed or breaks.
376func (sse sse) unregister() {
379 delete(sses.m, sse.ID)
381 // Drain any pending requests, preventing blocked goroutines from API calls.
391func sseRegister(accountName string) sse {
395 v := sse{sses.gen, accountName, make(chan Request, 1)}
400// sseGet returns a reference to an existing connection if it exists and user
402func sseGet(id int64, accountName string) (sse, bool) {
406 if s.AccountName != accountName {
412// ssetoken is a temporary token that has not yet been used to start an SSE
413// connection. Created by Token, consumed by a new SSE connection.
414type ssetoken struct {
415 token string // Uniquely generated.
417 address string // Address used to authenticate in call that created the token.
418 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
422// ssetokens maintains unused tokens. We have just one, but it's a type so we
423// can define methods.
424type ssetokens struct {
426 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
427 tokens map[string]ssetoken // Token to details, for finding account for a token.
430var sseTokens = ssetokens{
431 accountTokens: map[string][]ssetoken{},
432 tokens: map[string]ssetoken{},
435// xgenerate creates and saves a new token. It ensures no more than 10 tokens
436// per account exist, removing old ones if needed.
437func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
438 buf := make([]byte, 16)
439 _, err := cryptrand.Read(buf)
440 xcheckf(ctx, err, "generating token")
441 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
445 n := len(x.accountTokens[accName])
447 for _, ost := range x.accountTokens[accName][:n-9] {
448 delete(x.tokens, ost.token)
450 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
451 x.accountTokens[accName] = x.accountTokens[accName][:9]
453 x.accountTokens[accName] = append(x.accountTokens[accName], st)
454 x.tokens[st.token] = st
458// check verifies a token, and consumes it if valid.
459func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
463 st, ok := x.tokens[token]
465 return "", "", "", false, nil
467 delete(x.tokens, token)
468 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
469 return "", "", "", false, errors.New("internal error, could not find token in account")
471 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
472 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
473 if len(x.accountTokens[st.accName]) == 0 {
474 delete(x.accountTokens, st.accName)
477 if time.Now().After(st.validUntil) {
478 return "", "", "", false, nil
480 return st.accName, st.address, st.sessionToken, true, nil
483// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
488// serveEvents serves an SSE connection. Authentication is done through a query
489// string parameter "token", a one-time-use token returned by the Token API call.
490func serveEvents(ctx context.Context, log mlog.Log, w http.ResponseWriter, r *http.Request) {
491 if r.Method != "GET" {
492 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
496 flusher, ok := w.(http.Flusher)
498 log.Error("internal error: ResponseWriter not a http.Flusher")
499 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
504 token := q.Get("token")
506 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
509 accName, address, sessionToken, ok, err := sseTokens.check(token)
511 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
515 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
518 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
519 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
523 // We can simulate a slow SSE connection. It seems firefox doesn't slow down
524 // incoming responses with its slow-network similation.
525 var waitMin, waitMax time.Duration
526 waitMinMsec := q.Get("waitMinMsec")
527 waitMaxMsec := q.Get("waitMaxMsec")
528 if waitMinMsec != "" && waitMaxMsec != "" {
529 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
530 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
533 waitMin = time.Duration(v) * time.Millisecond
536 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
537 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
540 waitMax = time.Duration(v) * time.Millisecond
544 // Parse the request with initial mailbox/search criteria.
546 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
547 dec.DisallowUnknownFields()
548 if err := dec.Decode(&req); err != nil {
549 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
551 } else if req.Page.Count <= 0 {
552 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
555 if req.Query.Threading == "" {
556 req.Query.Threading = ThreadOff
559 var writer *eventWriter
561 metricSSEConnections.Inc()
562 defer metricSSEConnections.Dec()
564 // Below here, error handling cause through xcheckf, which panics with
565 // *sherpa.Error, after which we send an error event to the client. We can also get
566 // an *ioErr when the connection is broken.
572 if err, ok := x.(*sherpa.Error); ok {
573 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
574 } else if _, ok := x.(ioErr); ok {
577 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
579 metrics.PanicInc(metrics.Webmail)
585 h.Set("Content-Type", "text/event-stream")
586 h.Set("Cache-Control", "no-cache")
588 // We'll be sending quite a bit of message data (text) in JSON (plenty duplicate
589 // keys), so should be quite compressible.
591 gz := mox.AcceptsGzip(r)
593 h.Set("Content-Encoding", "gzip")
594 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
598 out = httpFlusher{out, flusher}
600 // We'll be writing outgoing SSE events through writer.
601 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
604 // Fetch initial data.
605 acc, err := store.OpenAccount(log, accName)
606 xcheckf(ctx, err, "open account")
609 log.Check(err, "closing account")
611 comm := store.RegisterComm(acc)
612 defer comm.Unregister()
614 // List addresses that the client can use to send email from.
615 accConf, _ := acc.Conf()
616 loginAddr, err := smtp.ParseAddress(address)
617 xcheckf(ctx, err, "parsing login address")
618 _, _, dest, err := mox.FindAccount(loginAddr.Localpart, loginAddr.Domain, false)
619 xcheckf(ctx, err, "looking up destination for login address")
620 loginName := accConf.FullName
621 if dest.FullName != "" {
622 loginName = dest.FullName
624 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
625 var addresses []MessageAddress
626 for a, dest := range accConf.Destinations {
627 name := dest.FullName
629 name = accConf.FullName
631 var ma MessageAddress
632 if strings.HasPrefix(a, "@") {
633 dom, err := dns.ParseDomain(a[1:])
634 xcheckf(ctx, err, "parsing destination address for account")
635 ma = MessageAddress{Domain: dom}
637 addr, err := smtp.ParseAddress(a)
638 xcheckf(ctx, err, "parsing destination address for account")
639 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
641 addresses = append(addresses, ma)
644 // We implicitly start a query. We use the reqctx for the transaction, because the
645 // transaction is passed to the query, which can be canceled.
646 reqctx, reqctxcancel := context.WithCancel(ctx)
648 // We also cancel in cancelDrain later on, but there is a brief window where the
649 // context wouldn't be canceled.
650 if reqctxcancel != nil {
656 // qtx is kept around during connection initialization, until we pass it off to the
657 // goroutine that starts querying for messages.
661 err := qtx.Rollback()
662 log.Check(err, "rolling back")
666 var mbl []store.Mailbox
668 // We only take the rlock when getting the tx.
669 acc.WithRLock(func() {
670 // Now a read-only transaction we'll use during the query.
671 qtx, err = acc.DB.Begin(reqctx, false)
672 xcheckf(ctx, err, "begin transaction")
674 mbl, err = bstore.QueryTx[store.Mailbox](qtx).List()
675 xcheckf(ctx, err, "list mailboxes")
678 // Find the designated mailbox if a mailbox name is set, or there are no filters at all.
679 var zerofilter Filter
680 var zeronotfilter NotFilter
681 var mailbox store.Mailbox
682 var mailboxPrefixes []string
683 var matchMailboxes bool
684 mailboxIDs := map[int64]bool{}
685 mailboxName := req.Query.Filter.MailboxName
686 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
687 if mailboxName == "" {
688 mailboxName = "Inbox"
691 var inbox store.Mailbox
692 for _, e := range mbl {
693 if e.Name == mailboxName {
696 if e.Name == "Inbox" {
704 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
706 req.Query.Filter.MailboxID = mailbox.ID
707 req.Query.Filter.MailboxName = ""
708 mailboxPrefixes = []string{mailbox.Name + "/"}
709 matchMailboxes = true
710 mailboxIDs[mailbox.ID] = true
712 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
714 if req.Query.Filter.MailboxChildrenIncluded {
715 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
718 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
720 sse := sseRegister(acc.Name)
721 defer sse.unregister()
723 // Per-domain localpart config so webclient can decide if an address belongs to the account.
724 domainAddressConfigs := map[string]DomainAddressConfig{}
725 for _, a := range addresses {
726 dom, _ := mox.Conf.Domain(a.Domain)
727 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparator, dom.LocalpartCaseSensitive}
730 // Write first event, allowing client to fill its UI with mailboxes.
731 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, moxvar.Version}
732 writer.xsendEvent(ctx, log, "start", start)
734 // The goroutine doing the querying will send messages on these channels, which
735 // result in an event being written on the SSE connection.
736 viewMsgsc := make(chan EventViewMsgs)
737 viewErrc := make(chan EventViewErr)
738 viewResetc := make(chan EventViewReset)
739 donec := make(chan int64) // When request is done.
741 // Start a view, it determines if we send a change to the client. And start an
742 // implicit query for messages, we'll send the messages to the client which can
743 // fill its ui with messages.
744 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
745 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
746 qtx = nil // viewRequestTx closes qtx
748 // When canceling a query, we must drain its messages until it says it is done.
749 // Otherwise the sending goroutine would hang indefinitely on a channel send.
750 cancelDrain := func() {
751 if reqctxcancel != nil {
752 // Cancel the goroutine doing the querying.
760 // Drain events until done.
772 // If we stop and a query is in progress, we must drain the channel it will send on.
775 // Changes broadcasted by other connections on this account. If applicable for the
776 // connection/view, we send events.
777 xprocessChanges := func(changes []store.Change) {
778 taggedChanges := [][2]any{}
780 // We get a transaction first time we need it.
784 err := xtx.Rollback()
785 log.Check(err, "rolling back transaction")
788 ensureTx := func() error {
795 xtx, err = acc.DB.Begin(ctx, false)
798 // This getmsg will now only be called mailboxID+UID, not with messageID set.
799 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
800 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
801 if err := ensureTx(); err != nil {
802 return store.Message{}, fmt.Errorf("transaction: %v", err)
804 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
807 // Return uids that are within range in view. Because the end has been reached, or
808 // because the UID is not after the last message.
809 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
810 uidsAny := make([]any, len(uids))
811 for i, uid := range uids {
815 xcheckf(ctx, err, "transaction")
816 q := bstore.QueryTx[store.Message](xtx)
817 q.FilterNonzero(store.Message{MailboxID: mailboxID})
818 q.FilterEqual("UID", uidsAny...)
819 mbOK := v.matchesMailbox(mailboxID)
820 err = q.ForEach(func(m store.Message) error {
821 _, thread := v.threadIDs[m.ThreadID]
822 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
823 changedUIDs = append(changedUIDs, m.UID)
827 xcheckf(ctx, err, "fetching messages for change")
831 // Forward changes that are relevant to the current view.
832 for _, change := range changes {
833 switch c := change.(type) {
834 case store.ChangeAddUID:
835 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
836 xcheckf(ctx, err, "matching new message against view")
837 m, err := getmsg(0, c.MailboxID, c.UID)
838 xcheckf(ctx, err, "get message")
839 _, thread := v.threadIDs[m.ThreadID]
843 state := msgState{acc: acc}
844 mi, err := messageItem(log, m, &state)
846 xcheckf(ctx, err, "make messageitem")
849 mil := []MessageItem{mi}
850 if !thread && req.Query.Threading != ThreadOff {
852 xcheckf(ctx, err, "transaction")
853 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false)
854 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
855 mil = append(mil, more...)
856 v.threadIDs[m.ThreadID] = struct{}{}
859 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
861 // If message extends the view, store it as such.
862 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
863 v.LastMessageReceived = m.Received
866 case store.ChangeRemoveUIDs:
867 // We may send changes for uids the client doesn't know, that's fine.
868 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
869 if len(changedUIDs) == 0 {
872 ch := ChangeMsgRemove{c}
873 ch.UIDs = changedUIDs
874 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
876 case store.ChangeFlags:
877 // We may send changes for uids the client doesn't know, that's fine.
878 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
879 if len(changedUIDs) == 0 {
882 ch := ChangeMsgFlags{c}
883 ch.UID = changedUIDs[0]
884 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
886 case store.ChangeThread:
887 // Change in muted/collaped state, just always ship it.
888 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
890 case store.ChangeRemoveMailbox:
891 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
893 case store.ChangeAddMailbox:
894 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
896 case store.ChangeRenameMailbox:
897 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
899 case store.ChangeMailboxCounts:
900 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
902 case store.ChangeMailboxSpecialUse:
903 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
905 case store.ChangeMailboxKeywords:
906 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
908 case store.ChangeAddSubscription:
909 // Webmail does not care about subscriptions.
912 panic(fmt.Sprintf("missing case for change %T", c))
916 if len(taggedChanges) > 0 {
917 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
918 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
922 timer := time.NewTimer(5 * time.Minute) // For keepalives.
926 timer.Reset(5 * time.Minute)
930 pending := comm.Pending
936 case <-mox.Shutdown.Done():
937 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
938 // Work around go vet, it doesn't see defer cancelDrain.
939 if reqctxcancel != nil {
945 _, err := fmt.Fprintf(out, ": keepalive\n\n")
947 log.Errorx("write keepalive", err)
948 // Work around go vet, it doesn't see defer cancelDrain.
949 if reqctxcancel != nil {
957 case vm := <-viewMsgsc:
958 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
959 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
964 if len(vm.MessageItems) > 0 {
965 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
967 writer.xsendEvent(ctx, log, "viewMsgs", vm)
969 case ve := <-viewErrc:
970 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
971 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
973 if errors.Is(ve.err, context.Canceled) || moxio.IsClosed(ve.err) {
974 // Work around go vet, it doesn't see defer cancelDrain.
975 if reqctxcancel != nil {
980 writer.xsendEvent(ctx, log, "viewErr", ve)
982 case vr := <-viewResetc:
983 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
984 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
986 writer.xsendEvent(ctx, log, "viewReset", vr)
989 if id != v.Request.ID {
990 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
992 if reqctxcancel != nil {
998 case req := <-sse.Request:
1003 v = view{req, time.Time{}, false, false, nil, nil}
1007 reqctx, reqctxcancel = context.WithCancel(ctx)
1009 stop := func() (stop bool) {
1010 // rtx is handed off viewRequestTx below, but we must clean it up in case of errors.
1015 err = rtx.Rollback()
1016 log.Check(err, "rolling back transaction")
1019 acc.WithRLock(func() {
1020 rtx, err = acc.DB.Begin(reqctx, false)
1027 if errors.Is(err, context.Canceled) {
1030 err := fmt.Errorf("begin transaction: %v", err)
1031 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1032 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1036 // Reset view state for new query.
1037 if req.ViewID != v.Request.ViewID {
1038 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1039 if req.Query.Filter.MailboxChildrenIncluded {
1040 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1042 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1046 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1055 xprocessChanges(comm.Get())
1058 // Work around go vet, it doesn't see defer cancelDrain.
1059 if reqctxcancel != nil {
1067// xprepareMailboxIDs prepare the first half of filters for mailboxes, based on
1068// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1069// mailboxIDs must or must not match. mailboxPrefixes is for use with
1070// xgatherMailboxIDs to gather children of the mailboxIDs.
1071func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1072 matchMailboxes = true
1073 mailboxIDs = map[int64]bool{}
1074 if f.MailboxID == -1 {
1075 matchMailboxes = false
1076 // Add the trash, junk and account rejects mailbox.
1077 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1078 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1079 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1080 mailboxIDs[mb.ID] = true
1084 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1085 } else if f.MailboxID > 0 {
1086 mb := store.Mailbox{ID: f.MailboxID}
1088 xcheckf(ctx, err, "get mailbox")
1089 mailboxIDs[f.MailboxID] = true
1090 mailboxPrefixes = []string{mb.Name + "/"}
1095// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1096// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1097func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1098 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1099 if len(mailboxPrefixes) == 0 {
1102 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
1103 for _, p := range mailboxPrefixes {
1104 if strings.HasPrefix(mb.Name, p) {
1105 mailboxIDs[mb.ID] = true
1111 xcheckf(ctx, err, "gathering mailboxes")
1114// matchesMailbox returns whether a mailbox matches the view.
1115func (v view) matchesMailbox(mailboxID int64) bool {
1116 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
1119// inRange returns whether m is within the range for the view, whether a change for
1120// this message should be sent to the client so it can update its state.
1121func (v view) inRange(m store.Message) bool {
1122 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
1125// matches checks if the message, identified by either messageID or mailboxID+UID,
1126// is in the current "view" (i.e. passing the filters, and if checkRange is set
1127// also if within the range of sent messages based on sort order and the last seen
1128// message). getmsg retrieves the message, which may be necessary depending on the
1129// active filters. Used to determine if a store.Change with a new message should be
1130// sent, and for the destination and anchor messages in view requests.
1131func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1133 ensureMessage := func() bool {
1134 if m.ID == 0 && rerr == nil {
1135 m, rerr = getmsg(messageID, mailboxID, uid)
1140 q := v.Request.Query
1142 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1145 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1148 // note: anchorMessageID is not relevant for matching.
1149 flagfilter := q.flagFilterFn()
1150 if flagfilter != nil && !flagfilter(flags, keywords) {
1154 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1157 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1161 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1164 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1168 state := msgState{acc: acc}
1170 if rerr == nil && state.err != nil {
1176 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1177 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1181 envFilter := q.envFilterFn(log, &state)
1182 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1186 headerFilter := q.headerFilterFn(log, &state)
1187 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1191 wordsFilter := q.wordsFilterFn(log, &state)
1192 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1196 // Now check that we are either within the sorting order, or "last" was sent.
1197 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1203type msgResp struct {
1204 err error // If set, an error happened and fields below are not set.
1205 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1206 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1207 mil []MessageItem // If none of the cases above apply, the messages that was found matching the query. First message was reason the thread is returned, for use as AnchorID in followup request.
1208 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1211// viewRequestTx executes a request (query with filters, pagination) by
1212// launching a new goroutine with queryMessages, receiving results as msgResp,
1213// and sending Event* to the SSE connection.
1215// It always closes tx.
1216func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1218 err := tx.Rollback()
1219 log.Check(err, "rolling back query transaction")
1221 donec <- v.Request.ID
1223 x := recover() // Should not happen, but don't take program down if it does.
1225 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1227 metrics.PanicInc(metrics.Webmailrequest)
1231 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1232 var parsedMessage *ParsedMessage
1235 var immediate bool // No waiting, flush immediate.
1236 t := time.NewTimer(300 * time.Millisecond)
1239 sendViewMsgs := func(force bool) {
1240 if len(msgitems) == 0 && !force {
1245 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1248 t.Reset(300 * time.Millisecond)
1251 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1253 mrc := make(chan msgResp, 1)
1254 go queryMessages(ctx, log, acc, tx, v, mrc)
1258 case mr, ok := <-mrc:
1261 // Empty message list signals this query is done.
1262 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1267 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1271 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1280 msgitems = append(msgitems, mr.mil)
1282 parsedMessage = mr.pm
1289 if len(msgitems) == 0 {
1290 // Nothing to send yet. We'll send immediately when the next message comes in.
1299// queryMessages executes a query, with filter, pagination, destination message id
1300// to fetch (the message that the client had in view and wants to display again).
1301// It sends on msgc, with several types of messages: errors, whether the view is
1302// reset due to missing AnchorMessageID, and when the end of the view was reached
1303// and/or for a message.
1304func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp) {
1306 x := recover() // Should not happen, but don't take program down if it does.
1308 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1310 mrc <- msgResp{err: fmt.Errorf("query failed")}
1311 metrics.PanicInc(metrics.Webmailquery)
1317 query := v.Request.Query
1318 page := v.Request.Page
1320 // Warning: Filters must be kept in sync between queryMessage and view.matches.
1322 checkMessage := func(id int64) (valid bool, rerr error) {
1323 m := store.Message{ID: id}
1325 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1327 } else if err != nil {
1330 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1336 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1337 if page.AnchorMessageID > 0 {
1338 // Check if message exists and (still) matches the filter.
1339 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1340 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1341 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1344 mrc <- msgResp{reset: true}
1345 page.AnchorMessageID = 0
1349 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1350 // it instead of continuing to send message till the end of the view.
1351 if page.DestMessageID > 0 {
1352 if valid, err := checkMessage(page.DestMessageID); err != nil {
1353 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1356 page.DestMessageID = 0
1360 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1362 q := bstore.QueryTx[store.Message](tx)
1363 q.FilterEqual("Expunged", false)
1364 if len(v.mailboxIDs) > 0 {
1365 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1366 // Should result in fast indexed query.
1367 for mbID := range v.mailboxIDs {
1368 q.FilterNonzero(store.Message{MailboxID: mbID})
1371 idsAny := make([]any, 0, len(v.mailboxIDs))
1372 for mbID := range v.mailboxIDs {
1373 idsAny = append(idsAny, mbID)
1375 if v.matchMailboxIDs {
1376 q.FilterEqual("MailboxID", idsAny...)
1378 q.FilterNotEqual("MailboxID", idsAny...)
1383 // If we are looking for an anchor, keep skipping message early (cheaply) until we've seen it.
1384 if page.AnchorMessageID > 0 {
1386 q.FilterFn(func(m store.Message) bool {
1390 seen = m.ID == page.AnchorMessageID
1395 // We may be added filters the the query below. The FilterFn signature does not
1396 // implement reporting errors, or anything else, just a bool. So when making the
1397 // filter functions, we give them a place to store parsed message state, and an
1398 // error. We check the error during and after query execution.
1399 state := msgState{acc: acc}
1402 flagfilter := query.flagFilterFn()
1403 if flagfilter != nil {
1404 q.FilterFn(func(m store.Message) bool {
1405 return flagfilter(m.Flags, m.Keywords)
1409 if query.Filter.Oldest != nil {
1410 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1412 if query.Filter.Newest != nil {
1413 q.FilterLessEqual("Received", *query.Filter.Newest)
1416 if query.Filter.SizeMin > 0 {
1417 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1419 if query.Filter.SizeMax > 0 {
1420 q.FilterLessEqual("Size", query.Filter.SizeMax)
1423 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1424 if attachmentFilter != nil {
1425 q.FilterFn(attachmentFilter)
1428 envFilter := query.envFilterFn(log, &state)
1429 if envFilter != nil {
1430 q.FilterFn(envFilter)
1433 headerFilter := query.headerFilterFn(log, &state)
1434 if headerFilter != nil {
1435 q.FilterFn(headerFilter)
1438 wordsFilter := query.wordsFilterFn(log, &state)
1439 if wordsFilter != nil {
1440 q.FilterFn(wordsFilter)
1444 q.SortAsc("Received")
1446 q.SortDesc("Received")
1448 found := page.DestMessageID <= 0
1451 err := q.ForEach(func(m store.Message) error {
1452 // Check for an error in one of the filters, propagate it.
1453 if state.err != nil {
1457 if have >= page.Count && found || have > 10000 {
1459 return bstore.StopForEach
1462 if _, ok := v.threadIDs[m.ThreadID]; ok {
1463 // Message was already returned as part of a thread.
1467 var pm *ParsedMessage
1468 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1469 // For threads, if there was not DestMessageID, we may be getting the newest
1470 // message. For an initial view, this isn't necessarily the first the user is
1471 // expected to read first, that would be the first unread, which we'll get below
1472 // when gathering the thread.
1474 xpm, err := parsedMessage(log, m, &state, true, false)
1476 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1481 mi, err := messageItem(log, m, &state)
1483 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1485 mil := []MessageItem{mi}
1486 if query.Threading != ThreadOff {
1487 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0)
1489 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1495 mil = append(mil, more...)
1496 v.threadIDs[m.ThreadID] = struct{}{}
1498 // Calculate how many messages the frontend is going to show, and only count those as returned.
1499 collapsed := map[int64]bool{}
1500 for _, mi := range mil {
1501 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1503 unread := map[int64]bool{} // Propagated to thread root.
1504 if query.Threading == ThreadUnread {
1505 for _, mi := range mil {
1511 for _, id := range m.ThreadParentIDs {
1516 for _, mi := range mil {
1520 for _, id := range m.ThreadParentIDs {
1521 if _, ok := collapsed[id]; ok {
1526 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1533 mrc <- msgResp{mil: mil, pm: pm}
1536 // Check for an error in one of the filters again. Check in ForEach would not
1537 // trigger if the last message has the error.
1538 if err == nil && state.err != nil {
1542 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1546 mrc <- msgResp{viewEnd: true}
1550func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool) ([]MessageItem, *ParsedMessage, error) {
1551 if m.ThreadID == 0 {
1552 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1553 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1556 // Fetch other messages for this thread.
1557 qt := bstore.QueryTx[store.Message](tx)
1558 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1559 qt.FilterEqual("Expunged", false)
1560 qt.FilterNotEqual("ID", m.ID)
1562 tml, err := qt.List()
1564 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1567 var mil []MessageItem
1568 var pm *ParsedMessage
1569 var firstUnread bool
1570 for _, tm := range tml {
1571 err := func() error {
1572 xstate := msgState{acc: acc}
1573 defer xstate.clear()
1575 mi, err := messageItem(log, tm, &xstate)
1577 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1579 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1583 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1585 mil = append(mil, mi)
1587 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1588 firstUnread = !tm.Seen
1589 xpm, err := parsedMessage(log, tm, &xstate, true, false)
1591 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1598 return nil, nil, err
1602 // Finally, the message that caused us to gather this thread (which is likely the
1603 // most recent message in the thread) could be the only unread message.
1604 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1605 xstate := msgState{acc: acc}
1606 defer xstate.clear()
1607 xpm, err := parsedMessage(log, m, &xstate, true, false)
1609 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1617// While checking the filters on a message, we may need to get more message
1618// details as each filter passes. We check the filters that need the basic
1619// information first, and load and cache more details for the next filters.
1620// msgState holds parsed details for a message, it is updated while filtering,
1621// with more information or reset for a next message.
1622type msgState struct {
1623 acc *store.Account // Never changes during lifetime.
1624 err error // Once set, doesn't get cleared.
1626 part *message.Part // Will be without Reader when msgr is nil.
1627 msgr *store.MsgReader
1630func (ms *msgState) clear() {
1635 *ms = msgState{acc: ms.acc, err: ms.err}
1638func (ms *msgState) ensureMsg(m store.Message) {
1639 if m.ID != ms.m.ID {
1645func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1650 if m.ParsedBuf == nil {
1651 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1655 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1656 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1661 if withMsgReader && ms.msgr == nil {
1662 ms.msgr = ms.acc.MessageReader(m)
1663 ms.part.SetReaderAt(ms.msgr)
1666 return ms.part != nil
1669// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1670// filters for a query. A nil function is returned if there are no flags to filter
1672func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1673 labels := map[string]bool{}
1674 for _, k := range q.Filter.Labels {
1677 for _, k := range q.NotFilter.Labels {
1681 if len(labels) == 0 {
1685 var mask, flags store.Flags
1686 systemflags := map[string][]*bool{
1687 `\answered`: {&mask.Answered, &flags.Answered},
1688 `\flagged`: {&mask.Flagged, &flags.Flagged},
1689 `\deleted`: {&mask.Deleted, &flags.Deleted},
1690 `\seen`: {&mask.Seen, &flags.Seen},
1691 `\draft`: {&mask.Draft, &flags.Draft},
1692 `$junk`: {&mask.Junk, &flags.Junk},
1693 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1694 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1695 `$phishing`: {&mask.Phishing, &flags.Phishing},
1696 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1698 keywords := map[string]bool{}
1699 for k, v := range labels {
1700 k = strings.ToLower(k)
1701 if mf, ok := systemflags[k]; ok {
1708 return func(msgFlags store.Flags, msgKeywords []string) bool {
1710 if f.Set(mask, msgFlags) != flags {
1713 for k, v := range keywords {
1714 if slices.Contains(msgKeywords, k) != v {
1722// attachmentFilterFn returns a function that filters for the attachment-related
1723// filter from the query. A nil function is returned if there are attachment
1725func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1726 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1730 return func(m store.Message) bool {
1731 if !state.ensurePart(m, false) {
1734 types, err := attachmentTypes(log, m, state)
1739 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1743var attachmentMimetypes = map[string]AttachmentType{
1744 "application/pdf": AttachmentPDF,
1745 "application/zip": AttachmentArchive,
1746 "application/x-rar-compressed": AttachmentArchive,
1747 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1748 "application/vnd.ms-excel": AttachmentSpreadsheet,
1749 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1750 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1751 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1752 "application/vnd.ms-powerpoint": AttachmentPresentation,
1753 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1755var attachmentExtensions = map[string]AttachmentType{
1756 ".pdf": AttachmentPDF,
1757 ".zip": AttachmentArchive,
1758 ".tar": AttachmentArchive,
1759 ".tgz": AttachmentArchive,
1760 ".tar.gz": AttachmentArchive,
1761 ".tbz2": AttachmentArchive,
1762 ".tar.bz2": AttachmentArchive,
1763 ".tar.lz": AttachmentArchive,
1764 ".tlz": AttachmentArchive,
1765 ".tar.xz": AttachmentArchive,
1766 ".txz": AttachmentArchive,
1767 ".tar.zst": AttachmentArchive,
1768 ".tar.lz4": AttachmentArchive,
1769 ".7z": AttachmentArchive,
1770 ".rar": AttachmentArchive,
1771 ".ods": AttachmentSpreadsheet,
1772 ".xls": AttachmentSpreadsheet,
1773 ".xlsx": AttachmentSpreadsheet,
1774 ".odt": AttachmentDocument,
1775 ".doc": AttachmentDocument,
1776 ".docx": AttachmentDocument,
1777 ".odp": AttachmentPresentation,
1778 ".ppt": AttachmentPresentation,
1779 ".pptx": AttachmentPresentation,
1782func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1783 types := map[AttachmentType]bool{}
1785 pm, err := parsedMessage(log, m, state, false, false)
1787 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1789 for _, a := range pm.attachments {
1790 if a.Part.MediaType == "IMAGE" {
1791 types[AttachmentImage] = true
1794 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1795 if t, ok := attachmentMimetypes[mt]; ok {
1797 } else if ext := filepath.Ext(tryDecodeParam(log, a.Part.ContentTypeParams["name"])); ext != "" {
1798 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1806 if len(types) == 0 {
1807 types[AttachmentNone] = true
1809 types[AttachmentAny] = true
1814// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1815// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1816// clash with SMTP envelope) for the query. A nil function is returned if no
1817// filtering is needed.
1818func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1819 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1823 lower := func(l []string) []string {
1827 r := make([]string, len(l))
1828 for i, s := range l {
1829 r[i] = strings.ToLower(s)
1834 filterSubject := lower(q.Filter.Subject)
1835 notFilterSubject := lower(q.NotFilter.Subject)
1836 filterFrom := lower(q.Filter.From)
1837 notFilterFrom := lower(q.NotFilter.From)
1838 filterTo := lower(q.Filter.To)
1839 notFilterTo := lower(q.NotFilter.To)
1841 return func(m store.Message) bool {
1842 if !state.ensurePart(m, false) {
1846 var env message.Envelope
1847 if state.part.Envelope != nil {
1848 env = *state.part.Envelope
1851 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1852 subject := strings.ToLower(env.Subject)
1853 for _, s := range filterSubject {
1854 if !strings.Contains(subject, s) {
1858 for _, s := range notFilterSubject {
1859 if strings.Contains(subject, s) {
1865 contains := func(textLower []string, l []message.Address, all bool) bool {
1867 for _, s := range textLower {
1868 for _, a := range l {
1869 name := strings.ToLower(a.Name)
1870 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
1871 if strings.Contains(name, s) || strings.Contains(addr, s) {
1885 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
1888 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
1891 if len(filterTo) > 0 || len(notFilterTo) > 0 {
1892 to := append(append(append([]message.Address{}, env.To...), env.CC...), env.BCC...)
1893 if len(filterTo) > 0 && !contains(filterTo, to, true) {
1896 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
1904// headerFilterFn returns a function that filters for the header filters in the
1905// query. A nil function is returned if there are no header filters.
1906func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1907 if len(q.Filter.Headers) == 0 {
1911 lowerValues := make([]string, len(q.Filter.Headers))
1912 for i, t := range q.Filter.Headers {
1913 lowerValues[i] = strings.ToLower(t[1])
1916 return func(m store.Message) bool {
1917 if !state.ensurePart(m, true) {
1920 hdr, err := state.part.Header()
1922 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
1927 for i, t := range q.Filter.Headers {
1931 if v == "" && len(l) > 0 {
1934 for _, e := range l {
1935 if strings.Contains(strings.ToLower(e), v) {
1945// wordFiltersFn returns a function that applies the word filters of the query. A
1946// nil function is returned when query does not contain a word filter.
1947func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1948 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
1952 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
1954 return func(m store.Message) bool {
1955 if !state.ensurePart(m, true) {
1959 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
1960 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)