20 "github.com/mjl-/bstore"
22 "github.com/mjl-/mox/dns"
23 "github.com/mjl-/mox/message"
24 "github.com/mjl-/mox/metrics"
25 "github.com/mjl-/mox/mlog"
26 "github.com/mjl-/mox/mox-"
27 "github.com/mjl-/mox/queue"
28 "github.com/mjl-/mox/smtp"
29 "github.com/mjl-/mox/store"
32// ctl represents a connection to the ctl unix domain socket of a running mox instance.
33// ctl provides functions to read/write commands/responses/data streams.
35 cmd string // Set for server-side of commands.
37 r *bufio.Reader // Set for first reader.
38 x any // If set, errors are handled by calling panic(x) instead of log.Fatal.
39 log mlog.Log // If set, along with x, logging is done here.
42// xctl opens a ctl connection.
44 p := mox.DataDirPath("ctl")
45 conn, err := net.Dial("unix", p)
47 log.Fatalf("connecting to control socket at %q: %v", p, err)
49 ctl := &ctl{conn: conn}
50 version := ctl.xread()
51 if version != "ctlv0" {
52 log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
57// Interpret msg as an error.
58// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
59func (c *ctl) xerror(msg string) {
63 c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
68// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
69// ctl.x is set, the error string is written to ctl, to be interpreted as an error
70// by the command reading from ctl.
71func (c *ctl) xcheck(err error, msg string) {
76 log.Fatalf("%s: %s", msg, err)
78 c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
79 fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
83// Read a line and return it without trailing newline.
84func (c *ctl) xread() string {
86 c.r = bufio.NewReader(c.conn)
88 line, err := c.r.ReadString('\n')
89 c.xcheck(err, "read from ctl")
90 return strings.TrimSuffix(line, "\n")
93// Read a line. If not "ok", the string is interpreted as an error.
94func (c *ctl) xreadok() {
101// Write a string, typically a command or parameter.
102func (c *ctl) xwrite(text string) {
103 _, err := fmt.Fprintln(c.conn, text)
104 c.xcheck(err, "write")
107// Write "ok" to indicate success.
108func (c *ctl) xwriteok() {
112// Copy data from a stream from ctl to dst.
113func (c *ctl) xstreamto(dst io.Writer) {
114 _, err := io.Copy(dst, c.reader())
115 c.xcheck(err, "reading message")
118// Copy data from src to a stream to ctl.
119func (c *ctl) xstreamfrom(src io.Reader) {
121 _, err := io.Copy(w, src)
122 c.xcheck(err, "copying")
126// Writer returns an io.Writer for a data stream to ctl.
127// When done writing, caller must call xclose to signal the end of the stream.
128// Behaviour of "x" is copied from ctl.
129func (c *ctl) writer() *ctlwriter {
130 return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
133// Reader returns an io.Reader for a data stream from ctl.
134// Behaviour of "x" is copied from ctl.
135func (c *ctl) reader() *ctlreader {
137 c.r = bufio.NewReader(c.conn)
139 return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
143Ctlwriter and ctlreader implement the writing and reading a data stream. They
144implement the io.Writer and io.Reader interface. In the protocol below each
145non-data message ends with a newline that is typically stripped when
148Zero or more data transactions:
150 > "123" (for data size) or an error message
152 < "ok" or an error message
154Followed by a end of stream indicated by zero data bytes message:
159type ctlwriter struct {
160 cmd string // Set for server-side of commands.
161 conn net.Conn // Ctl socket from which messages are read.
162 buf []byte // Scratch buffer, for reading response.
163 x any // If not nil, errors in Write and xcheckf are handled with panic(x), otherwise with a log.Fatal.
167func (s *ctlwriter) Write(buf []byte) (int, error) {
168 _, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
169 s.xcheck(err, "write count")
170 _, err = s.conn.Write(buf)
171 s.xcheck(err, "write data")
173 s.buf = make([]byte, 512)
175 n, err := s.conn.Read(s.buf)
176 s.xcheck(err, "reading response to write")
177 line := strings.TrimSuffix(string(s.buf[:n]), "\n")
184func (s *ctlwriter) xerror(msg string) {
188 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
193func (s *ctlwriter) xcheck(err error, msg string) {
198 log.Fatalf("%s: %s", msg, err)
200 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
205func (s *ctlwriter) xclose() {
206 _, err := fmt.Fprintf(s.conn, "0\n")
207 s.xcheck(err, "write eof")
210type ctlreader struct {
211 cmd string // Set for server-side of command.
212 conn net.Conn // For writing "ok" after reading.
213 r *bufio.Reader // Buffered ctl socket.
214 err error // If set, returned for each read. can also be io.EOF.
215 npending int // Number of bytes that can still be read until a new count line must be read.
216 x any // If set, errors are handled with panic(x) instead of log.Fatal.
217 log mlog.Log // If x is set, logging goes to log.
220func (s *ctlreader) Read(buf []byte) (N int, Err error) {
225 line, err := s.r.ReadString('\n')
226 s.xcheck(err, "reading count")
227 line = strings.TrimSuffix(line, "\n")
228 n, err := strconv.ParseInt(line, 10, 32)
242 n, err := s.r.Read(buf[:rn])
243 s.xcheck(err, "read from ctl")
246 _, err = fmt.Fprintln(s.conn, "ok")
247 s.xcheck(err, "writing ok after reading")
252func (s *ctlreader) xerror(msg string) {
256 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
261func (s *ctlreader) xcheck(err error, msg string) {
266 log.Fatalf("%s: %s", msg, err)
268 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
273// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
274func servectl(ctx context.Context, log mlog.Log, conn net.Conn, shutdown func()) {
275 log.Debug("ctl connection")
277 var stop = struct{}{} // Sentinel value for panic and recover.
278 ctl := &ctl{conn: conn, x: stop, log: log}
281 if x == nil || x == stop {
284 log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", ctl.cmd))
286 metrics.PanicInc(metrics.Ctl)
293 servectlcmd(ctx, ctl, shutdown)
297func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
301 log.Info("ctl command", slog.String("cmd", cmd))
308 /* The protocol, double quoted are literals.
318 a, addr, err := store.OpenEmail(ctl.log, to)
319 ctl.xcheck(err, "lookup destination address")
321 msgFile, err := store.CreateMessageTemp(ctl.log, "ctl-deliver")
322 ctl.xcheck(err, "creating temporary message file")
323 defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
324 mw := message.NewWriter(msgFile)
329 ctl.xcheck(err, "syncing message to storage")
332 Received: time.Now(),
337 err := a.DeliverDestination(log, addr, m, msgFile)
338 ctl.xcheck(err, "delivering message")
339 log.Info("message delivered through ctl", slog.Any("to", to))
343 ctl.xcheck(err, "closing account")
346 case "setaccountpassword":
348 > "setaccountpassword"
354 account := ctl.xread()
357 acc, err := store.OpenAccount(ctl.log, account)
358 ctl.xcheck(err, "open account")
362 log.Check(err, "closing account after setting password")
366 err = acc.SetPassword(ctl.log, pw)
367 ctl.xcheck(err, "setting password")
369 ctl.xcheck(err, "closing account")
379 qmsgs, err := queue.List(ctx)
380 ctl.xcheck(err, "listing queue")
384 fmt.Fprintln(xw, "queue:")
385 for _, qm := range qmsgs {
386 var lastAttempt string
387 if qm.LastAttempt != nil {
388 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
390 fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastError)
393 fmt.Fprint(xw, "(empty)\n")
403 > transport // if empty, transport is left unchanged; in future, we may want to differtiate between "leave unchanged" and "set to empty string".
409 todomain := ctl.xread()
410 recipient := ctl.xread()
411 transport := ctl.xread()
412 id, err := strconv.ParseInt(idstr, 10, 64)
415 ctl.xcheck(err, "parsing id")
418 var xtransport *string
420 xtransport = &transport
422 count, err := queue.Kick(ctx, id, todomain, recipient, xtransport)
423 ctl.xcheck(err, "kicking queue")
424 ctl.xwrite(fmt.Sprintf("%d", count))
438 todomain := ctl.xread()
439 recipient := ctl.xread()
440 id, err := strconv.ParseInt(idstr, 10, 64)
443 ctl.xcheck(err, "parsing id")
446 count, err := queue.Drop(ctx, ctl.log, id, todomain, recipient)
447 ctl.xcheck(err, "dropping messages from queue")
448 ctl.xwrite(fmt.Sprintf("%d", count))
460 id, err := strconv.ParseInt(idstr, 10, 64)
462 ctl.xcheck(err, "parsing id")
464 mr, err := queue.OpenMessage(ctx, id)
465 ctl.xcheck(err, "opening message")
468 log.Check(err, "closing message from queue")
473 case "importmaildir", "importmbox":
474 mbox := cmd == "importmbox"
475 importctl(ctx, ctl, mbox)
485 domain := ctl.xread()
486 account := ctl.xread()
487 localpart := ctl.xread()
488 d, err := dns.ParseDomain(domain)
489 ctl.xcheck(err, "parsing domain")
490 err = mox.DomainAdd(ctx, d, account, smtp.Localpart(localpart))
491 ctl.xcheck(err, "adding domain")
500 domain := ctl.xread()
501 d, err := dns.ParseDomain(domain)
502 ctl.xcheck(err, "parsing domain")
503 err = mox.DomainRemove(ctx, d)
504 ctl.xcheck(err, "removing domain")
514 account := ctl.xread()
515 address := ctl.xread()
516 err := mox.AccountAdd(ctx, account, address)
517 ctl.xcheck(err, "adding account")
526 account := ctl.xread()
527 err := mox.AccountRemove(ctx, account)
528 ctl.xcheck(err, "removing account")
538 address := ctl.xread()
539 account := ctl.xread()
540 err := mox.AddressAdd(ctx, address, account)
541 ctl.xcheck(err, "adding address")
550 address := ctl.xread()
551 err := mox.AddressRemove(ctx, address)
552 ctl.xcheck(err, "removing address")
562 l := mox.Conf.LogLevels()
565 keys = append(keys, k)
567 sort.Slice(keys, func(i, j int) bool {
568 return keys[i] < keys[j]
571 for _, k := range keys {
576 s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
578 ctl.xstreamfrom(strings.NewReader(s))
584 > level (if empty, log level for pkg will be unset)
588 levelstr := ctl.xread()
590 mox.Conf.LogLevelRemove(ctl.log, pkg)
592 level, ok := mlog.Levels[levelstr]
594 ctl.xerror("bad level")
596 mox.Conf.LogLevelSet(ctl.log, pkg, level)
606 account := ctl.xread()
607 acc, err := store.OpenAccount(ctl.log, account)
608 ctl.xcheck(err, "open account")
612 log.Check(err, "closing account after retraining")
616 acc.WithWLock(func() {
617 conf, _ := acc.Conf()
618 if conf.JunkFilter == nil {
619 ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
622 // Remove existing junk filter files.
623 basePath := mox.DataDirPath("accounts")
624 dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
625 bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
626 err := os.Remove(dbPath)
627 log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
628 err = os.Remove(bloomPath)
629 log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
631 // Open junk filter, this creates new files.
632 jf, _, err := acc.OpenJunkFilter(ctx, ctl.log)
633 ctl.xcheck(err, "open new junk filter")
639 log.Check(err, "closing junk filter during cleanup")
642 // Read through messages with junk or nonjunk flag set, and train them.
643 var total, trained int
644 q := bstore.QueryDB[store.Message](ctx, acc.DB)
645 q.FilterEqual("Expunged", false)
646 err = q.ForEach(func(m store.Message) error {
648 ok, err := acc.TrainMessage(ctx, ctl.log, jf, m)
654 ctl.xcheck(err, "training messages")
655 ctl.log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
657 // Close junk filter, marking success.
660 ctl.xcheck(err, "closing junk filter")
664 case "recalculatemailboxcounts":
666 > "recalculatemailboxcounts"
671 account := ctl.xread()
672 acc, err := store.OpenAccount(ctl.log, account)
673 ctl.xcheck(err, "open account")
677 log.Check(err, "closing account after recalculating mailbox counts")
684 acc.WithWLock(func() {
685 var changes []store.Change
686 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
688 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
689 mc, err := mb.CalculateCounts(tx)
691 return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
695 if !mb.HaveCounts || mc != mb.MailboxCounts {
696 _, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
697 ctl.xcheck(err, "write")
699 mb.MailboxCounts = mc
700 if err := tx.Update(&mb); err != nil {
701 return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
703 changes = append(changes, mb.ChangeCounts())
711 du := store.DiskUsage{ID: 1}
712 if err := tx.Get(&du); err != nil {
713 return fmt.Errorf("get disk usage: %v", err)
715 if du.MessageSize != totalSize {
716 _, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
717 ctl.xcheck(err, "write")
718 du.MessageSize = totalSize
719 if err := tx.Update(&du); err != nil {
720 return fmt.Errorf("update disk usage: %v", err)
725 ctl.xcheck(err, "write transaction for mailbox counts")
727 store.BroadcastChanges(acc, changes)
739 accountOpt := ctl.xread()
743 var foundProblem bool
744 const batchSize = 10000
746 xfixmsgsize := func(accName string) {
747 acc, err := store.OpenAccount(ctl.log, accName)
748 ctl.xcheck(err, "open account")
751 log.Check(err, "closing account after fixing message sizes")
759 acc.WithRLock(func() {
760 mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
762 // Don't process all message in one transaction, we could block the account for too long.
763 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
764 q := bstore.QueryTx[store.Message](tx)
765 q.FilterEqual("Expunged", false)
766 q.FilterGreater("ID", lastID)
769 return q.ForEach(func(m store.Message) error {
773 p := acc.MessagePath(m.ID)
774 st, err := os.Stat(p)
776 mb := store.Mailbox{ID: m.MailboxID}
777 if xerr := tx.Get(&mb); xerr != nil {
778 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
779 ctl.xcheck(werr, "write")
781 _, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
782 ctl.xcheck(werr, "write")
785 filesize := st.Size()
786 correctSize := int64(len(m.MsgPrefix)) + filesize
787 if m.Size == correctSize {
793 mb := store.Mailbox{ID: m.MailboxID}
794 if err := tx.Get(&mb); err != nil {
795 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
796 ctl.xcheck(werr, "write")
798 _, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
799 ctl.xcheck(err, "write")
801 // We assume that the original message size was accounted as stored in the mailbox
802 // total size. If this isn't correct, the user can always run
803 // recalculatemailboxcounts.
805 mb.Size += correctSize
806 if err := tx.Update(&mb); err != nil {
807 return fmt.Errorf("update mailbox counts: %v", err)
809 mailboxCounts[mb.ID] = mb
813 mr := acc.MessageReader(m)
814 part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
816 _, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
817 ctl.xcheck(werr, "write")
819 m.ParsedBuf, err = json.Marshal(part)
821 return fmt.Errorf("marshal parsed message: %v", err)
824 if err := tx.Update(&m); err != nil {
825 return fmt.Errorf("update message: %v", err)
831 ctl.xcheck(err, "find and fix wrong message sizes")
833 var changes []store.Change
834 for _, mb := range mailboxCounts {
835 changes = append(changes, mb.ChangeCounts())
837 store.BroadcastChanges(acc, changes)
843 _, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
844 ctl.xcheck(err, "write")
847 if accountOpt != "" {
848 xfixmsgsize(accountOpt)
850 for i, accName := range mox.Conf.Accounts() {
855 _, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
856 ctl.xcheck(err, "write")
861 _, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
862 ctl.xcheck(err, "write")
875 accountOpt := ctl.xread()
879 const batchSize = 100
881 xreparseAccount := func(accName string) {
882 acc, err := store.OpenAccount(ctl.log, accName)
883 ctl.xcheck(err, "open account")
886 log.Check(err, "closing account after reparsing messages")
893 // Don't process all message in one transaction, we could block the account for too long.
894 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
895 q := bstore.QueryTx[store.Message](tx)
896 q.FilterEqual("Expunged", false)
897 q.FilterGreater("ID", lastID)
900 return q.ForEach(func(m store.Message) error {
902 mr := acc.MessageReader(m)
903 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
905 _, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err)
906 ctl.xcheck(err, "write")
908 m.ParsedBuf, err = json.Marshal(p)
910 return fmt.Errorf("marshal parsed message: %v", err)
914 if err := tx.Update(&m); err != nil {
915 return fmt.Errorf("update message: %v", err)
921 ctl.xcheck(err, "update messages with parsed mime structure")
926 _, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
927 ctl.xcheck(err, "write")
930 if accountOpt != "" {
931 xreparseAccount(accountOpt)
933 for i, accName := range mox.Conf.Accounts() {
938 _, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
939 ctl.xcheck(err, "write")
940 xreparseAccount(accName)
945 case "reassignthreads":
953 accountOpt := ctl.xread()
957 xreassignThreads := func(accName string) {
958 acc, err := store.OpenAccount(ctl.log, accName)
959 ctl.xcheck(err, "open account")
962 log.Check(err, "closing account after reassigning threads")
965 // We don't want to step on an existing upgrade process.
966 err = acc.ThreadingWait(ctl.log)
967 ctl.xcheck(err, "waiting for threading upgrade to finish")
968 // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
970 // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
971 const batchSize = 50000
972 total, err := acc.ResetThreading(ctx, ctl.log, batchSize, true)
973 ctl.xcheck(err, "resetting threading fields")
974 _, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
975 ctl.xcheck(err, "write")
977 // Assign threads again. Ideally we would do this in a single transaction, but
978 // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
979 err = acc.AssignThreads(ctx, ctl.log, nil, 0, 50000, w)
980 ctl.xcheck(err, "reassign threads")
982 _, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
983 ctl.xcheck(err, "write")
986 if accountOpt != "" {
987 xreassignThreads(accountOpt)
989 for i, accName := range mox.Conf.Accounts() {
994 _, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName)
995 ctl.xcheck(err, "write")
996 xreassignThreads(accName)
1005 log.Info("unrecognized command", slog.String("cmd", cmd))
1006 ctl.xwrite("unrecognized command")