3// todo: may want to add some json omitempty tags to MessageItem or Message to reduce json size, or just have smaller types that send only the fields that are needed.
8 cryptrand "crypto/rand"
24 "github.com/mjl-/bstore"
25 "github.com/mjl-/sherpa"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/message"
29 "github.com/mjl-/mox/metrics"
30 "github.com/mjl-/mox/mlog"
31 "github.com/mjl-/mox/mox-"
32 "github.com/mjl-/mox/moxvar"
33 "github.com/mjl-/mox/smtp"
34 "github.com/mjl-/mox/store"
37// Request is a request to an SSE connection to send messages, either for a new
38// view, to continue with an existing view, or to cancel an ongoing request.
42 SSEID int64 // SSE connection.
44 // To indicate a request is a continuation (more results) of the previous view.
45 // Echoed in events; the client checks if it is getting results for the latest request.
48 // If set, this request and its view are canceled. A new view must be started.
58 ThreadOff ThreadMode = "off"
59 ThreadOn ThreadMode = "on"
60 ThreadUnread ThreadMode = "unread"
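// An illustrative sketch (the values and the sseID variable are made up) of a
// Request for a fresh view of the Inbox, using the Query, Filter and Page types
// defined below:
//
//	req := Request{
//		ID:     1,
//		SSEID:  sseID, // As returned in the EventStart event of the SSE connection.
//		ViewID: 1,
//		Query: Query{
//			OrderAsc:  false, // Newest messages first.
//			Threading: ThreadOn,
//			Filter:    Filter{MailboxName: "Inbox"},
//		},
//		Page: Page{Count: 50},
//	}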
63// Query is a request for messages that match filters, in a given order.
65 OrderAsc bool // Order by received ascending or descending.
71// AttachmentType is for filtering by attachment type.
72type AttachmentType string
75 AttachmentIndifferent AttachmentType = ""
76 AttachmentNone AttachmentType = "none"
77 AttachmentAny AttachmentType = "any"
78 AttachmentImage AttachmentType = "image" // png, jpg, gif, ...
79 AttachmentPDF AttachmentType = "pdf"
80 AttachmentArchive AttachmentType = "archive" // zip files, tgz, ...
81 AttachmentSpreadsheet AttachmentType = "spreadsheet" // ods, xlsx, ...
82 AttachmentDocument AttachmentType = "document" // odt, docx, ...
83 AttachmentPresentation AttachmentType = "presentation" // odp, pptx, ...
86// Filter selects the messages to return. Fields that are set must all match,
87// for slices each element must match ("and").
89 // If -1, then all mailboxes except Trash/Junk/Rejects. Otherwise, only active if > 0.
92 // If true, also submailboxes are included in the search.
93 MailboxChildrenIncluded bool
95 // In case the client doesn't know mailboxes and their IDs yet. Only used during SSE
96 // connection setup, where it is turned into a MailboxID. Filtering only looks at
100 Words []string // Case insensitive substring match for each string.
102 To []string // Including Cc and Bcc.
106 Attachments AttachmentType
108 Headers [][2]string // Header values can be empty; then the check is only whether the header is present, regardless of value.
113// NotFilter matches messages that don't match these fields.
114type NotFilter struct {
119 Attachments AttachmentType
123// Page holds pagination parameters for a request.
125 // Start returning messages after this ID, if > 0. For pagination, fetching the
126 // next set of messages.
127 AnchorMessageID int64
129 // Number of messages to return, must be >= 1, we never return more than 10000 for
133 // If > 0, return messages until DestMessageID is found. More than Count messages
134 // can be returned. For long-running searches, it may take a while before this
139// todo: merge MessageAddress and MessageEnvelope into message.Address and message.Envelope.
141// MessageAddress is like message.Address, but with a dns.Domain, with unicode name
143type MessageAddress struct {
144 Name string // Free-form name for display in mail applications.
145 User string // Localpart, encoded.
149// MessageEnvelope is like message.Envelope, as used in message.Part, but including
150// unicode host names for IDNA names.
151type MessageEnvelope struct {
152 // todo: should get sherpadoc to understand type embeds and embed the non-MessageAddress fields from message.Envelope.
155 From []MessageAddress
156 Sender []MessageAddress
157 ReplyTo []MessageAddress
165// MessageItem is sent by queries; it has derived information analyzed from
166// message.Part, made for the needs of the message items in the message list.
168type MessageItem struct {
169 Message store.Message // Without ParsedBuf and MsgPrefix, for size. With Preview, even if it isn't stored yet in the database.
170 Envelope MessageEnvelope
171 Attachments []Attachment
174 MatchQuery bool // If message does not match query, it can still be included because of threading.
175 MoreHeaders [][2]string // All headers from store.Settings.ShowHeaders that are present.
178// ParsedMessage has more parsed/derived information about a message, intended
179// for rendering the (contents of the) message. Information from MessageItem is
181type ParsedMessage struct {
184 Headers map[string][]string
185 ViewMode store.ViewMode
187 Texts []string // Contents of text parts, can be empty.
189 // Whether there is an HTML part. The webclient renders HTML message parts through
190 // an iframe and a separate request with strict CSP headers to prevent script
191 // execution and loading of external resources, which isn't possible when loading
192 // in an iframe with inline HTML because not all browsers support the iframe CSP
196 ListReplyAddress *MessageAddress // From List-Post.
198 TextPaths [][]int // Paths to text parts.
199 HTMLPath []int // Path to HTML part.
201 // Information used by MessageItem, not exported in this type.
202 envelope MessageEnvelope
203 attachments []Attachment
208// EventStart is the first message sent on an SSE connection, giving the client
209// basic data to populate its UI. After this event, messages will follow quickly in
210// an EventViewMsgs event.
211type EventStart struct {
213 LoginAddress MessageAddress
214 Addresses []MessageAddress
215 DomainAddressConfigs map[string]DomainAddressConfig // ASCII domain to address config.
217 Mailboxes []store.Mailbox
218 RejectsMailbox string
219 Settings store.Settings
220 AccountPath string // If nonempty, the path on same host to webaccount interface.
224// DomainAddressConfig has the address (localpart) configuration for a domain, so
225// the webmail client can decide if an address matches the addresses of the
227type DomainAddressConfig struct {
228 LocalpartCatchallSeparators []string // Can be empty.
229 LocalpartCaseSensitive bool
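// For example (an illustrative sketch, not the webmail client's actual code): with
// LocalpartCatchallSeparators ["+"] and LocalpartCaseSensitive false, the client can
// treat "jane+lists@example.org" as belonging to account address "jane@example.org"
// by stripping everything from the first separator and comparing case-insensitively:
//
//	lp := "jane+lists"
//	if i := strings.IndexAny(lp, "+"); i >= 0 {
//		lp = lp[:i]
//	}
//	match := strings.EqualFold(lp, "jane") // true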
232// EventViewMsgs contains messages for a view, possibly a continuation of an
233// earlier list of messages.
234type EventViewMsgs struct {
238 // If empty, this was the last message for the request. If non-empty, a list of
239 // thread messages. Each with the first message being the reason this thread is
240 // included and can be used as AnchorID in followup requests. If the threading mode
241 // is "off" in the query, there will always be only a single message. If a thread
242 // is sent, all messages in the thread are sent, including those that don't match
243 // the query (e.g. from another mailbox). Threads can be displayed based on the
244 // ThreadParentIDs field, with possibly slightly different display based on field
245 // ThreadMissingLink.
246 MessageItems [][]MessageItem
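	// For example (a sketch with hypothetical items): with threading "on",
	// MessageItems could be [][]MessageItem{{msgA, msgAReply}, {msgB}}, where msgA
	// and msgB are the messages that matched the query and anchor their threads.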
248 // If set, will match the target page.DestMessageID from the request.
249 ParsedMessage *ParsedMessage
251 // If set, there are no more messages in this view at this moment. Messages can be
252 // added, typically via Change messages, e.g. for new deliveries.
256// EventViewErr indicates an error during a query for messages. The request is
257// aborted, no more request-related messages will be sent until the next request.
258type EventViewErr struct {
261 Err string // To be displayed in client.
262 err error // Original error, for checking against context.Canceled.
265// EventViewReset indicates that a request for the next set of messages in a view
266// could not be fulfilled, e.g. because the anchor message does not exist anymore.
267// The client should clear its list of messages. This can happen before
268// EventViewMsgs events are sent.
269type EventViewReset struct {
274// EventViewChanges contains one or more changes relevant for the client, either
275// with new mailbox total/unseen message counts, or messages added/removed/modified
276// (flags) for the current view.
277type EventViewChanges struct {
279 Changes [][2]any // The first field of [2]any is a string, the second is one of the Change types below.
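	// For example (a sketch, change values abbreviated), a flag update plus the
	// resulting mailbox counts would be encoded as:
	//
	//	Changes: [][2]any{
	//		{"ChangeMsgFlags", ChangeMsgFlags{/* ... */}},
	//		{"ChangeMailboxCounts", ChangeMailboxCounts{/* ... */}},
	//	}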
282// ChangeMsgAdd adds a new message and possibly its thread to the view.
283type ChangeMsgAdd struct {
285 MessageItems []MessageItem
288// ChangeMsgRemove removes one or more messages from the view.
289type ChangeMsgRemove struct {
290 store.ChangeRemoveUIDs
293// ChangeMsgFlags updates flags for one message.
294type ChangeMsgFlags struct {
298// ChangeMsgThread updates muted/collapsed fields for one message.
299type ChangeMsgThread struct {
303// ChangeMailboxRemove indicates a mailbox was removed, including all its messages.
304type ChangeMailboxRemove struct {
305 store.ChangeRemoveMailbox
308// ChangeMailboxAdd indicates a new mailbox was added, initially without any messages.
309type ChangeMailboxAdd struct {
310 Mailbox store.Mailbox
313// ChangeMailboxRename indicates a mailbox was renamed. Its ID stays the same.
314// It could be under a new parent.
315type ChangeMailboxRename struct {
316 store.ChangeRenameMailbox
319// ChangeMailboxCounts sets new total and unseen message counts for a mailbox.
320type ChangeMailboxCounts struct {
321 store.ChangeMailboxCounts
324// ChangeMailboxSpecialUse has updated special-use flags for a mailbox.
325type ChangeMailboxSpecialUse struct {
326 store.ChangeMailboxSpecialUse
329// ChangeMailboxKeywords has an updated list of keywords for a mailbox, e.g. after
330// a message was added with a keyword that wasn't in the mailbox yet.
331type ChangeMailboxKeywords struct {
332 store.ChangeMailboxKeywords
335// View holds the information about the returned data for a query. It is used to
336// determine whether mailbox changes should be sent to the client: we only send
337// addition/removal/flag-changes of messages that are in view, or would extend it
338// if the view is at the end of the results.
342 // Received of last message we sent to the client. We use it to decide if a newly
343 // delivered message is within the view and the client should get a notification.
344 LastMessageReceived time.Time
346 // If set, the last message in the query view has been sent. There is no need to do
347 // another query, it will not return more data. Used to decide if an event for a
348 // new message should be sent.
351 // Whether messages must or must not match mailboxIDs.
353 // Mailboxes to match, can be multiple, for matching children. If empty, there is
354 // no filter on mailboxes.
355 mailboxIDs map[int64]bool
357 // Threads sent to client. New messages for this thread are also sent, regardless
358 // of regular query matching, so also for other mailboxes. If the user (re)moved
359 // all messages of a thread, they may still receive events for the thread. Only
360 // filled when the query's threading mode is not "off".
361 threadIDs map[int64]struct{}
364// sses tracks all sse connections, and access to them.
371// sse represents an sse connection.
373 ID int64 // Also returned in EventStart and used in Request to identify the request.
374 AccountName string // Used to check the authenticated user has access to the SSE connection.
375 Request chan Request // Goroutine will receive requests from here, coming from API calls.
378// unregister is called by the goroutine when the connection is closed or breaks.
379func (sse sse) unregister() {
382 delete(sses.m, sse.ID)
384 // Drain any pending requests, preventing blocked goroutines from API calls.
394func sseRegister(accountName string) sse {
398 v := sse{sses.gen, accountName, make(chan Request, 1)}
403// sseGet returns a reference to an existing connection if it exists and user
405func sseGet(id int64, accountName string) (sse, bool) {
409 if s.AccountName != accountName {
415// ssetoken is a temporary token that has not yet been used to start an SSE
416// connection. Created by Token, consumed by a new SSE connection.
417type ssetoken struct {
418 token string // Uniquely generated.
420 address string // Address used to authenticate in call that created the token.
421 sessionToken store.SessionToken // SessionToken that created this token, checked before sending updates.
425// ssetokens maintains unused tokens. We have just one, but it's a type so we
426// can define methods.
427type ssetokens struct {
429 accountTokens map[string][]ssetoken // Account to max 10 most recent tokens, from old to new.
430 tokens map[string]ssetoken // Token to details, for finding account for a token.
433var sseTokens = ssetokens{
434 accountTokens: map[string][]ssetoken{},
435 tokens: map[string]ssetoken{},
438// xgenerate creates and saves a new token. It ensures no more than 10 tokens
439// per account exist, removing old ones if needed.
440func (x *ssetokens) xgenerate(ctx context.Context, accName, address string, sessionToken store.SessionToken) string {
441 buf := make([]byte, 16)
442 _, err := cryptrand.Read(buf)
443 xcheckf(ctx, err, "generating token")
444 st := ssetoken{base64.RawURLEncoding.EncodeToString(buf), accName, address, sessionToken, time.Now().Add(time.Minute)}
448 n := len(x.accountTokens[accName])
450 for _, ost := range x.accountTokens[accName][:n-9] {
451 delete(x.tokens, ost.token)
453 copy(x.accountTokens[accName], x.accountTokens[accName][n-9:])
454 x.accountTokens[accName] = x.accountTokens[accName][:9]
456 x.accountTokens[accName] = append(x.accountTokens[accName], st)
457 x.tokens[st.token] = st
461// check verifies a token, and consumes it if valid.
462func (x *ssetokens) check(token string) (string, string, store.SessionToken, bool, error) {
466 st, ok := x.tokens[token]
468 return "", "", "", false, nil
470 delete(x.tokens, token)
471 if i := slices.Index(x.accountTokens[st.accName], st); i < 0 {
472 return "", "", "", false, errors.New("internal error, could not find token in account")
474 copy(x.accountTokens[st.accName][i:], x.accountTokens[st.accName][i+1:])
475 x.accountTokens[st.accName] = x.accountTokens[st.accName][:len(x.accountTokens[st.accName])-1]
476 if len(x.accountTokens[st.accName]) == 0 {
477 delete(x.accountTokens, st.accName)
480 if time.Now().After(st.validUntil) {
481 return "", "", "", false, nil
483 return st.accName, st.address, st.sessionToken, true, nil
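// To sketch the token flow (surrounding variable names assumed): an authenticated
// API call mints a token with xgenerate, the client opens the SSE connection with
// it, and serveEvents consumes it via check:
//
//	token := sseTokens.xgenerate(ctx, accName, address, sessionToken) // In the Token API call.
//	// Client: GET <events endpoint>?singleUseToken=<token>
//	accName, address, sessionToken, ok, err := sseTokens.check(token) // In serveEvents.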
486// ioErr is panicked on i/o errors in serveEvents and handled in a defer.
491// ensure we have a non-nil moreHeaders, taking it from Settings.
492func ensureMoreHeaders(tx *bstore.Tx, moreHeaders []string) ([]string, error) {
493 if moreHeaders != nil {
494 return moreHeaders, nil
497 s := store.Settings{ID: 1}
498 if err := tx.Get(&s); err != nil {
499 return nil, fmt.Errorf("get settings: %v", err)
501 moreHeaders = s.ShowHeaders
502 if moreHeaders == nil {
503 moreHeaders = []string{} // Ensure we won't get Settings again next call.
505 return moreHeaders, nil
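// For reference, a hedged sketch of how a client could open the SSE connection served
// by serveEvents below; the endpoint path and variable names are assumptions, only the
// query string parameters and event names come from this file:
//
//	qs := url.Values{
//		"singleUseToken": []string{token},       // From the Token API call.
//		"request":        []string{requestJSON}, // JSON-encoded Request, see above.
//	}
//	resp, err := http.Get(baseURL + "/events?" + qs.Encode())
//	// Read "text/event-stream" events (start, viewMsgs, viewChanges, ...) from resp.Body.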
508// serveEvents serves an SSE connection. Authentication is done through a query
509// string parameter "singleUseToken", a one-time-use token returned by the Token
511func serveEvents(ctx context.Context, log mlog.Log, accountPath string, w http.ResponseWriter, r *http.Request) {
512 if r.Method != "GET" {
513 http.Error(w, "405 - method not allowed - use get", http.StatusMethodNotAllowed)
517 flusher, ok := w.(http.Flusher)
519 log.Error("internal error: ResponseWriter not a http.Flusher")
520 http.Error(w, "500 - internal error - cannot sync to http connection", 500)
525 token := q.Get("singleUseToken")
527 http.Error(w, "400 - bad request - missing credentials", http.StatusBadRequest)
530 accName, address, sessionToken, ok, err := sseTokens.check(token)
532 http.Error(w, "500 - internal server error - "+err.Error(), http.StatusInternalServerError)
536 http.Error(w, "400 - bad request - bad token", http.StatusBadRequest)
539 if _, err := store.SessionUse(ctx, log, accName, sessionToken, ""); err != nil {
540 http.Error(w, "400 - bad request - bad session token", http.StatusBadRequest)
544 // We can simulate a slow SSE connection. It seems Firefox doesn't slow down
545 // incoming responses with its slow-network simulation.
546 var waitMin, waitMax time.Duration
547 waitMinMsec := q.Get("waitMinMsec")
548 waitMaxMsec := q.Get("waitMaxMsec")
549 if waitMinMsec != "" && waitMaxMsec != "" {
550 if v, err := strconv.ParseInt(waitMinMsec, 10, 64); err != nil {
551 http.Error(w, "400 - bad request - parsing waitMinMsec: "+err.Error(), http.StatusBadRequest)
554 waitMin = time.Duration(v) * time.Millisecond
557 if v, err := strconv.ParseInt(waitMaxMsec, 10, 64); err != nil {
558 http.Error(w, "400 - bad request - parsing waitMaxMsec: "+err.Error(), http.StatusBadRequest)
561 waitMax = time.Duration(v) * time.Millisecond
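	// For example, appending "&waitMinMsec=100&waitMaxMsec=500" (illustrative values) to
	// the SSE URL makes the event writer below delay its writes to simulate a slow connection.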
565 // Parse the request with initial mailbox/search criteria.
567 dec := json.NewDecoder(strings.NewReader(q.Get("request")))
568 dec.DisallowUnknownFields()
569 if err := dec.Decode(&req); err != nil {
570 http.Error(w, "400 - bad request - bad request query string parameter: "+err.Error(), http.StatusBadRequest)
572 } else if req.Page.Count <= 0 {
573 http.Error(w, "400 - bad request - request cannot have Page.Count 0", http.StatusBadRequest)
576 if req.Query.Threading == "" {
577 req.Query.Threading = ThreadOff
580 var writer *eventWriter
582 metricSSEConnections.Inc()
583 defer metricSSEConnections.Dec()
585 // Below here, error handling goes through xcheckf, which panics with
586 // *sherpa.Error, after which we send an error event to the client. We can also get
587 // an *ioErr when the connection is broken.
593 if err, ok := x.(*sherpa.Error); ok {
594 writer.xsendEvent(ctx, log, "fatalErr", err.Message)
595 } else if _, ok := x.(ioErr); ok {
598 log.WithContext(ctx).Error("serveEvents panic", slog.Any("err", x))
600 metrics.PanicInc(metrics.Webmail)
606 h.Set("Content-Type", "text/event-stream")
607 h.Set("Cache-Control", "no-cache")
609 // We'll be sending quite a bit of message data (text) in JSON (plenty of duplicate
610 // keys), so should be quite compressible.
612 gz := mox.AcceptsGzip(r)
614 h.Set("Content-Encoding", "gzip")
615 out, _ = gzip.NewWriterLevel(w, gzip.BestSpeed)
619 out = httpFlusher{out, flusher}
621 // We'll be writing outgoing SSE events through writer.
622 writer = newEventWriter(out, waitMin, waitMax, accName, sessionToken)
625 // Fetch initial data.
626 acc, err := store.OpenAccount(log, accName, true)
627 xcheckf(ctx, err, "open account")
630 log.Check(err, "closing account")
632 comm := store.RegisterComm(acc)
633 defer comm.Unregister()
635 // List addresses that the client can use to send email from.
636 accConf, _ := acc.Conf()
637 loginAddr, err := smtp.ParseAddress(address)
638 xcheckf(ctx, err, "parsing login address")
639 _, _, _, dest, err := mox.LookupAddress(loginAddr.Localpart, loginAddr.Domain, false, false, false)
640 xcheckf(ctx, err, "looking up destination for login address")
641 loginName := accConf.FullName
642 if dest.FullName != "" {
643 loginName = dest.FullName
645 loginAddress := MessageAddress{Name: loginName, User: loginAddr.Localpart.String(), Domain: loginAddr.Domain}
646 var addresses []MessageAddress
647 for a, dest := range accConf.Destinations {
648 name := dest.FullName
650 name = accConf.FullName
652 var ma MessageAddress
653 if strings.HasPrefix(a, "@") {
654 dom, err := dns.ParseDomain(a[1:])
655 xcheckf(ctx, err, "parsing destination address for account")
656 ma = MessageAddress{Domain: dom}
658 addr, err := smtp.ParseAddress(a)
659 xcheckf(ctx, err, "parsing destination address for account")
660 ma = MessageAddress{Name: name, User: addr.Localpart.String(), Domain: addr.Domain}
662 addresses = append(addresses, ma)
664 // The user is allowed to send using an alias address as the message From address. Webmail
665 // will choose it when replying to a message sent to that address.
666 aliasAddrs := map[MessageAddress]bool{}
667 for _, a := range accConf.Aliases {
668 if a.Alias.AllowMsgFrom {
669 ma := MessageAddress{User: a.Alias.LocalpartStr, Domain: a.Alias.Domain}
671 addresses = append(addresses, ma)
673 aliasAddrs[ma] = true
677 // We implicitly start a query. We use the reqctx for the transaction, because the
678 // transaction is passed to the query, which can be canceled.
679 reqctx, reqctxcancel := context.WithCancel(ctx)
681 // We also cancel in cancelDrain later on, but there is a brief window where the
682 // context wouldn't be canceled.
683 if reqctxcancel != nil {
689 // qtx is kept around during connection initialization, until we pass it off to the
690 // goroutine that starts querying for messages.
694 err := qtx.Rollback()
695 log.Check(err, "rolling back")
699 var mbl []store.Mailbox
700 settings := store.Settings{ID: 1}
702 // We only take the rlock when getting the tx.
703 acc.WithRLock(func() {
704 // Now a read-only transaction we'll use during the query.
705 qtx, err = acc.DB.Begin(reqctx, false)
706 xcheckf(ctx, err, "begin transaction")
708 mbl, err = bstore.QueryTx[store.Mailbox](qtx).FilterEqual("Expunged", false).List()
709 xcheckf(ctx, err, "list mailboxes")
711 err = qtx.Get(&settings)
712 xcheckf(ctx, err, "get settings")
715 // Find the designated mailbox if a mailbox name is set, or if there are no filters at all.
716 var zerofilter Filter
717 var zeronotfilter NotFilter
718 var mailbox store.Mailbox
719 var mailboxPrefixes []string
720 var matchMailboxes bool
721 mailboxIDs := map[int64]bool{}
722 mailboxName := req.Query.Filter.MailboxName
723 if mailboxName != "" || reflect.DeepEqual(req.Query.Filter, zerofilter) && reflect.DeepEqual(req.Query.NotFilter, zeronotfilter) {
724 if mailboxName == "" {
725 mailboxName = "Inbox"
728 var inbox store.Mailbox
729 for _, e := range mbl {
730 if e.Name == mailboxName {
733 if e.Name == "Inbox" {
741 xcheckf(ctx, errors.New("inbox not found"), "setting initial mailbox")
743 req.Query.Filter.MailboxID = mailbox.ID
744 req.Query.Filter.MailboxName = ""
745 mailboxPrefixes = []string{mailbox.Name + "/"}
746 matchMailboxes = true
747 mailboxIDs[mailbox.ID] = true
749 matchMailboxes, mailboxIDs, mailboxPrefixes = xprepareMailboxIDs(ctx, qtx, req.Query.Filter, accConf.RejectsMailbox)
751 if req.Query.Filter.MailboxChildrenIncluded {
752 xgatherMailboxIDs(ctx, qtx, mailboxIDs, mailboxPrefixes)
755 // todo: write a last-event-id based on modseq? if last-event-id is present, we would have to send changes to mailboxes, messages, hopefully reducing the amount of data sent.
757 sse := sseRegister(acc.Name)
758 defer sse.unregister()
760 // Per-domain localpart config so webclient can decide if an address belongs to the account.
761 domainAddressConfigs := map[string]DomainAddressConfig{}
762 for _, a := range addresses {
763 dom, _ := mox.Conf.Domain(a.Domain)
764 domainAddressConfigs[a.Domain.ASCII] = DomainAddressConfig{dom.LocalpartCatchallSeparatorsEffective, dom.LocalpartCaseSensitive}
767 // Write first event, allowing client to fill its UI with mailboxes.
768 start := EventStart{sse.ID, loginAddress, addresses, domainAddressConfigs, mailbox.Name, mbl, accConf.RejectsMailbox, settings, accountPath, moxvar.Version}
769 writer.xsendEvent(ctx, log, "start", start)
771 // The goroutine doing the querying will send messages on these channels, which
772 // result in an event being written on the SSE connection.
773 viewMsgsc := make(chan EventViewMsgs)
774 viewErrc := make(chan EventViewErr)
775 viewResetc := make(chan EventViewReset)
776 donec := make(chan int64) // When request is done.
778 // Start a view; it determines if we send a change to the client. And start an
779 // implicit query for messages; we'll send the messages to the client, which can
780 // fill its UI with messages.
781 v := view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
782 go viewRequestTx(reqctx, log, acc, qtx, v, viewMsgsc, viewErrc, viewResetc, donec)
783 qtx = nil // viewRequestTx closes qtx
785 // When canceling a query, we must drain its messages until it says it is done.
786 // Otherwise the sending goroutine would hang indefinitely on a channel send.
787 cancelDrain := func() {
788 if reqctxcancel != nil {
789 // Cancel the goroutine doing the querying.
797 // Drain events until done.
809 // If we stop and a query is in progress, we must drain the channel it will send on.
812 // Changes broadcasted by other connections on this account. If applicable for the
813 // connection/view, we send events.
814 xprocessChanges := func(changes []store.Change) {
815 taggedChanges := [][2]any{}
817 newPreviews := map[int64]string{}
818 defer storeNewPreviews(ctx, log, acc, newPreviews)
820 // We get a transaction the first time we need it.
824 err := xtx.Rollback()
825 log.Check(err, "rolling back transaction")
828 ensureTx := func() error {
835 xtx, err = acc.DB.Begin(ctx, false)
838 // This getmsg will now only be called with mailboxID+UID, not with messageID set.
839 // todo jmap: change store.Change* to include MessageID's? would mean duplication of information resulting in possible mismatch.
840 getmsg := func(messageID int64, mailboxID int64, uid store.UID) (store.Message, error) {
841 if err := ensureTx(); err != nil {
842 return store.Message{}, fmt.Errorf("transaction: %v", err)
844 return bstore.QueryTx[store.Message](xtx).FilterEqual("Expunged", false).FilterNonzero(store.Message{MailboxID: mailboxID, UID: uid}).Get()
847 // Additional headers from settings to add to MessageItems.
848 var moreHeaders []string
849 xmoreHeaders := func() []string {
851 xcheckf(ctx, err, "transaction")
853 moreHeaders, err = ensureMoreHeaders(xtx, moreHeaders)
854 xcheckf(ctx, err, "ensuring more headers")
858 // Return UIDs that are within the range of the view: either the end has been reached, or
859 // the UID is not after the last message.
860 xchangedUIDs := func(mailboxID int64, uids []store.UID, isRemove bool) (changedUIDs []store.UID) {
861 uidsAny := make([]any, len(uids))
862 for i, uid := range uids {
866 xcheckf(ctx, err, "transaction")
867 q := bstore.QueryTx[store.Message](xtx)
868 q.FilterNonzero(store.Message{MailboxID: mailboxID})
869 q.FilterEqual("UID", uidsAny...)
870 mbOK := v.matchesMailbox(mailboxID)
871 err = q.ForEach(func(m store.Message) error {
872 _, thread := v.threadIDs[m.ThreadID]
873 if thread || mbOK && (v.inRange(m) || isRemove && m.Expunged) {
874 changedUIDs = append(changedUIDs, m.UID)
878 xcheckf(ctx, err, "fetching messages for change")
882 // Forward changes that are relevant to the current view.
883 for _, change := range changes {
884 switch c := change.(type) {
885 case store.ChangeAddUID:
886 ok, err := v.matches(log, acc, true, 0, c.MailboxID, c.UID, c.Flags, c.Keywords, getmsg)
887 xcheckf(ctx, err, "matching new message against view")
888 m, err := getmsg(0, c.MailboxID, c.UID)
889 xcheckf(ctx, err, "get message")
890 _, thread := v.threadIDs[m.ThreadID]
895 state := msgState{acc: acc, log: log, newPreviews: newPreviews}
896 mi, err := messageItem(log, m, &state, xmoreHeaders())
898 xcheckf(ctx, err, "make messageitem")
901 mil := []MessageItem{mi}
902 if !thread && req.Query.Threading != ThreadOff {
904 xcheckf(ctx, err, "transaction")
905 more, _, err := gatherThread(log, xtx, acc, v, m, 0, false, xmoreHeaders(), newPreviews)
906 xcheckf(ctx, err, "gathering thread messages for id %d, thread %d", m.ID, m.ThreadID)
907 mil = append(mil, more...)
908 v.threadIDs[m.ThreadID] = struct{}{}
911 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgAdd", ChangeMsgAdd{c, mil}})
913 // If message extends the view, store it as such.
914 if !v.Request.Query.OrderAsc && m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && m.Received.After(v.LastMessageReceived) {
915 v.LastMessageReceived = m.Received
918 case store.ChangeRemoveUIDs:
921 // We may send changes for uids the client doesn't know, that's fine.
922 changedUIDs := xchangedUIDs(c.MailboxID, c.UIDs, true)
923 if len(changedUIDs) == 0 {
926 ch := ChangeMsgRemove{c}
927 ch.UIDs = changedUIDs
928 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgRemove", ch})
930 case store.ChangeFlags:
931 // We may send changes for uids the client doesn't know, that's fine.
932 changedUIDs := xchangedUIDs(c.MailboxID, []store.UID{c.UID}, false)
933 if len(changedUIDs) == 0 {
936 ch := ChangeMsgFlags{c}
937 ch.UID = changedUIDs[0]
938 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgFlags", ch})
940 case store.ChangeThread:
941 // Change in muted/collapsed state, just always ship it.
942 taggedChanges = append(taggedChanges, [2]any{"ChangeMsgThread", ChangeMsgThread{c}})
944 case store.ChangeRemoveMailbox:
945 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRemove", ChangeMailboxRemove{c}})
947 case store.ChangeAddMailbox:
948 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxAdd", ChangeMailboxAdd{c.Mailbox}})
950 case store.ChangeRenameMailbox:
951 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxRename", ChangeMailboxRename{c}})
953 case store.ChangeMailboxCounts:
954 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxCounts", ChangeMailboxCounts{c}})
956 case store.ChangeMailboxSpecialUse:
957 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxSpecialUse", ChangeMailboxSpecialUse{c}})
959 case store.ChangeMailboxKeywords:
960 taggedChanges = append(taggedChanges, [2]any{"ChangeMailboxKeywords", ChangeMailboxKeywords{c}})
962 case store.ChangeAddSubscription, store.ChangeRemoveSubscription:
963 // Webmail does not care about subscriptions.
965 case store.ChangeAnnotation:
969 panic(fmt.Sprintf("missing case for change %T", c))
973 if len(taggedChanges) > 0 {
974 viewChanges := EventViewChanges{v.Request.ViewID, taggedChanges}
975 writer.xsendEvent(ctx, log, "viewChanges", viewChanges)
979 timer := time.NewTimer(5 * time.Minute) // For keepalives.
983 timer.Reset(5 * time.Minute)
987 pending := comm.Pending
993 case <-mox.Shutdown.Done():
994 writer.xsendEvent(ctx, log, "fatalErr", "server is shutting down")
995 // Work around go vet, it doesn't see defer cancelDrain.
996 if reqctxcancel != nil {
1002 _, err := fmt.Fprintf(out, ": keepalive\n\n")
1007 log.Errorx("write keepalive", err)
1008 // Work around go vet, it doesn't see defer cancelDrain.
1009 if reqctxcancel != nil {
1016 case vm := <-viewMsgsc:
1017 if vm.RequestID != v.Request.ID || vm.ViewID != v.Request.ViewID {
1018 panic(fmt.Sprintf("received msgs for view,request id %d,%d instead of %d,%d", vm.ViewID, vm.RequestID, v.Request.ViewID, v.Request.ID))
1023 if len(vm.MessageItems) > 0 {
1024 v.LastMessageReceived = vm.MessageItems[len(vm.MessageItems)-1][0].Message.Received
1026 writer.xsendEvent(ctx, log, "viewMsgs", vm)
1028 case ve := <-viewErrc:
1029 if ve.RequestID != v.Request.ID || ve.ViewID != v.Request.ViewID {
1030 panic(fmt.Sprintf("received err for view,request id %d,%d instead of %d,%d", ve.ViewID, ve.RequestID, v.Request.ViewID, v.Request.ID))
1032 if errors.Is(ve.err, context.Canceled) || mlog.IsClosed(ve.err) {
1033 // Work around go vet, it doesn't see defer cancelDrain.
1034 if reqctxcancel != nil {
1039 writer.xsendEvent(ctx, log, "viewErr", ve)
1041 case vr := <-viewResetc:
1042 if vr.RequestID != v.Request.ID || vr.ViewID != v.Request.ViewID {
1043 panic(fmt.Sprintf("received reset for view,request id %d,%d instead of %d,%d", vr.ViewID, vr.RequestID, v.Request.ViewID, v.Request.ID))
1045 writer.xsendEvent(ctx, log, "viewReset", vr)
1048 if id != v.Request.ID {
1049 panic(fmt.Sprintf("received done for request id %d instead of %d", id, v.Request.ID))
1051 if reqctxcancel != nil {
1057 case req := <-sse.Request:
1062 v = view{req, time.Time{}, false, false, nil, nil}
1066 reqctx, reqctxcancel = context.WithCancel(ctx)
1068 stop := func() (stop bool) {
1069 // rtx is handed off to viewRequestTx below, but we must clean it up in case of errors.
1074 err = rtx.Rollback()
1075 log.Check(err, "rolling back transaction")
1078 acc.WithRLock(func() {
1079 rtx, err = acc.DB.Begin(reqctx, false)
1086 if errors.Is(err, context.Canceled) {
1089 err := fmt.Errorf("begin transaction: %v", err)
1090 viewErr := EventViewErr{v.Request.ViewID, v.Request.ID, err.Error(), err}
1091 writer.xsendEvent(ctx, log, "viewErr", viewErr)
1095 // Reset view state for new query.
1096 if req.ViewID != v.Request.ViewID {
1097 matchMailboxes, mailboxIDs, mailboxPrefixes := xprepareMailboxIDs(ctx, rtx, req.Query.Filter, accConf.RejectsMailbox)
1098 if req.Query.Filter.MailboxChildrenIncluded {
1099 xgatherMailboxIDs(ctx, rtx, mailboxIDs, mailboxPrefixes)
1101 v = view{req, time.Time{}, false, matchMailboxes, mailboxIDs, map[int64]struct{}{}}
1105 go viewRequestTx(reqctx, log, acc, rtx, v, viewMsgsc, viewErrc, viewResetc, donec)
1114 overflow, changes := comm.Get()
1116 writer.xsendEvent(ctx, log, "fatalErr", "out of sync, too many pending changes")
1119 xprocessChanges(changes)
1122 // Work around go vet, it doesn't see defer cancelDrain.
1123 if reqctxcancel != nil {
1131// xprepareMailboxIDs prepares the first half of filters for mailboxes, based on
1132// f.MailboxID (-1 is special). matchMailboxes indicates whether the IDs in
1133// mailboxIDs must or must not match. mailboxPrefixes is for use with
1134// xgatherMailboxIDs to gather children of the mailboxIDs.
1135func xprepareMailboxIDs(ctx context.Context, tx *bstore.Tx, f Filter, rejectsMailbox string) (matchMailboxes bool, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1136 matchMailboxes = true
1137 mailboxIDs = map[int64]bool{}
1138 if f.MailboxID == -1 {
1139 matchMailboxes = false
1140 // Add the trash, junk and account rejects mailbox.
1141 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1142 if mb.Trash || mb.Junk || mb.Name == rejectsMailbox {
1143 mailboxPrefixes = append(mailboxPrefixes, mb.Name+"/")
1144 mailboxIDs[mb.ID] = true
1148 xcheckf(ctx, err, "finding trash/junk/rejects mailbox")
1149 } else if f.MailboxID > 0 {
1150 mb, err := store.MailboxID(tx, f.MailboxID)
1151 xcheckf(ctx, err, "get mailbox")
1152 mailboxIDs[f.MailboxID] = true
1153 mailboxPrefixes = []string{mb.Name + "/"}
1158// xgatherMailboxIDs adds all mailboxes with a prefix matching any of
1159// mailboxPrefixes to mailboxIDs, to expand filtering to children of mailboxes.
1160func xgatherMailboxIDs(ctx context.Context, tx *bstore.Tx, mailboxIDs map[int64]bool, mailboxPrefixes []string) {
1161 // Gather more mailboxes to filter on, based on mailboxPrefixes.
1162 if len(mailboxPrefixes) == 0 {
1165 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1166 for _, p := range mailboxPrefixes {
1167 if strings.HasPrefix(mb.Name, p) {
1168 mailboxIDs[mb.ID] = true
1174 xcheckf(ctx, err, "gathering mailboxes")
1177// matchesMailbox returns whether a mailbox matches the view.
1178func (v view) matchesMailbox(mailboxID int64) bool {
1179 return len(v.mailboxIDs) == 0 || v.matchMailboxIDs && v.mailboxIDs[mailboxID] || !v.matchMailboxIDs && !v.mailboxIDs[mailboxID]
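// For example (hypothetical IDs): with matchMailboxIDs false and mailboxIDs holding the
// Trash/Junk/Rejects mailboxes (the MailboxID -1 case), a message in the Inbox matches
// and a message in Trash does not:
//
//	v.matchesMailbox(inboxID) // true, not in the excluded set
//	v.matchesMailbox(trashID) // false, excluded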
1182// inRange returns whether m is within the range for the view, i.e. whether a change for
1183// this message should be sent to the client so it can update its state.
1184func (v view) inRange(m store.Message) bool {
1185 return v.End || !v.Request.Query.OrderAsc && !m.Received.Before(v.LastMessageReceived) || v.Request.Query.OrderAsc && !m.Received.After(v.LastMessageReceived)
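// For example: with OrderAsc false (newest first) and LastMessageReceived at 12:00, a
// message received at 12:05 is in range (it sorts before everything already sent), while
// one received at 11:55 is only in range once End is set.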
1188// matches checks if the message, identified by either messageID or mailboxID+UID,
1189// is in the current "view" (i.e. passing the filters, and if checkRange is set
1190// also if within the range of sent messages based on sort order and the last seen
1191// message). getmsg retrieves the message, which may be necessary depending on the
1192// active filters. Used to determine if a store.Change with a new message should be
1193// sent, and for the destination and anchor messages in view requests.
1194func (v view) matches(log mlog.Log, acc *store.Account, checkRange bool, messageID int64, mailboxID int64, uid store.UID, flags store.Flags, keywords []string, getmsg func(int64, int64, store.UID) (store.Message, error)) (match bool, rerr error) {
1196 ensureMessage := func() bool {
1197 if m.ID == 0 && rerr == nil {
1198 m, rerr = getmsg(messageID, mailboxID, uid)
1203 q := v.Request.Query
1205 // Warning: Filters must be kept in sync between queryMessages and view.matches.
1208 if len(v.mailboxIDs) > 0 && (!ensureMessage() || v.matchMailboxIDs && !v.mailboxIDs[m.MailboxID] || !v.matchMailboxIDs && v.mailboxIDs[m.MailboxID]) {
1211 // note: anchorMessageID is not relevant for matching.
1212 flagfilter := q.flagFilterFn()
1213 if flagfilter != nil && !flagfilter(flags, keywords) {
1217 if q.Filter.Oldest != nil && (!ensureMessage() || m.Received.Before(*q.Filter.Oldest)) {
1220 if q.Filter.Newest != nil && (!ensureMessage() || !m.Received.Before(*q.Filter.Newest)) {
1224 if q.Filter.SizeMin > 0 && (!ensureMessage() || m.Size < q.Filter.SizeMin) {
1227 if q.Filter.SizeMax > 0 && (!ensureMessage() || m.Size > q.Filter.SizeMax) {
1231 state := msgState{acc: acc, log: log}
1233 if rerr == nil && state.err != nil {
1239 attachmentFilter := q.attachmentFilterFn(log, acc, &state)
1240 if attachmentFilter != nil && (!ensureMessage() || !attachmentFilter(m)) {
1244 envFilter := q.envFilterFn(log, &state)
1245 if envFilter != nil && (!ensureMessage() || !envFilter(m)) {
1249 headerFilter := q.headerFilterFn(log, &state)
1250 if headerFilter != nil && (!ensureMessage() || !headerFilter(m)) {
1254 wordsFilter := q.wordsFilterFn(log, &state)
1255 if wordsFilter != nil && (!ensureMessage() || !wordsFilter(m)) {
1259 // Now check that we are either within the sorting order, or "last" was sent.
1260 if !checkRange || v.End || ensureMessage() && v.inRange(m) {
1266type msgResp struct {
1267 err error // If set, an error happened and fields below are not set.
1268 reset bool // If set, the anchor message does not exist (anymore?) and we are sending messages from the start, fields below not set.
1269 viewEnd bool // If set, the last message for the view was seen, no more should be requested, fields below not set.
1270 mil []MessageItem // If none of the cases above apply, the messages that were found matching the query. The first message is the reason the thread is returned, for use as AnchorID in a followup request.
1271 pm *ParsedMessage // If m was the target page.DestMessageID, or this is the first match, this is the parsed message of mi.
1274func storeNewPreviews(ctx context.Context, log mlog.Log, acc *store.Account, newPreviews map[int64]string) {
1275 if len(newPreviews) == 0 {
1282 log.Error("unhandled panic in storeNewPreviews", slog.Any("err", x))
1284 metrics.PanicInc(metrics.Store)
1288 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1289 for id, preview := range newPreviews {
1290 m := store.Message{ID: id}
1291 if err := tx.Get(&m); err != nil {
1292 return fmt.Errorf("get message with id %d to store preview: %w", id, err)
1293 } else if !m.Expunged {
1294 m.Preview = &preview
1295 if err := tx.Update(&m); err != nil {
1296 return fmt.Errorf("updating message with id %d: %v", m.ID, err)
1302 log.Check(err, "saving new previews with messages")
1305// viewRequestTx executes a request (query with filters, pagination) by
1306// launching a new goroutine with queryMessages, receiving results as msgResp,
1307// and sending Event* to the SSE connection.
1309// It always closes tx.
1310func viewRequestTx(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, msgc chan EventViewMsgs, errc chan EventViewErr, resetc chan EventViewReset, donec chan int64) {
1311 // Newly generated previews which we'll save when the operation is done.
1312 newPreviews := map[int64]string{}
1315 err := tx.Rollback()
1316 log.Check(err, "rolling back query transaction")
1318 donec <- v.Request.ID
1320 // ctx can be canceled, but we still want to store the previews.
1321 storeNewPreviews(context.Background(), log, acc, newPreviews)
1323 x := recover() // Should not happen, but don't take program down if it does.
1325 log.WithContext(ctx).Error("viewRequestTx panic", slog.Any("err", x))
1327 metrics.PanicInc(metrics.Webmailrequest)
1331 var msgitems [][]MessageItem // Gathering for 300ms, then flushing.
1332 var parsedMessage *ParsedMessage
1335 var immediate bool // No waiting, flush immediately.
1336 t := time.NewTimer(300 * time.Millisecond)
1339 sendViewMsgs := func(force bool) {
1340 if len(msgitems) == 0 && !force {
1345 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, msgitems, parsedMessage, viewEnd}
1348 t.Reset(300 * time.Millisecond)
1351 // todo: should probably rewrite code so we don't start yet another goroutine, but instead handle the query responses directly (through a struct that keeps state?) in the sse connection goroutine.
1353 mrc := make(chan msgResp, 1)
1354 go queryMessages(ctx, log, acc, tx, v, mrc, newPreviews)
1358 case mr, ok := <-mrc:
1361 // Empty message list signals this query is done.
1362 msgc <- EventViewMsgs{v.Request.ViewID, v.Request.ID, nil, nil, false}
1367 errc <- EventViewErr{v.Request.ViewID, v.Request.ID, mr.err.Error(), mr.err}
1371 resetc <- EventViewReset{v.Request.ViewID, v.Request.ID}
1380 msgitems = append(msgitems, mr.mil)
1382 parsedMessage = mr.pm
1389 if len(msgitems) == 0 {
1390 // Nothing to send yet. We'll send immediately when the next message comes in.
1399// queryMessages executes a query, with filter, pagination, destination message id
1400// to fetch (the message that the client had in view and wants to display again).
1401// It sends on mrc, with several types of messages: errors, whether the view is
1402// reset due to missing AnchorMessageID, and when the end of the view was reached
1403// and/or for a message.
1404// newPreviews is filled with previews, the caller must save them.
1405func queryMessages(ctx context.Context, log mlog.Log, acc *store.Account, tx *bstore.Tx, v view, mrc chan msgResp, newPreviews map[int64]string) {
1407 x := recover() // Should not happen, but don't take program down if it does.
1409 log.WithContext(ctx).Error("queryMessages panic", slog.Any("err", x))
1411 mrc <- msgResp{err: fmt.Errorf("query failed")}
1412 metrics.PanicInc(metrics.Webmailquery)
1418 query := v.Request.Query
1419 page := v.Request.Page
1421 // Warning: Filters must be kept in sync between queryMessages and view.matches.
1423 checkMessage := func(id int64) (valid bool, rerr error) {
1424 m := store.Message{ID: id}
1426 if err == bstore.ErrAbsent || err == nil && m.Expunged {
1428 } else if err != nil {
1431 return v.matches(log, acc, false, m.ID, m.MailboxID, m.UID, m.Flags, m.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1437 // Check if AnchorMessageID exists and matches filter. If not, we will reset the view.
1438 if page.AnchorMessageID > 0 {
1439 // Check if message exists and (still) matches the filter.
1440 // todo: if AnchorMessageID exists but no longer matches the filter, we are resetting the view, but could handle it more gracefully in the future. if the message is in a different mailbox, we cannot query as efficiently, we'll have to read through more messages.
1441 if valid, err := checkMessage(page.AnchorMessageID); err != nil {
1442 mrc <- msgResp{err: fmt.Errorf("querying AnchorMessageID: %v", err)}
1445 mrc <- msgResp{reset: true}
1446 page.AnchorMessageID = 0
1450 // Check if page.DestMessageID exists and matches filter. If not, we will ignore
1451 // it instead of continuing to send messages until the end of the view.
1452 if page.DestMessageID > 0 {
1453 if valid, err := checkMessage(page.DestMessageID); err != nil {
1454 mrc <- msgResp{err: fmt.Errorf("querying requested message: %v", err)}
1457 page.DestMessageID = 0
1461 // todo optimize: we would like to have more filters directly on the database if they can use an index. eg if there is a keyword filter and no mailbox filter.
1463 q := bstore.QueryTx[store.Message](tx)
1464 q.FilterEqual("Expunged", false)
1465 if len(v.mailboxIDs) > 0 {
1466 if len(v.mailboxIDs) == 1 && v.matchMailboxIDs {
1467 // Should result in fast indexed query.
1468 for mbID := range v.mailboxIDs {
1469 q.FilterNonzero(store.Message{MailboxID: mbID})
1472 idsAny := make([]any, 0, len(v.mailboxIDs))
1473 for mbID := range v.mailboxIDs {
1474 idsAny = append(idsAny, mbID)
1476 if v.matchMailboxIDs {
1477 q.FilterEqual("MailboxID", idsAny...)
1479 q.FilterNotEqual("MailboxID", idsAny...)
1484 // If we are looking for an anchor, keep skipping messages early (cheaply) until we've seen it.
1485 if page.AnchorMessageID > 0 {
1487 q.FilterFn(func(m store.Message) bool {
1491 seen = m.ID == page.AnchorMessageID
1496 // We may be adding filters to the query below. The FilterFn signature does not
1497 // implement reporting errors, or anything else, just a bool. So when making the
1498 // filter functions, we give them a place to store parsed message state, and an
1499 // error. We check the error during and after query execution.
1500 state := msgState{acc: acc, log: log, newPreviews: newPreviews}
1503 flagfilter := query.flagFilterFn()
1504 if flagfilter != nil {
1505 q.FilterFn(func(m store.Message) bool {
1506 return flagfilter(m.Flags, m.Keywords)
1510 if query.Filter.Oldest != nil {
1511 q.FilterGreaterEqual("Received", *query.Filter.Oldest)
1513 if query.Filter.Newest != nil {
1514 q.FilterLessEqual("Received", *query.Filter.Newest)
1517 if query.Filter.SizeMin > 0 {
1518 q.FilterGreaterEqual("Size", query.Filter.SizeMin)
1520 if query.Filter.SizeMax > 0 {
1521 q.FilterLessEqual("Size", query.Filter.SizeMax)
1524 attachmentFilter := query.attachmentFilterFn(log, acc, &state)
1525 if attachmentFilter != nil {
1526 q.FilterFn(attachmentFilter)
1529 envFilter := query.envFilterFn(log, &state)
1530 if envFilter != nil {
1531 q.FilterFn(envFilter)
1534 headerFilter := query.headerFilterFn(log, &state)
1535 if headerFilter != nil {
1536 q.FilterFn(headerFilter)
1539 wordsFilter := query.wordsFilterFn(log, &state)
1540 if wordsFilter != nil {
1541 q.FilterFn(wordsFilter)
1544 var moreHeaders []string // From store.Settings.ShowHeaders
1547 q.SortAsc("Received")
1549 q.SortDesc("Received")
1551 found := page.DestMessageID <= 0
1554 err := q.ForEach(func(m store.Message) error {
1555 // Check for an error in one of the filters and propagate it.
1556 if state.err != nil {
1560 if have >= page.Count && found || have > 10000 {
1562 return bstore.StopForEach
1565 if _, ok := v.threadIDs[m.ThreadID]; ok {
1566 // Message was already returned as part of a thread.
1570 var pm *ParsedMessage
1571 if m.ID == page.DestMessageID || page.DestMessageID == 0 && have == 0 && page.AnchorMessageID == 0 {
1572 // For threads, if there was no DestMessageID, we may be getting the newest
1573 // message. For an initial view, this isn't necessarily the message the user is
1574 // expected to read first; that would be the first unread, which we'll get below
1575 // when gathering the thread.
1577 xpm, err := parsedMessage(log, &m, &state, true, false, false)
1578 if err != nil && errors.Is(err, message.ErrHeader) {
1579 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1580 } else if err != nil {
1581 return fmt.Errorf("parsing message %d: %v", m.ID, err)
1588 moreHeaders, err = ensureMoreHeaders(tx, moreHeaders)
1590 return fmt.Errorf("ensuring more headers: %v", err)
1593 mi, err := messageItem(log, m, &state, moreHeaders)
1595 return fmt.Errorf("making messageitem for message %d: %v", m.ID, err)
1597 mil := []MessageItem{mi}
1598 if query.Threading != ThreadOff {
1599 more, xpm, err := gatherThread(log, tx, acc, v, m, page.DestMessageID, page.AnchorMessageID == 0 && have == 0, moreHeaders, state.newPreviews)
1601 return fmt.Errorf("gathering thread messages for id %d, thread %d: %v", m.ID, m.ThreadID, err)
1607 mil = append(mil, more...)
1608 v.threadIDs[m.ThreadID] = struct{}{}
1610 // Calculate how many messages the frontend is going to show, and only count those as returned.
1611 collapsed := map[int64]bool{}
1612 for _, mi := range mil {
1613 collapsed[mi.Message.ID] = mi.Message.ThreadCollapsed
1615 unread := map[int64]bool{} // Propagated to thread root.
1616 if query.Threading == ThreadUnread {
1617 for _, mi := range mil {
1622 unread[mm.ID] = true
1623 for _, id := range mm.ThreadParentIDs {
1628 for _, mi := range mil {
1632 for _, id := range mm.ThreadParentIDs {
1633 if _, ok := collapsed[id]; ok {
1638 if threadRoot || (query.Threading == ThreadOn && !collapsed[rootID] || query.Threading == ThreadUnread && unread[rootID]) {
1645 if pm != nil && len(pm.envelope.From) == 1 {
1646 pm.ViewMode, err = fromAddrViewMode(tx, pm.envelope.From[0])
1648 return fmt.Errorf("gathering view mode for id %d: %v", m.ID, err)
1651 mrc <- msgResp{mil: mil, pm: pm}
1654 // Check for an error in one of the filters again. The check in ForEach would not
1655 // trigger if the last message has the error.
1656 if err == nil && state.err != nil {
1660 mrc <- msgResp{err: fmt.Errorf("querying messages: %v", err)}
1664 mrc <- msgResp{viewEnd: true}
1668func gatherThread(log mlog.Log, tx *bstore.Tx, acc *store.Account, v view, m store.Message, destMessageID int64, first bool, moreHeaders []string, newPreviews map[int64]string) ([]MessageItem, *ParsedMessage, error) {
1669 if m.ThreadID == 0 {
1670 // If we would continue, FilterNonzero would fail because there are no non-zero fields.
1671 return nil, nil, fmt.Errorf("message has threadid 0, account is probably still being upgraded, try turning threading off until the upgrade is done")
1674 // Fetch other messages for this thread.
1675 qt := bstore.QueryTx[store.Message](tx)
1676 qt.FilterNonzero(store.Message{ThreadID: m.ThreadID})
1677 qt.FilterEqual("Expunged", false)
1678 qt.FilterNotEqual("ID", m.ID)
1680 tml, err := qt.List()
1682 return nil, nil, fmt.Errorf("listing other messages in thread for message %d, thread %d: %v", m.ID, m.ThreadID, err)
1685 var mil []MessageItem
1686 var pm *ParsedMessage
1687 var firstUnread bool
1688 for _, tm := range tml {
1689 err := func() error {
1690 xstate := msgState{acc: acc, log: log, newPreviews: newPreviews}
1691 defer xstate.clear()
1693 mi, err := messageItem(log, tm, &xstate, moreHeaders)
1695 return fmt.Errorf("making messageitem for message %d, for thread %d: %v", tm.ID, m.ThreadID, err)
1697 mi.MatchQuery, err = v.matches(log, acc, false, tm.ID, tm.MailboxID, tm.UID, tm.Flags, tm.Keywords, func(int64, int64, store.UID) (store.Message, error) {
1701 return fmt.Errorf("matching thread message %d against view query: %v", tm.ID, err)
1703 mil = append(mil, mi)
1705 if tm.ID == destMessageID || destMessageID == 0 && first && (pm == nil || !firstUnread && !tm.Seen) {
1706 firstUnread = !tm.Seen
1707 xpm, err := parsedMessage(log, &tm, &xstate, true, false, false)
1708 if err != nil && errors.Is(err, message.ErrHeader) {
1709 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1710 } else if err != nil {
1711 return fmt.Errorf("parsing thread message %d: %v", tm.ID, err)
1719 return nil, nil, err
1723 // Finally, the message that caused us to gather this thread (which is likely the
1724 // most recent message in the thread) could be the only unread message.
1725 if destMessageID == 0 && first && !m.Seen && !firstUnread {
1726 xstate := msgState{acc: acc, log: log}
1727 defer xstate.clear()
1728 xpm, err := parsedMessage(log, &m, &xstate, true, false, false)
1729 if err != nil && errors.Is(err, message.ErrHeader) {
1730 log.Debug("not returning parsed message due to invalid headers", slog.Int64("msgid", m.ID), slog.Any("err", err))
1731 } else if err != nil {
1732 return nil, nil, fmt.Errorf("parsing thread message %d: %v", m.ID, err)
1741// While checking the filters on a message, we may need to get more message
1742// details as each filter passes. We check the filters that need the basic
1743// information first, and load and cache more details for the next filters.
1744// msgState holds parsed details for a message; it is updated while filtering
1745// with more information, or reset for the next message.
1746type msgState struct {
1747 acc *store.Account // Never changes during lifetime.
1748 err error // Once set, doesn't get cleared.
1750 part *message.Part // Will be without Reader when msgr is nil.
1751 msgr *store.MsgReader
1754 // If not nil, messages get their Preview field filled when it is nil, and the message
1755 // ID and preview are added to newPreviews and saved in a separate write transaction
1756 // when the operation is done.
1757 newPreviews map[int64]string
1760func (ms *msgState) clear() {
1762 err := ms.msgr.Close()
1763 ms.log.Check(err, "closing message reader from state")
1766 *ms = msgState{acc: ms.acc, err: ms.err, log: ms.log, newPreviews: ms.newPreviews}
1769func (ms *msgState) ensureMsg(m store.Message) {
1770 if m.ID != ms.m.ID {
1776func (ms *msgState) ensurePart(m store.Message, withMsgReader bool) bool {
1781 if m.ParsedBuf == nil {
1782 ms.err = fmt.Errorf("message %d not parsed", m.ID)
1786 if err := json.Unmarshal(m.ParsedBuf, &p); err != nil {
1787 ms.err = fmt.Errorf("load part for message %d: %w", m.ID, err)
1792 if withMsgReader && ms.msgr == nil {
1793 ms.msgr = ms.acc.MessageReader(m)
1794 ms.part.SetReaderAt(ms.msgr)
1797 return ms.part != nil
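// Typical usage of msgState, mirroring the call sites in this file (a sketch):
//
//	state := msgState{acc: acc, log: log}
//	defer state.clear()
//	if state.ensurePart(m, false) {
//		// state.part is the parsed message.Part; a failure is recorded in state.err.
//	}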
1800// flagFilterFn returns a function that applies the flag/keyword/"label"-related
1801// filters for a query. A nil function is returned if there are no flags to filter
1803func (q Query) flagFilterFn() func(store.Flags, []string) bool {
1804 labels := map[string]bool{}
1805 for _, k := range q.Filter.Labels {
1808 for _, k := range q.NotFilter.Labels {
1812 if len(labels) == 0 {
1816 var mask, flags store.Flags
1817 systemflags := map[string][]*bool{
1818 `\answered`: {&mask.Answered, &flags.Answered},
1819 `\flagged`: {&mask.Flagged, &flags.Flagged},
1820 `\deleted`: {&mask.Deleted, &flags.Deleted},
1821 `\seen`: {&mask.Seen, &flags.Seen},
1822 `\draft`: {&mask.Draft, &flags.Draft},
1823 `$junk`: {&mask.Junk, &flags.Junk},
1824 `$notjunk`: {&mask.Notjunk, &flags.Notjunk},
1825 `$forwarded`: {&mask.Forwarded, &flags.Forwarded},
1826 `$phishing`: {&mask.Phishing, &flags.Phishing},
1827 `$mdnsent`: {&mask.MDNSent, &flags.MDNSent},
1829 keywords := map[string]bool{}
1830 for k, v := range labels {
1831 k = strings.ToLower(k)
1832 if mf, ok := systemflags[k]; ok {
1839 return func(msgFlags store.Flags, msgKeywords []string) bool {
1841 if f.Set(mask, msgFlags) != flags {
1844 for k, v := range keywords {
1845 if slices.Contains(msgKeywords, k) != v {
1853// attachmentFilterFn returns a function that filters for the attachment-related
1854// filter from the query. A nil function is returned if there are attachment
1856func (q Query) attachmentFilterFn(log mlog.Log, acc *store.Account, state *msgState) func(m store.Message) bool {
1857 if q.Filter.Attachments == AttachmentIndifferent && q.NotFilter.Attachments == AttachmentIndifferent {
1861 return func(m store.Message) bool {
1862 if !state.ensurePart(m, true) {
1865 types, err := attachmentTypes(log, m, state)
1870 return (q.Filter.Attachments == AttachmentIndifferent || types[q.Filter.Attachments]) && (q.NotFilter.Attachments == AttachmentIndifferent || !types[q.NotFilter.Attachments])
1874var attachmentMimetypes = map[string]AttachmentType{
1875 "application/pdf": AttachmentPDF,
1876 "application/zip": AttachmentArchive,
1877 "application/x-rar-compressed": AttachmentArchive,
1878 "application/vnd.oasis.opendocument.spreadsheet": AttachmentSpreadsheet,
1879 "application/vnd.ms-excel": AttachmentSpreadsheet,
1880 "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": AttachmentSpreadsheet,
1881 "application/vnd.oasis.opendocument.text": AttachmentDocument,
1882 "application/vnd.oasis.opendocument.presentation": AttachmentPresentation,
1883 "application/vnd.ms-powerpoint": AttachmentPresentation,
1884 "application/vnd.openxmlformats-officedocument.presentationml.presentation": AttachmentPresentation,
1886var attachmentExtensions = map[string]AttachmentType{
1887 ".pdf": AttachmentPDF,
1888 ".zip": AttachmentArchive,
1889 ".tar": AttachmentArchive,
1890 ".tgz": AttachmentArchive,
1891 ".tar.gz": AttachmentArchive,
1892 ".tbz2": AttachmentArchive,
1893 ".tar.bz2": AttachmentArchive,
1894 ".tar.lz": AttachmentArchive,
1895 ".tlz": AttachmentArchive,
1896 ".tar.xz": AttachmentArchive,
1897 ".txz": AttachmentArchive,
1898 ".tar.zst": AttachmentArchive,
1899 ".tar.lz4": AttachmentArchive,
1900 ".7z": AttachmentArchive,
1901 ".rar": AttachmentArchive,
1902 ".ods": AttachmentSpreadsheet,
1903 ".xls": AttachmentSpreadsheet,
1904 ".xlsx": AttachmentSpreadsheet,
1905 ".odt": AttachmentDocument,
1906 ".doc": AttachmentDocument,
1907 ".docx": AttachmentDocument,
1908 ".odp": AttachmentPresentation,
1909 ".ppt": AttachmentPresentation,
1910 ".pptx": AttachmentPresentation,
1913func attachmentTypes(log mlog.Log, m store.Message, state *msgState) (map[AttachmentType]bool, error) {
1914 types := map[AttachmentType]bool{}
1916 pm, err := parsedMessage(log, &m, state, false, false, false)
1918 return nil, fmt.Errorf("parsing message for attachments: %w", err)
1920 for _, a := range pm.attachments {
1921 if a.Part.MediaType == "IMAGE" {
1922 types[AttachmentImage] = true
1925 mt := strings.ToLower(a.Part.MediaType + "/" + a.Part.MediaSubType)
1926 if t, ok := attachmentMimetypes[mt]; ok {
1930 _, filename, err := a.Part.DispositionFilename()
1931 if err != nil && (errors.Is(err, message.ErrParamEncoding) || errors.Is(err, message.ErrHeader)) {
1932 log.Debugx("parsing disposition/filename", err)
1933 } else if err != nil {
1934 return nil, fmt.Errorf("reading disposition/filename: %v", err)
1936 if ext := filepath.Ext(filename); ext != "" {
1937 if t, ok := attachmentExtensions[strings.ToLower(ext)]; ok {
1943 if len(types) == 0 {
1944 types[AttachmentNone] = true
1946 types[AttachmentAny] = true
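	// For example, a message with attachments photo.png and report.pdf (hypothetical
	// filenames) yields {AttachmentImage: true, AttachmentPDF: true, AttachmentAny: true},
	// so Filter.Attachments "image" or "pdf" matches it, while NotFilter.Attachments "pdf"
	// would exclude it.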
1951// envFilterFn returns a filter function for the "envelope" headers ("envelope" as
1952// used by IMAP, i.e. basic message headers from/to/subject, an unfortunate name
1953// clash with SMTP envelope) for the query. A nil function is returned if no
1954// filtering is needed.
1955func (q Query) envFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
1956 if len(q.Filter.From) == 0 && len(q.Filter.To) == 0 && len(q.Filter.Subject) == 0 && len(q.NotFilter.From) == 0 && len(q.NotFilter.To) == 0 && len(q.NotFilter.Subject) == 0 {
1960 lower := func(l []string) []string {
1964 r := make([]string, len(l))
1965 for i, s := range l {
1966 r[i] = strings.ToLower(s)
1971 filterSubject := lower(q.Filter.Subject)
1972 notFilterSubject := lower(q.NotFilter.Subject)
1973 filterFrom := lower(q.Filter.From)
1974 notFilterFrom := lower(q.NotFilter.From)
1975 filterTo := lower(q.Filter.To)
1976 notFilterTo := lower(q.NotFilter.To)
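	// For example (a sketch): Filter.From ["@example.org>"] matches any sender at that
	// domain, because each filter string is matched as a case-insensitive substring of the
	// display name or of "<user@host>"; all Filter entries must match, while a single
	// matching NotFilter entry excludes the message.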
1978 return func(m store.Message) bool {
1979 if !state.ensurePart(m, false) {
1983 var env message.Envelope
1984 if state.part.Envelope != nil {
1985 env = *state.part.Envelope
1988 if len(filterSubject) > 0 || len(notFilterSubject) > 0 {
1989 subject := strings.ToLower(env.Subject)
1990 for _, s := range filterSubject {
1991 if !strings.Contains(subject, s) {
1995 for _, s := range notFilterSubject {
1996 if strings.Contains(subject, s) {
2002 contains := func(textLower []string, l []message.Address, all bool) bool {
2004 for _, s := range textLower {
2005 for _, a := range l {
2006 name := strings.ToLower(a.Name)
2007 addr := strings.ToLower(fmt.Sprintf("<%s@%s>", a.User, a.Host))
2008 if strings.Contains(name, s) || strings.Contains(addr, s) {
2022 if len(filterFrom) > 0 && !contains(filterFrom, env.From, true) {
2025 if len(notFilterFrom) > 0 && contains(notFilterFrom, env.From, false) {
2028 if len(filterTo) > 0 || len(notFilterTo) > 0 {
2029 to := slices.Concat(env.To, env.CC, env.BCC)
2030 if len(filterTo) > 0 && !contains(filterTo, to, true) {
2033 if len(notFilterTo) > 0 && contains(notFilterTo, to, false) {
2041// headerFilterFn returns a function that filters for the header filters in the
2042// query. A nil function is returned if there are no header filters.
2043func (q Query) headerFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2044 if len(q.Filter.Headers) == 0 {
2048 lowerValues := make([]string, len(q.Filter.Headers))
2049 for i, t := range q.Filter.Headers {
2050 lowerValues[i] = strings.ToLower(t[1])
2053 return func(m store.Message) bool {
2054 if !state.ensurePart(m, true) {
2057 hdr, err := state.part.Header()
2059 state.err = fmt.Errorf("reading header for message %d: %w", m.ID, err)
2064 for i, t := range q.Filter.Headers {
2068 if v == "" && len(l) > 0 {
2071 for _, e := range l {
2072 if strings.Contains(strings.ToLower(e), v) {
2082// wordsFilterFn returns a function that applies the word filters of the query. A
2083// nil function is returned when the query does not contain a word filter.
2084func (q Query) wordsFilterFn(log mlog.Log, state *msgState) func(m store.Message) bool {
2085 if len(q.Filter.Words) == 0 && len(q.NotFilter.Words) == 0 {
2089 ws := store.PrepareWordSearch(q.Filter.Words, q.NotFilter.Words)
2091 return func(m store.Message) bool {
2092 if !state.ensurePart(m, true) {
2096 if ok, err := ws.MatchPart(log, state.part, true); err != nil {
2097 state.err = fmt.Errorf("searching for words in message %d: %w", m.ID, err)