13	"golang.org/x/exp/slices"
 
14	"golang.org/x/exp/slog"
 
16	"github.com/mjl-/bstore"
 
18	"github.com/mjl-/mox/message"
 
19	"github.com/mjl-/mox/mlog"
 
20	"github.com/mjl-/mox/moxio"
 
23// Assign a new/incoming message to a thread. Message does not yet have an ID. If
 
24// this isn't a response, ThreadID should remain 0 (unless this is a message with
 
25// existing message-id) and the caller must set ThreadID to ID.
 
26// If the account is still busy upgrading messages with threadids in the background, parents
 
27// may have a threadid 0. That results in this message getting threadid 0, which
 
28// will handled by the background upgrade process assigning a threadid when it gets
 
30func assignThread(log mlog.Log, tx *bstore.Tx, m *Message, part *message.Part) error {
 
31	if m.MessageID != "" {
 
32		// Match against existing different message with same Message-ID.
 
33		q := bstore.QueryTx[Message](tx)
 
34		q.FilterNonzero(Message{MessageID: m.MessageID})
 
35		q.FilterEqual("Expunged", false)
 
36		q.FilterNotEqual("ID", m.ID)
 
37		q.FilterNotEqual("ThreadID", int64(0))
 
41		if err != nil && err != bstore.ErrAbsent {
 
42			return fmt.Errorf("looking up existing message with message-id: %v", err)
 
43		} else if err == nil {
 
44			assignParent(m, em, true)
 
49	h, err := part.Header()
 
51		log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
 
53	messageIDs, err := message.ReferencedIDs(h.Values("References"), h.Values("In-Reply-To"))
 
55		log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
 
57	for i := len(messageIDs) - 1; i >= 0; i-- {
 
58		messageID := messageIDs[i]
 
59		if messageID == m.MessageID {
 
62		tm, _, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
 
64			return fmt.Errorf("looking up thread message for new message: %v", err)
 
66			assignParent(m, *tm, true)
 
69		m.ThreadMissingLink = true
 
71	if len(messageIDs) > 0 {
 
76	if part != nil && part.Envelope != nil {
 
77		m.SubjectBase, isResp = message.ThreadSubject(part.Envelope.Subject, false)
 
79	if !isResp || m.SubjectBase == "" {
 
82	m.ThreadMissingLink = true
 
83	tm, err := lookupThreadMessageSubject(tx, *m, m.SubjectBase)
 
85		return fmt.Errorf("looking up thread message by subject: %v", err)
 
87		assignParent(m, *tm, true)
 
92// assignParent assigns threading fields to m that make it a child of parent message pm.
 
93// updateSeen indicates if m.Seen should be cleared if pm is thread-muted.
 
94func assignParent(m *Message, pm Message, updateSeen bool) {
 
96		panic(fmt.Sprintf("assigning message id %d/d%q to parent message id %d/%q which has threadid 0", m.ID, m.MessageID, pm.ID, pm.MessageID))
 
99		panic(fmt.Sprintf("trying to make message id %d/%q its own parent", m.ID, m.MessageID))
 
101	m.ThreadID = pm.ThreadID
 
102	// Make sure we don't add cycles.
 
103	if !slices.Contains(pm.ThreadParentIDs, m.ID) {
 
104		m.ThreadParentIDs = append([]int64{pm.ID}, pm.ThreadParentIDs...)
 
105	} else if pm.ID != m.ID {
 
106		m.ThreadParentIDs = []int64{pm.ID}
 
108		m.ThreadParentIDs = nil
 
110	if m.MessageID != "" && m.MessageID == pm.MessageID {
 
111		m.ThreadMissingLink = true
 
113	m.ThreadMuted = pm.ThreadMuted
 
114	m.ThreadCollapsed = pm.ThreadCollapsed
 
115	if updateSeen && m.ThreadMuted {
 
120// ResetThreading resets the MessageID and SubjectBase fields for all messages in
 
121// the account. If clearIDs is true, all Thread* fields are also cleared. Changes
 
122// are made in transactions of batchSize changes. The total number of updated
 
123// messages is returned.
 
125// ModSeq is not changed. Calles should bump the uid validity of the mailboxes
 
126// to propagate the changes to IMAP clients.
 
127func (a *Account) ResetThreading(ctx context.Context, log mlog.Log, batchSize int, clearIDs bool) (int, error) {
 
128	// todo: should this send Change events for ThreadMuted and ThreadCollapsed? worth it?
 
135		prepareMessages := func(in, out chan moxio.Work[Message, Message]) {
 
144				// We have the Message-ID and Subject headers in ParsedBuf. We use a partial part
 
145				// struct so we don't generate so much garbage for the garbage collector to sift
 
148					Envelope *message.Envelope
 
150				if err := json.Unmarshal(m.ParsedBuf, &part); err != nil {
 
151					log.Errorx("unmarshal json parsedbuf for setting message-id, skipping", err, slog.Int64("msgid", m.ID))
 
154					if part.Envelope != nil && part.Envelope.MessageID != "" {
 
155						s, _, err := message.MessageIDCanonical(part.Envelope.MessageID)
 
157							log.Debugx("parsing message-id, skipping", err, slog.Int64("msgid", m.ID), slog.String("messageid", part.Envelope.MessageID))
 
161					if part.Envelope != nil {
 
162						m.SubjectBase, _ = message.ThreadSubject(part.Envelope.Subject, false)
 
171		err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
 
172			processMessage := func(in, m Message) error {
 
175					m.ThreadParentIDs = nil
 
176					m.ThreadMissingLink = false
 
181			// JSON parsing is relatively heavy, we benefit from multiple goroutines.
 
182			procs := runtime.GOMAXPROCS(0)
 
183			wq := moxio.NewWorkQueue[Message, Message](procs, 2*procs, prepareMessages, processMessage)
 
185			q := bstore.QueryTx[Message](tx)
 
186			q.FilterEqual("Expunged", false)
 
187			q.FilterGreater("ID", lastID)
 
189			err := q.ForEach(func(m Message) error {
 
190				// We process in batches so we don't block other operations for a long time.
 
192					return bstore.StopForEach
 
194				// Update starting point for next batch.
 
207			return total, fmt.Errorf("upgrading account to threads storage, step 1/2: %w", err)
 
217// AssignThreads assigns thread-related fields to messages with ID >=
 
218// startMessageID. Changes are committed each batchSize changes if txOpt is nil
 
219// (i.e. during automatic account upgrade, we don't want to block database access
 
220// for a long time). If txOpt is not nil, all changes are made in that
 
223// When resetting thread assignments, the caller must first clear the existing
 
226// Messages are processed in order of ID, so when added to the account, not
 
227// necessarily by received/date. Most threaded messages can immediately be matched
 
228// to their parent message. If not, we keep track of the missing message-id and
 
229// resolve as soon as we encounter it. At the end, we resolve all remaining
 
230// messages, they start with a cycle.
 
232// Does not set Seen flag for muted threads.
 
234// Progress is written to progressWriter, every 100k messages.
 
235func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, progressWriter io.Writer) error {
 
236	// We use a more basic version of the thread-matching algorithm describe in:
 
238	// The algorithm assumes you'll select messages, then group into threads. We normally do
 
239	// thread-calculation when messages are delivered. Here, we assign threads as soon
 
240	// as we can, but will queue messages that reference known ancestors and resolve as
 
241	// soon as we process them. We can handle large number of messages, but not very
 
242	// quickly because we make lots of database queries.
 
244	type childMsg struct {
 
245		ID                int64  // This message will be fetched and updated with the threading fields once the parent is resolved.
 
246		MessageID         string // Of child message. Once child is resolved, its own children can be resolved too.
 
247		ThreadMissingLink bool
 
249	// Messages that have a References/In-Reply-To that we want to set as parent, but
 
250	// where the parent doesn't have a ThreadID yet are added to pending. The key is
 
251	// the normalized MessageID of the parent, and the value is a list of messages that
 
252	// can get resolved once the parent gets its ThreadID. The kids will get the same
 
253	// ThreadIDs, and they themselves may be parents to kids, and so on.
 
254	// For duplicate messages (messages with identical Message-ID), the second
 
255	// Message-ID to be added to pending is added under its own message-id, so it gets
 
256	// its original as parent.
 
257	pending := map[string][]childMsg{}
 
259	// Current tx. If not equal to txOpt, we clean it up before we leave.
 
262		if tx != nil && tx != txOpt {
 
264			log.Check(err, "rolling back transaction")
 
268	// Set thread-related fields for a single message. Caller must save the message,
 
269	// only if not an error and not added to the pending list.
 
270	assign := func(m *Message, references, inReplyTo []string, subject string) (pend bool, rerr error) {
 
271		if m.MessageID != "" {
 
272			// Attempt to match against existing different message with same Message-ID that
 
273			// already has a threadid.
 
274			// If there are multiple messages for a message-id a future call to assign may use
 
275			// its threadid, or it may end up in pending and we resolve it when we need to.
 
276			q := bstore.QueryTx[Message](tx)
 
277			q.FilterNonzero(Message{MessageID: m.MessageID})
 
278			q.FilterEqual("Expunged", false)
 
279			q.FilterLess("ID", m.ID)
 
283			if err != nil && err != bstore.ErrAbsent {
 
284				return false, fmt.Errorf("looking up existing message with message-id: %v", err)
 
285			} else if err == nil {
 
286				if em.ThreadID == 0 {
 
287					pending[em.MessageID] = append(pending[em.MessageID], childMsg{m.ID, m.MessageID, true})
 
290					assignParent(m, em, false)
 
296		refids, err := message.ReferencedIDs(references, inReplyTo)
 
298			log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
 
301		for i := len(refids) - 1; i >= 0; i-- {
 
302			messageID := refids[i]
 
303			if messageID == m.MessageID {
 
306			tm, exists, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase)
 
308				return false, fmt.Errorf("lookup up thread by message-id %s for message id %d: %w", messageID, m.ID, err)
 
309			} else if tm != nil {
 
310				assignParent(m, *tm, false)
 
313				pending[messageID] = append(pending[messageID], childMsg{m.ID, m.MessageID, i < len(refids)-1})
 
318		var subjectBase string
 
321			subjectBase, isResp = message.ThreadSubject(subject, false)
 
323		if len(refids) > 0 || !isResp || subjectBase == "" {
 
325			m.ThreadMissingLink = len(refids) > 0
 
329		// No references to use. If this is a reply/forward (based on subject), we'll match
 
330		// against base subject, at most 4 weeks back so we don't match against ancient
 
331		// messages and 1 day ahead so we can match against delayed deliveries.
 
332		tm, err := lookupThreadMessageSubject(tx, *m, subjectBase)
 
334			return false, fmt.Errorf("looking up recent messages by base subject %q: %w", subjectBase, err)
 
335		} else if tm != nil {
 
336			m.ThreadID = tm.ThreadID
 
337			m.ThreadParentIDs = []int64{tm.ThreadID} // Always under root message with subject-match.
 
338			m.ThreadMissingLink = true
 
339			m.ThreadMuted = tm.ThreadMuted
 
340			m.ThreadCollapsed = tm.ThreadCollapsed
 
347	npendingResolved := 0
 
349	// Resolve pending messages that wait on m.MessageID to be resolved, recursively.
 
350	var resolvePending func(tm Message, cyclic bool) error
 
351	resolvePending = func(tm Message, cyclic bool) error {
 
352		if tm.MessageID == "" {
 
355		l := pending[tm.MessageID]
 
356		delete(pending, tm.MessageID)
 
357		for _, mi := range l {
 
358			m := Message{ID: mi.ID}
 
359			if err := tx.Get(&m); err != nil {
 
360				return fmt.Errorf("get message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
 
363				// ThreadID already set because this is a cyclic message. If we would assign a
 
364				// parent again, we would create a cycle.
 
365				if m.MessageID != tm.MessageID && !cyclic {
 
366					panic(fmt.Sprintf("threadid already set (%d) while handling non-cyclic message id %d/%q and with different message-id %q as parent message id %d", m.ThreadID, m.ID, m.MessageID, tm.MessageID, tm.ID))
 
370			assignParent(&m, tm, false)
 
371			m.ThreadMissingLink = mi.ThreadMissingLink
 
372			if err := tx.Update(&m); err != nil {
 
373				return fmt.Errorf("update message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
 
375			if err := resolvePending(m, cyclic); err != nil {
 
383	// Output of the worker goroutines.
 
384	type threadPrep struct {
 
390	// Single allocation.
 
391	threadingFields := [][]byte{
 
392		[]byte("references"),
 
393		[]byte("in-reply-to"),
 
397	// Worker goroutine function. We start with a reasonably large buffer for reading
 
398	// the header into. And we have scratch space to copy the needed headers into. That
 
399	// means we normally won't allocate any more buffers.
 
400	prepareMessages := func(in, out chan moxio.Work[Message, threadPrep]) {
 
401		headerbuf := make([]byte, 8*1024)
 
402		scratch := make([]byte, 4*1024)
 
410			var partialPart struct {
 
414			if err := json.Unmarshal(m.ParsedBuf, &partialPart); err != nil {
 
415				w.Err = fmt.Errorf("unmarshal part: %v", err)
 
417				size := partialPart.BodyOffset - partialPart.HeaderOffset
 
418				if int(size) > len(headerbuf) {
 
419					headerbuf = make([]byte, size)
 
422					buf := headerbuf[:int(size)]
 
423					err := func() error {
 
424						mr := a.MessageReader(m)
 
427						// ReadAt returns whole buffer or error. Single read should be fast.
 
428						n, err := mr.ReadAt(buf, partialPart.HeaderOffset)
 
429						if err != nil || n != len(buf) {
 
430							return fmt.Errorf("read header: %v", err)
 
436					} else if h, err := message.ParseHeaderFields(buf, scratch, threadingFields); err != nil {
 
439						w.Out.references = h["References"]
 
440						w.Out.inReplyTo = h["In-Reply-To"]
 
453	// Assign threads to messages, possibly in batches.
 
460			tx, err = a.DB.Begin(ctx, true)
 
462				return fmt.Errorf("begin transaction: %w", err)
 
466		processMessage := func(m Message, prep threadPrep) error {
 
467			pend, err := assign(&m, prep.references, prep.inReplyTo, prep.subject)
 
469				return fmt.Errorf("for msgid %d: %w", m.ID, err)
 
474				panic(fmt.Sprintf("no threadid after assign of message id %d/%q", m.ID, m.MessageID))
 
476			// Fields have been set, store in database and resolve messages waiting for this MessageID.
 
477			if slices.Contains(m.ThreadParentIDs, m.ID) {
 
478				panic(fmt.Sprintf("message id %d/%q contains itself in parent ids %v", m.ID, m.MessageID, m.ThreadParentIDs))
 
480			if err := tx.Update(&m); err != nil {
 
483			if err := resolvePending(m, false); err != nil {
 
484				return fmt.Errorf("resolving pending message-id: %v", err)
 
489		// Use multiple worker goroutines to read parse headers from on-disk messages.
 
490		procs := runtime.GOMAXPROCS(0)
 
491		wq := moxio.NewWorkQueue[Message, threadPrep](2*procs, 4*procs, prepareMessages, processMessage)
 
493		// We assign threads in order by ID, so messages delivered in between our
 
494		// transaction will get assigned threads too: they'll have the highest id's.
 
495		q := bstore.QueryTx[Message](tx)
 
496		q.FilterGreaterEqual("ID", startMessageID)
 
497		q.FilterEqual("Expunged", false)
 
499		err := q.ForEach(func(m Message) error {
 
500			// Batch number of changes, so we give other users of account a change to run.
 
501			if txOpt == nil && n >= batchSize {
 
502				return bstore.StopForEach
 
504			// Starting point for next batch.
 
505			startMessageID = m.ID + 1
 
506			// Don't process again. Can happen when earlier upgrade was aborted.
 
519		if err == nil && txOpt == nil {
 
524			return fmt.Errorf("assigning threads: %w", err)
 
530		if nassigned%100000 == 0 {
 
531			log.Debug("assigning threads, progress", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
 
532			if _, err := fmt.Fprintf(progressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil {
 
533				return fmt.Errorf("writing progress: %v", err)
 
537	if _, err := fmt.Fprintf(progressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil {
 
538		return fmt.Errorf("writing progress: %v", err)
 
541	log.Debug("assigning threads, mostly done, finishing with resolving of cyclic messages", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
 
543	if _, err := fmt.Fprintf(progressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil {
 
544		return fmt.Errorf("writing progress: %v", err)
 
547	// Remaining messages in pending have cycles and possibly tails. The cycle is at
 
548	// the head of the thread. Once we resolve that, the rest of the thread can be
 
549	// resolved too. Ignoring self-references (duplicate messages), there can only be
 
550	// one cycle, and it is at the head. So we look for cycles, ignoring
 
551	// self-references, and resolve a message as soon as we see the cycle.
 
553	parent := map[string]string{} // Child Message-ID pointing to the parent Message-ID, excluding self-references.
 
554	pendlist := []string{}
 
555	for pmsgid, l := range pending {
 
556		pendlist = append(pendlist, pmsgid)
 
557		for _, k := range l {
 
558			if k.MessageID == pmsgid {
 
559				// No self-references for duplicate messages.
 
562			if _, ok := parent[k.MessageID]; !ok {
 
563				parent[k.MessageID] = pmsgid
 
565			// else, this message should be resolved by following pending.
 
568	sort.Strings(pendlist)
 
573		tx, err = a.DB.Begin(ctx, true)
 
575			return fmt.Errorf("begin transaction: %w", err)
 
579	// We walk through all messages of pendlist, but some will already have been
 
580	// resolved by the time we get to them.
 
581	done := map[string]bool{}
 
582	for _, msgid := range pendlist {
 
587		// We walk up to parent, until we see a message-id we've already seen, a cycle.
 
588		seen := map[string]bool{}
 
590			pmsgid, ok := parent[msgid]
 
592				panic(fmt.Sprintf("missing parent message-id %q, not a cycle?", msgid))
 
600			// Cycle detected. Make this message-id the thread root.
 
601			q := bstore.QueryTx[Message](tx)
 
602			q.FilterNonzero(Message{MessageID: msgid})
 
603			q.FilterEqual("ThreadID", int64(0))
 
604			q.FilterEqual("Expunged", false)
 
607			if err == nil && len(l) == 0 {
 
608				err = errors.New("no messages")
 
611				return fmt.Errorf("list message by message-id for cyclic thread root: %v", err)
 
613			for i, m := range l {
 
615				m.ThreadMissingLink = true
 
617					m.ThreadParentIDs = nil
 
618					l[0] = m // For resolvePending below.
 
620					assignParent(&m, l[0], false)
 
622				if slices.Contains(m.ThreadParentIDs, m.ID) {
 
623					panic(fmt.Sprintf("message id %d/%q contains itself in parents %v", m.ID, m.MessageID, m.ThreadParentIDs))
 
625				if err := tx.Update(&m); err != nil {
 
626					return fmt.Errorf("assigning threadid to cyclic thread root: %v", err)
 
630			// Mark all children as done so we don't process these messages again.
 
631			walk := map[string]struct{}{msgid: {}}
 
633				for msgid := range walk {
 
639					for _, mi := range pending[msgid] {
 
640						if !done[mi.MessageID] {
 
641							walk[mi.MessageID] = struct{}{}
 
647			// Resolve all messages in this thread.
 
648			if err := resolvePending(l[0], true); err != nil {
 
649				return fmt.Errorf("resolving cyclic children of cyclic thread root: %v", err)
 
656	// Check that there are no more messages without threadid.
 
657	q := bstore.QueryTx[Message](tx)
 
658	q.FilterEqual("ThreadID", int64(0))
 
659	q.FilterEqual("Expunged", false)
 
661	if err == nil && len(l) > 0 {
 
662		err = errors.New("found messages without threadid")
 
665		return fmt.Errorf("listing messages without threadid: %v", err)
 
672			return fmt.Errorf("commit resolving cyclic thread roots: %v", err)
 
678// lookupThreadMessage tries to find the parent message with messageID that must
 
679// have a matching subjectBase.
 
681// If the message isn't present (with a valid thread id), a nil message and nil
 
682// error is returned. The bool return value indicates if a message with the
 
683// message-id exists at all.
 
684func lookupThreadMessage(tx *bstore.Tx, mID int64, messageID, subjectBase string) (*Message, bool, error) {
 
685	q := bstore.QueryTx[Message](tx)
 
686	q.FilterNonzero(Message{MessageID: messageID})
 
687	q.FilterEqual("SubjectBase", subjectBase)
 
688	q.FilterEqual("Expunged", false)
 
689	q.FilterNotEqual("ID", mID)
 
693		return nil, false, fmt.Errorf("message-id %s: %w", messageID, err)
 
696	for _, tm := range l {
 
697		if tm.ThreadID != 0 {
 
698			return &tm, true, nil
 
701	return nil, exists, nil
 
704// lookupThreadMessageSubject looks up a parent/ancestor message for the message
 
705// thread based on a matching subject. The message must have been delivered to the same mailbox originally.
 
707// If no message (with a threadid) is found a nil message and nil error is returned.
 
708func lookupThreadMessageSubject(tx *bstore.Tx, m Message, subjectBase string) (*Message, error) {
 
709	q := bstore.QueryTx[Message](tx)
 
710	q.FilterGreater("Received", m.Received.Add(-4*7*24*time.Hour))
 
711	q.FilterLess("Received", m.Received.Add(1*24*time.Hour))
 
712	q.FilterNonzero(Message{SubjectBase: subjectBase, MailboxOrigID: m.MailboxOrigID})
 
713	q.FilterEqual("Expunged", false)
 
714	q.FilterNotEqual("ID", m.ID)
 
715	q.FilterNotEqual("ThreadID", int64(0))
 
716	q.SortDesc("Received")
 
719	if err == bstore.ErrAbsent {
 
721	} else if err != nil {
 
727func upgradeThreads(ctx context.Context, log mlog.Log, acc *Account, up *Upgrade) error {
 
728	log = log.With(slog.String("account", acc.Name))
 
731		// Step 1 in the threads upgrade is storing the canonicalized Message-ID for each
 
732		// message and the base subject for thread matching. This allows efficient thread
 
733		// lookup in the second step.
 
735		log.Info("upgrading account for threading, step 1/2: updating all messages with message-id and base subject")
 
738		const batchSize = 10000
 
739		total, err := acc.ResetThreading(ctx, log, batchSize, true)
 
741			return fmt.Errorf("resetting message threading fields: %v", err)
 
745		if err := acc.DB.Update(ctx, up); err != nil {
 
747			return fmt.Errorf("saving upgrade process while upgrading account to threads storage, step 1/2: %w", err)
 
749		log.Info("upgrading account for threading, step 1/2: completed", slog.Duration("duration", time.Since(t0)), slog.Int("messages", total))
 
753		// Step 2 of the upgrade is going through all messages and assigning threadid's.
 
754		// Lookup of messageid and base subject is now fast through indexed database
 
757		log.Info("upgrading account for threading, step 2/2: matching messages to threads")
 
760		const batchSize = 10000
 
761		if err := acc.AssignThreads(ctx, log, nil, 1, batchSize, io.Discard); err != nil {
 
762			return fmt.Errorf("upgrading to threads storage, step 2/2: %w", err)
 
765		if err := acc.DB.Update(ctx, up); err != nil {
 
767			return fmt.Errorf("saving upgrade process for thread storage, step 2/2: %w", err)
 
769		log.Info("upgrading account for threading, step 2/2: completed", slog.Duration("duration", time.Since(t0)))
 
772	// Note: Not bumping uidvalidity or setting modseq. Clients haven't been able to
 
773	// use threadid's before, so there is nothing to be out of date.