15	"github.com/mjl-/bstore"
 
17	"github.com/mjl-/mox/message"
 
18	"github.com/mjl-/mox/mlog"
 
19	"github.com/mjl-/mox/moxio"
 
22// Assign a new/incoming message to a thread. Message does not yet have an ID. If
 
23// this isn't a response, ThreadID should remain 0 (unless this is a message with
 
24// existing message-id) and the caller must set ThreadID to ID.
 
25// If the account is still busy upgrading messages with threadids in the background, parents
 
26// may have a threadid 0. That results in this message getting threadid 0, which
 
27// will be handled by the background upgrade process assigning a threadid when it gets
 
29func assignThread(log mlog.Log, tx *bstore.Tx, m *Message, part *message.Part) error {
 
30	if m.MessageID != "" {
 
31		// Match against existing different message with same Message-ID.
 
32		q := bstore.QueryTx[Message](tx)
 
33		q.FilterNonzero(Message{MessageID: m.MessageID})
 
34		q.FilterEqual("Expunged", false)
 
35		q.FilterNotEqual("ID", m.ID)
 
36		q.FilterNotEqual("ThreadID", int64(0))
 
40		if err != nil && err != bstore.ErrAbsent {
 
41			return fmt.Errorf("looking up existing message with message-id: %v", err)
 
42		} else if err == nil {
 
43			assignParent(m, em, true)
 
48	h, err := part.Header()
 
50		log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
 
52	messageIDs, err := message.ReferencedIDs(h.Values("References"), h.Values("In-Reply-To"))
 
54		log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
 
56	for i := len(messageIDs) - 1; i >= 0; i-- {
 
57		messageID := messageIDs[i]
 
58		if messageID == m.MessageID {
 
61		tm, _, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase, m.DSN)
 
63			return fmt.Errorf("looking up thread message for new message: %v", err)
 
65			assignParent(m, *tm, true)
 
68		m.ThreadMissingLink = true
 
70	if len(messageIDs) > 0 {
 
75	if part != nil && part.Envelope != nil {
 
76		m.SubjectBase, isResp = message.ThreadSubject(part.Envelope.Subject, false)
 
78	if !isResp || m.SubjectBase == "" {
 
81	m.ThreadMissingLink = true
 
82	tm, err := lookupThreadMessageSubject(tx, *m, m.SubjectBase)
 
84		return fmt.Errorf("looking up thread message by subject: %v", err)
 
86		assignParent(m, *tm, true)
 
91// assignParent assigns threading fields to m that make it a child of parent message pm.
 
92// updateSeen indicates if m.Seen should be set if pm is thread-muted.
 
93func assignParent(m *Message, pm Message, updateSeen bool) {
 
95		panic(fmt.Sprintf("assigning message id %d/d%q to parent message id %d/%q which has threadid 0", m.ID, m.MessageID, pm.ID, pm.MessageID))
 
98		panic(fmt.Sprintf("trying to make message id %d/%q its own parent", m.ID, m.MessageID))
 
100	m.ThreadID = pm.ThreadID
 
101	// Make sure we don't add cycles.
 
102	if !slices.Contains(pm.ThreadParentIDs, m.ID) {
 
103		m.ThreadParentIDs = append([]int64{pm.ID}, pm.ThreadParentIDs...)
 
104	} else if pm.ID != m.ID {
 
105		m.ThreadParentIDs = []int64{pm.ID}
 
107		m.ThreadParentIDs = nil
 
109	if m.MessageID != "" && m.MessageID == pm.MessageID {
 
110		m.ThreadMissingLink = true
 
112	m.ThreadMuted = pm.ThreadMuted
 
113	m.ThreadCollapsed = pm.ThreadCollapsed
 
114	if updateSeen && m.ThreadMuted {
 
119// ResetThreading resets the MessageID and SubjectBase fields for all messages in
 
120// the account. If clearIDs is true, all Thread* fields are also cleared. Changes
 
121// are made in transactions of batchSize changes. The total number of updated
 
122// messages is returned.
 
124// ModSeq is not changed. Callers should bump the uid validity of the mailboxes
 
125// to propagate the changes to IMAP clients.
 
126func (a *Account) ResetThreading(ctx context.Context, log mlog.Log, batchSize int, clearIDs bool) (int, error) {
 
127	// todo: should this send Change events for ThreadMuted and ThreadCollapsed? worth it?
 
134		prepareMessages := func(in, out chan moxio.Work[Message, Message]) {
 
143				// We have the Message-ID and Subject headers in ParsedBuf. We use a partial part
 
144				// struct so we don't generate so much garbage for the garbage collector to sift
 
147					Envelope *message.Envelope
 
149				if err := json.Unmarshal(m.ParsedBuf, &part); err != nil {
 
150					log.Errorx("unmarshal json parsedbuf for setting message-id, skipping", err, slog.Int64("msgid", m.ID))
 
153					if part.Envelope != nil && part.Envelope.MessageID != "" {
 
154						s, _, err := message.MessageIDCanonical(part.Envelope.MessageID)
 
156							log.Debugx("parsing message-id, skipping", err, slog.Int64("msgid", m.ID), slog.String("messageid", part.Envelope.MessageID))
 
160					if part.Envelope != nil {
 
161						m.SubjectBase, _ = message.ThreadSubject(part.Envelope.Subject, false)
 
170		err := a.DB.Write(ctx, func(tx *bstore.Tx) error {
 
171			processMessage := func(in, m Message) error {
 
174					m.ThreadParentIDs = nil
 
175					m.ThreadMissingLink = false
 
180			// JSON parsing is relatively heavy, we benefit from multiple goroutines.
 
181			procs := runtime.GOMAXPROCS(0)
 
182			wq := moxio.NewWorkQueue[Message, Message](procs, 2*procs, prepareMessages, processMessage)
 
184			q := bstore.QueryTx[Message](tx)
 
185			q.FilterEqual("Expunged", false)
 
186			q.FilterGreater("ID", lastID)
 
188			err := q.ForEach(func(m Message) error {
 
189				// We process in batches so we don't block other operations for a long time.
 
191					return bstore.StopForEach
 
193				// Update starting point for next batch.
 
206			return total, fmt.Errorf("upgrading account to threads storage, step 1/2: %w", err)
 
216// AssignThreads assigns thread-related fields to messages with ID >=
 
217// startMessageID. Changes are committed after every batchSize changes if txOpt is nil
 
218// (i.e. during automatic account upgrade, we don't want to block database access
 
219// for a long time). If txOpt is not nil, all changes are made in that
 
222// When resetting thread assignments, the caller must first clear the existing
 
225// Messages are processed in order of ID, so when added to the account, not
 
226// necessarily by received/date. Most threaded messages can immediately be matched
 
227// to their parent message. If not, we keep track of the missing message-id and
 
228// resolve as soon as we encounter it. At the end, we resolve all remaining
 
229// messages; these are threads that start with a cycle.
 
231// Does not set Seen flag for muted threads.
 
233// Progress is written to progressWriter, every 100k messages.
 
234func (a *Account) AssignThreads(ctx context.Context, log mlog.Log, txOpt *bstore.Tx, startMessageID int64, batchSize int, progressWriter io.Writer) error {
 
235	// We use a more basic version of the thread-matching algorithm described in:
 
237	// The algorithm assumes you'll select messages, then group into threads. We normally do
 
238	// thread-calculation when messages are delivered. Here, we assign threads as soon
 
239	// as we can, but will queue messages that reference known ancestors and resolve as
 
240	// soon as we process them. We can handle large number of messages, but not very
 
241	// quickly because we make lots of database queries.
 
243	type childMsg struct {
 
244		ID                int64  // This message will be fetched and updated with the threading fields once the parent is resolved.
 
245		MessageID         string // Of child message. Once child is resolved, its own children can be resolved too.
 
246		ThreadMissingLink bool
 
248	// Messages that have a References/In-Reply-To that we want to set as parent, but
 
249	// where the parent doesn't have a ThreadID yet are added to pending. The key is
 
250	// the normalized MessageID of the parent, and the value is a list of messages that
 
251	// can get resolved once the parent gets its ThreadID. The kids will get the same
 
252	// ThreadIDs, and they themselves may be parents to kids, and so on.
 
253	// For duplicate messages (messages with identical Message-ID), the second
 
254	// Message-ID to be added to pending is added under its own message-id, so it gets
 
255	// its original as parent.
 
256	pending := map[string][]childMsg{}
 
258	// Current tx. If not equal to txOpt, we clean it up before we leave.
 
261		if tx != nil && tx != txOpt {
 
263			log.Check(err, "rolling back transaction")
 
267	// Set thread-related fields for a single message. Caller must save the message,
 
268	// only if not an error and not added to the pending list.
 
269	assign := func(m *Message, references, inReplyTo []string, subject string) (pend bool, rerr error) {
 
270		if m.MessageID != "" {
 
271			// Attempt to match against existing different message with same Message-ID that
 
272			// already has a threadid.
 
273			// If there are multiple messages for a message-id a future call to assign may use
 
274			// its threadid, or it may end up in pending and we resolve it when we need to.
 
275			q := bstore.QueryTx[Message](tx)
 
276			q.FilterNonzero(Message{MessageID: m.MessageID})
 
277			q.FilterEqual("Expunged", false)
 
278			q.FilterLess("ID", m.ID)
 
282			if err != nil && err != bstore.ErrAbsent {
 
283				return false, fmt.Errorf("looking up existing message with message-id: %v", err)
 
284			} else if err == nil {
 
285				if em.ThreadID == 0 {
 
286					pending[em.MessageID] = append(pending[em.MessageID], childMsg{m.ID, m.MessageID, true})
 
289					assignParent(m, em, false)
 
295		refids, err := message.ReferencedIDs(references, inReplyTo)
 
297			log.Errorx("assigning threads: parsing references/in-reply-to headers, not matching by message-id", err, slog.Int64("msgid", m.ID))
 
300		for i := len(refids) - 1; i >= 0; i-- {
 
301			messageID := refids[i]
 
302			if messageID == m.MessageID {
 
305			tm, exists, err := lookupThreadMessage(tx, m.ID, messageID, m.SubjectBase, m.DSN)
 
307				return false, fmt.Errorf("lookup up thread by message-id %s for message id %d: %w", messageID, m.ID, err)
 
308			} else if tm != nil {
 
309				assignParent(m, *tm, false)
 
312				pending[messageID] = append(pending[messageID], childMsg{m.ID, m.MessageID, i < len(refids)-1})
 
317		var subjectBase string
 
320			subjectBase, isResp = message.ThreadSubject(subject, false)
 
322		if len(refids) > 0 || !isResp || subjectBase == "" {
 
324			m.ThreadMissingLink = len(refids) > 0
 
328		// No references to use. If this is a reply/forward (based on subject), we'll match
 
329		// against base subject, at most 4 weeks back so we don't match against ancient
 
330		// messages and 1 day ahead so we can match against delayed deliveries.
 
331		tm, err := lookupThreadMessageSubject(tx, *m, subjectBase)
 
333			return false, fmt.Errorf("looking up recent messages by base subject %q: %w", subjectBase, err)
 
334		} else if tm != nil {
 
335			m.ThreadID = tm.ThreadID
 
336			m.ThreadParentIDs = []int64{tm.ThreadID} // Always under root message with subject-match.
 
337			m.ThreadMissingLink = true
 
338			m.ThreadMuted = tm.ThreadMuted
 
339			m.ThreadCollapsed = tm.ThreadCollapsed
 
346	npendingResolved := 0
 
348	// Resolve pending messages that wait on m.MessageID to be resolved, recursively.
 
349	var resolvePending func(tm Message, cyclic bool) error
 
350	resolvePending = func(tm Message, cyclic bool) error {
 
351		if tm.MessageID == "" {
 
354		l := pending[tm.MessageID]
 
355		delete(pending, tm.MessageID)
 
356		for _, mi := range l {
 
357			m := Message{ID: mi.ID}
 
358			if err := tx.Get(&m); err != nil {
 
359				return fmt.Errorf("get message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
 
362				// ThreadID already set because this is a cyclic message. If we would assign a
 
363				// parent again, we would create a cycle.
 
364				if m.MessageID != tm.MessageID && !cyclic {
 
365					panic(fmt.Sprintf("threadid already set (%d) while handling non-cyclic message id %d/%q and with different message-id %q as parent message id %d", m.ThreadID, m.ID, m.MessageID, tm.MessageID, tm.ID))
 
369			assignParent(&m, tm, false)
 
370			m.ThreadMissingLink = mi.ThreadMissingLink
 
371			if err := tx.Update(&m); err != nil {
 
372				return fmt.Errorf("update message %d for resolving pending thread for message-id %s, %d: %w", mi.ID, tm.MessageID, tm.ID, err)
 
374			if err := resolvePending(m, cyclic); err != nil {
 
382	// Output of the worker goroutines.
 
383	type threadPrep struct {
 
389	// Single allocation.
 
390	threadingFields := [][]byte{
 
391		[]byte("references"),
 
392		[]byte("in-reply-to"),
 
396	// Worker goroutine function. We start with a reasonably large buffer for reading
 
397	// the header into. And we have scratch space to copy the needed headers into. That
 
398	// means we normally won't allocate any more buffers.
 
399	prepareMessages := func(in, out chan moxio.Work[Message, threadPrep]) {
 
400		headerbuf := make([]byte, 8*1024)
 
401		scratch := make([]byte, 4*1024)
 
409			var partialPart struct {
 
413			if err := json.Unmarshal(m.ParsedBuf, &partialPart); err != nil {
 
414				w.Err = fmt.Errorf("unmarshal part: %v", err)
 
416				size := partialPart.BodyOffset - partialPart.HeaderOffset
 
417				if int(size) > len(headerbuf) {
 
418					headerbuf = make([]byte, size)
 
421					buf := headerbuf[:int(size)]
 
422					err := func() error {
 
423						mr := a.MessageReader(m)
 
426						// ReadAt returns whole buffer or error. Single read should be fast.
 
427						n, err := mr.ReadAt(buf, partialPart.HeaderOffset)
 
428						if err != nil || n != len(buf) {
 
429							return fmt.Errorf("read header: %v", err)
 
435					} else if h, err := message.ParseHeaderFields(buf, scratch, threadingFields); err != nil {
 
438						w.Out.references = h["References"]
 
439						w.Out.inReplyTo = h["In-Reply-To"]
 
452	// Assign threads to messages, possibly in batches.
 
459			tx, err = a.DB.Begin(ctx, true)
 
461				return fmt.Errorf("begin transaction: %w", err)
 
465		processMessage := func(m Message, prep threadPrep) error {
 
466			pend, err := assign(&m, prep.references, prep.inReplyTo, prep.subject)
 
468				return fmt.Errorf("for msgid %d: %w", m.ID, err)
 
473				panic(fmt.Sprintf("no threadid after assign of message id %d/%q", m.ID, m.MessageID))
 
475			// Fields have been set, store in database and resolve messages waiting for this MessageID.
 
476			if slices.Contains(m.ThreadParentIDs, m.ID) {
 
477				panic(fmt.Sprintf("message id %d/%q contains itself in parent ids %v", m.ID, m.MessageID, m.ThreadParentIDs))
 
479			if err := tx.Update(&m); err != nil {
 
482			if err := resolvePending(m, false); err != nil {
 
483				return fmt.Errorf("resolving pending message-id: %v", err)
 
488		// Use multiple worker goroutines to parse headers from on-disk messages.
 
489		procs := runtime.GOMAXPROCS(0)
 
490		wq := moxio.NewWorkQueue[Message, threadPrep](2*procs, 4*procs, prepareMessages, processMessage)
 
492		// We assign threads in order by ID, so messages delivered in between our
 
493		// transaction will get assigned threads too: they'll have the highest id's.
 
494		q := bstore.QueryTx[Message](tx)
 
495		q.FilterGreaterEqual("ID", startMessageID)
 
496		q.FilterEqual("Expunged", false)
 
498		err := q.ForEach(func(m Message) error {
 
499		// Batch number of changes, so we give other users of account a chance to run.
 
500			if txOpt == nil && n >= batchSize {
 
501				return bstore.StopForEach
 
503			// Starting point for next batch.
 
504			startMessageID = m.ID + 1
 
505			// Don't process again. Can happen when earlier upgrade was aborted.
 
518		if err == nil && txOpt == nil {
 
523			return fmt.Errorf("assigning threads: %w", err)
 
529		if nassigned%100000 == 0 {
 
530			log.Debug("assigning threads, progress", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
 
531			if _, err := fmt.Fprintf(progressWriter, "assigning threads, progress: %d messages\n", nassigned); err != nil {
 
532				return fmt.Errorf("writing progress: %v", err)
 
536	if _, err := fmt.Fprintf(progressWriter, "assigning threads, done: %d messages\n", nassigned); err != nil {
 
537		return fmt.Errorf("writing progress: %v", err)
 
540	log.Debug("assigning threads, mostly done, finishing with resolving of cyclic messages", slog.Int("count", nassigned), slog.Int("unresolved", len(pending)))
 
542	if _, err := fmt.Fprintf(progressWriter, "assigning threads, resolving %d cyclic pending message-ids\n", len(pending)); err != nil {
 
543		return fmt.Errorf("writing progress: %v", err)
 
546	// Remaining messages in pending have cycles and possibly tails. The cycle is at
 
547	// the head of the thread. Once we resolve that, the rest of the thread can be
 
548	// resolved too. Ignoring self-references (duplicate messages), there can only be
 
549	// one cycle, and it is at the head. So we look for cycles, ignoring
 
550	// self-references, and resolve a message as soon as we see the cycle.
 
552	parent := map[string]string{} // Child Message-ID pointing to the parent Message-ID, excluding self-references.
 
553	pendlist := []string{}
 
554	for pmsgid, l := range pending {
 
555		pendlist = append(pendlist, pmsgid)
 
556		for _, k := range l {
 
557			if k.MessageID == pmsgid {
 
558				// No self-references for duplicate messages.
 
561			if _, ok := parent[k.MessageID]; !ok {
 
562				parent[k.MessageID] = pmsgid
 
564			// else, this message should be resolved by following pending.
 
567	sort.Strings(pendlist)
 
572		tx, err = a.DB.Begin(ctx, true)
 
574			return fmt.Errorf("begin transaction: %w", err)
 
578	// We walk through all messages of pendlist, but some will already have been
 
579	// resolved by the time we get to them.
 
580	done := map[string]bool{}
 
581	for _, msgid := range pendlist {
 
586		// We walk up to parent, until we see a message-id we've already seen, a cycle.
 
587		seen := map[string]bool{}
 
589			pmsgid, ok := parent[msgid]
 
591				panic(fmt.Sprintf("missing parent message-id %q, not a cycle?", msgid))
 
599			// Cycle detected. Make this message-id the thread root.
 
600			q := bstore.QueryTx[Message](tx)
 
601			q.FilterNonzero(Message{MessageID: msgid})
 
602			q.FilterEqual("ThreadID", int64(0))
 
603			q.FilterEqual("Expunged", false)
 
606			if err == nil && len(l) == 0 {
 
607				err = errors.New("no messages")
 
610				return fmt.Errorf("list message by message-id for cyclic thread root: %v", err)
 
612			for i, m := range l {
 
614				m.ThreadMissingLink = true
 
616					m.ThreadParentIDs = nil
 
617					l[0] = m // For resolvePending below.
 
619					assignParent(&m, l[0], false)
 
621				if slices.Contains(m.ThreadParentIDs, m.ID) {
 
622					panic(fmt.Sprintf("message id %d/%q contains itself in parents %v", m.ID, m.MessageID, m.ThreadParentIDs))
 
624				if err := tx.Update(&m); err != nil {
 
625					return fmt.Errorf("assigning threadid to cyclic thread root: %v", err)
 
629			// Mark all children as done so we don't process these messages again.
 
630			walk := map[string]struct{}{msgid: {}}
 
632				for msgid := range walk {
 
638					for _, mi := range pending[msgid] {
 
639						if !done[mi.MessageID] {
 
640							walk[mi.MessageID] = struct{}{}
 
646			// Resolve all messages in this thread.
 
647			if err := resolvePending(l[0], true); err != nil {
 
648				return fmt.Errorf("resolving cyclic children of cyclic thread root: %v", err)
 
655	// Check that there are no more messages without threadid.
 
656	q := bstore.QueryTx[Message](tx)
 
657	q.FilterEqual("ThreadID", int64(0))
 
658	q.FilterEqual("Expunged", false)
 
660	if err == nil && len(l) > 0 {
 
661		err = errors.New("found messages without threadid")
 
664		return fmt.Errorf("listing messages without threadid: %v", err)
 
671			return fmt.Errorf("commit resolving cyclic thread roots: %v", err)
 
677// lookupThreadMessage tries to find the parent message with messageID, that must
 
678// have a matching subjectBase (unless it is a DSN).
 
680// If the message isn't present (with a valid thread id), a nil message and nil
 
681// error are returned. The bool return value indicates if a message with the
 
682// message-id exists at all.
 
683func lookupThreadMessage(tx *bstore.Tx, mID int64, messageID, subjectBase string, isDSN bool) (*Message, bool, error) {
 
684	q := bstore.QueryTx[Message](tx)
 
685	q.FilterNonzero(Message{MessageID: messageID})
 
687		q.FilterEqual("SubjectBase", subjectBase)
 
689	q.FilterEqual("Expunged", false)
 
690	q.FilterNotEqual("ID", mID)
 
694		return nil, false, fmt.Errorf("message-id %s: %w", messageID, err)
 
697	for _, tm := range l {
 
698		if tm.ThreadID != 0 {
 
699			return &tm, true, nil
 
702	return nil, exists, nil
 
705// lookupThreadMessageSubject looks up a parent/ancestor message for the message
 
706// thread based on a matching subject. The message must have been delivered to the same mailbox originally.
 
708// If no message (with a threadid) is found, a nil message and nil error are returned.
 
709func lookupThreadMessageSubject(tx *bstore.Tx, m Message, subjectBase string) (*Message, error) {
 
710	q := bstore.QueryTx[Message](tx)
 
711	q.FilterGreater("Received", m.Received.Add(-4*7*24*time.Hour))
 
712	q.FilterLess("Received", m.Received.Add(1*24*time.Hour))
 
713	q.FilterNonzero(Message{SubjectBase: subjectBase, MailboxOrigID: m.MailboxOrigID})
 
714	q.FilterEqual("Expunged", false)
 
715	q.FilterNotEqual("ID", m.ID)
 
716	q.FilterNotEqual("ThreadID", int64(0))
 
717	q.SortDesc("Received")
 
720	if err == bstore.ErrAbsent {
 
722	} else if err != nil {
 
728func upgradeThreads(ctx context.Context, log mlog.Log, acc *Account, up *Upgrade) error {
 
729	log = log.With(slog.String("account", acc.Name))
 
732		// Step 1 in the threads upgrade is storing the canonicalized Message-ID for each
 
733		// message and the base subject for thread matching. This allows efficient thread
 
734		// lookup in the second step.
 
736		log.Info("upgrading account for threading, step 1/2: updating all messages with message-id and base subject")
 
739		const batchSize = 10000
 
740		total, err := acc.ResetThreading(ctx, log, batchSize, true)
 
742			return fmt.Errorf("resetting message threading fields: %v", err)
 
746		if err := acc.DB.Update(ctx, up); err != nil {
 
748			return fmt.Errorf("saving upgrade process while upgrading account to threads storage, step 1/2: %w", err)
 
750		log.Info("upgrading account for threading, step 1/2: completed", slog.Duration("duration", time.Since(t0)), slog.Int("messages", total))
 
754		// Step 2 of the upgrade is going through all messages and assigning threadid's.
 
755		// Lookup of messageid and base subject is now fast through indexed database
 
758		log.Info("upgrading account for threading, step 2/2: matching messages to threads")
 
761		const batchSize = 10000
 
762		if err := acc.AssignThreads(ctx, log, nil, 1, batchSize, io.Discard); err != nil {
 
763			return fmt.Errorf("upgrading to threads storage, step 2/2: %w", err)
 
766		if err := acc.DB.Update(ctx, up); err != nil {
 
768			return fmt.Errorf("saving upgrade process for thread storage, step 2/2: %w", err)
 
770		log.Info("upgrading account for threading, step 2/2: completed", slog.Duration("duration", time.Since(t0)))
 
773	// Note: Not bumping uidvalidity or setting modseq. Clients haven't been able to
 
774	// use threadid's before, so there is nothing to be out of date.