1package main
2
3import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log"
10 "log/slog"
11 "net"
12 "os"
13 "path/filepath"
14 "runtime/debug"
15 "sort"
16 "strconv"
17 "strings"
18 "time"
19
20 "github.com/mjl-/bstore"
21
22 "github.com/mjl-/mox/dns"
23 "github.com/mjl-/mox/message"
24 "github.com/mjl-/mox/metrics"
25 "github.com/mjl-/mox/mlog"
26 "github.com/mjl-/mox/mox-"
27 "github.com/mjl-/mox/queue"
28 "github.com/mjl-/mox/smtp"
29 "github.com/mjl-/mox/store"
30)
31
32// ctl represents a connection to the ctl unix domain socket of a running mox instance.
33// ctl provides functions to read/write commands/responses/data streams.
34type ctl struct {
35 cmd string // Set for server-side of commands.
36 conn net.Conn
37 r *bufio.Reader // Set for first reader.
38 x any // If set, errors are handled by calling panic(x) instead of log.Fatal.
39 log mlog.Log // If set, along with x, logging is done here.
40}
41
42// xctl opens a ctl connection.
43func xctl() *ctl {
44 p := mox.DataDirPath("ctl")
45 conn, err := net.Dial("unix", p)
46 if err != nil {
47 log.Fatalf("connecting to control socket at %q: %v", p, err)
48 }
49 ctl := &ctl{conn: conn}
50 version := ctl.xread()
51 if version != "ctlv0" {
52 log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
53 }
54 return ctl
55}
56
57// Interpret msg as an error.
58// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
59func (c *ctl) xerror(msg string) {
60 if c.x == nil {
61 log.Fatalln(msg)
62 }
63 c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
64 c.xwrite(msg)
65 panic(c.x)
66}
67
68// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
69// ctl.x is set, the error string is written to ctl, to be interpreted as an error
70// by the command reading from ctl.
71func (c *ctl) xcheck(err error, msg string) {
72 if err == nil {
73 return
74 }
75 if c.x == nil {
76 log.Fatalf("%s: %s", msg, err)
77 }
78 c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
79 fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
80 panic(c.x)
81}
82
83// Read a line and return it without trailing newline.
84func (c *ctl) xread() string {
85 if c.r == nil {
86 c.r = bufio.NewReader(c.conn)
87 }
88 line, err := c.r.ReadString('\n')
89 c.xcheck(err, "read from ctl")
90 return strings.TrimSuffix(line, "\n")
91}
92
93// Read a line. If not "ok", the string is interpreted as an error.
94func (c *ctl) xreadok() {
95 line := c.xread()
96 if line != "ok" {
97 c.xerror(line)
98 }
99}
100
101// Write a string, typically a command or parameter.
102func (c *ctl) xwrite(text string) {
103 _, err := fmt.Fprintln(c.conn, text)
104 c.xcheck(err, "write")
105}
106
107// Write "ok" to indicate success.
108func (c *ctl) xwriteok() {
109 c.xwrite("ok")
110}
111
112// Copy data from a stream from ctl to dst.
113func (c *ctl) xstreamto(dst io.Writer) {
114 _, err := io.Copy(dst, c.reader())
115 c.xcheck(err, "reading message")
116}
117
118// Copy data from src to a stream to ctl.
119func (c *ctl) xstreamfrom(src io.Reader) {
120 w := c.writer()
121 _, err := io.Copy(w, src)
122 c.xcheck(err, "copying")
123 w.xclose()
124}
125
126// Writer returns an io.Writer for a data stream to ctl.
127// When done writing, caller must call xclose to signal the end of the stream.
128// Behaviour of "x" is copied from ctl.
129func (c *ctl) writer() *ctlwriter {
130 return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
131}
132
133// Reader returns an io.Reader for a data stream from ctl.
134// Behaviour of "x" is copied from ctl.
135func (c *ctl) reader() *ctlreader {
136 if c.r == nil {
137 c.r = bufio.NewReader(c.conn)
138 }
139 return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
140}
141
142/*
143Ctlwriter and ctlreader implement the writing and reading a data stream. They
144implement the io.Writer and io.Reader interface. In the protocol below each
145non-data message ends with a newline that is typically stripped when
146interpreting.
147
148Zero or more data transactions:
149
150 > "123" (for data size) or an error message
151 > data, 123 bytes
152 < "ok" or an error message
153
154Followed by a end of stream indicated by zero data bytes message:
155
156 > "0"
157*/
158
159type ctlwriter struct {
160 cmd string // Set for server-side of commands.
161 conn net.Conn // Ctl socket from which messages are read.
162 buf []byte // Scratch buffer, for reading response.
163 x any // If not nil, errors in Write and xcheckf are handled with panic(x), otherwise with a log.Fatal.
164 log mlog.Log
165}
166
167func (s *ctlwriter) Write(buf []byte) (int, error) {
168 _, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
169 s.xcheck(err, "write count")
170 _, err = s.conn.Write(buf)
171 s.xcheck(err, "write data")
172 if s.buf == nil {
173 s.buf = make([]byte, 512)
174 }
175 n, err := s.conn.Read(s.buf)
176 s.xcheck(err, "reading response to write")
177 line := strings.TrimSuffix(string(s.buf[:n]), "\n")
178 if line != "ok" {
179 s.xerror(line)
180 }
181 return len(buf), nil
182}
183
184func (s *ctlwriter) xerror(msg string) {
185 if s.x == nil {
186 log.Fatalln(msg)
187 } else {
188 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
189 panic(s.x)
190 }
191}
192
193func (s *ctlwriter) xcheck(err error, msg string) {
194 if err == nil {
195 return
196 }
197 if s.x == nil {
198 log.Fatalf("%s: %s", msg, err)
199 } else {
200 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
201 panic(s.x)
202 }
203}
204
205func (s *ctlwriter) xclose() {
206 _, err := fmt.Fprintf(s.conn, "0\n")
207 s.xcheck(err, "write eof")
208}
209
210type ctlreader struct {
211 cmd string // Set for server-side of command.
212 conn net.Conn // For writing "ok" after reading.
213 r *bufio.Reader // Buffered ctl socket.
214 err error // If set, returned for each read. can also be io.EOF.
215 npending int // Number of bytes that can still be read until a new count line must be read.
216 x any // If set, errors are handled with panic(x) instead of log.Fatal.
217 log mlog.Log // If x is set, logging goes to log.
218}
219
220func (s *ctlreader) Read(buf []byte) (N int, Err error) {
221 if s.err != nil {
222 return 0, s.err
223 }
224 if s.npending == 0 {
225 line, err := s.r.ReadString('\n')
226 s.xcheck(err, "reading count")
227 line = strings.TrimSuffix(line, "\n")
228 n, err := strconv.ParseInt(line, 10, 32)
229 if err != nil {
230 s.xerror(line)
231 }
232 if n == 0 {
233 s.err = io.EOF
234 return 0, s.err
235 }
236 s.npending = int(n)
237 }
238 rn := len(buf)
239 if rn > s.npending {
240 rn = s.npending
241 }
242 n, err := s.r.Read(buf[:rn])
243 s.xcheck(err, "read from ctl")
244 s.npending -= n
245 if s.npending == 0 {
246 _, err = fmt.Fprintln(s.conn, "ok")
247 s.xcheck(err, "writing ok after reading")
248 }
249 return n, err
250}
251
252func (s *ctlreader) xerror(msg string) {
253 if s.x == nil {
254 log.Fatalln(msg)
255 } else {
256 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
257 panic(s.x)
258 }
259}
260
261func (s *ctlreader) xcheck(err error, msg string) {
262 if err == nil {
263 return
264 }
265 if s.x == nil {
266 log.Fatalf("%s: %s", msg, err)
267 } else {
268 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
269 panic(s.x)
270 }
271}
272
273// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
274func servectl(ctx context.Context, log mlog.Log, conn net.Conn, shutdown func()) {
275 log.Debug("ctl connection")
276
277 var stop = struct{}{} // Sentinel value for panic and recover.
278 ctl := &ctl{conn: conn, x: stop, log: log}
279 defer func() {
280 x := recover()
281 if x == nil || x == stop {
282 return
283 }
284 log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", ctl.cmd))
285 debug.PrintStack()
286 metrics.PanicInc(metrics.Ctl)
287 }()
288
289 defer conn.Close()
290
291 ctl.xwrite("ctlv0")
292 for {
293 servectlcmd(ctx, ctl, shutdown)
294 }
295}
296
297func servectlcmd(ctx context.Context, ctl *ctl, shutdown func()) {
298 log := ctl.log
299 cmd := ctl.xread()
300 ctl.cmd = cmd
301 log.Info("ctl command", slog.String("cmd", cmd))
302 switch cmd {
303 case "stop":
304 shutdown()
305 os.Exit(0)
306
307 case "deliver":
308 /* The protocol, double quoted are literals.
309
310 > "deliver"
311 > address
312 < "ok"
313 > stream
314 < "ok"
315 */
316
317 to := ctl.xread()
318 a, addr, err := store.OpenEmail(ctl.log, to)
319 ctl.xcheck(err, "lookup destination address")
320
321 msgFile, err := store.CreateMessageTemp(ctl.log, "ctl-deliver")
322 ctl.xcheck(err, "creating temporary message file")
323 defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
324 mw := message.NewWriter(msgFile)
325 ctl.xwriteok()
326
327 ctl.xstreamto(mw)
328 err = msgFile.Sync()
329 ctl.xcheck(err, "syncing message to storage")
330
331 m := &store.Message{
332 Received: time.Now(),
333 Size: mw.Size,
334 }
335
336 a.WithWLock(func() {
337 err := a.DeliverDestination(log, addr, m, msgFile)
338 ctl.xcheck(err, "delivering message")
339 log.Info("message delivered through ctl", slog.Any("to", to))
340 })
341
342 err = a.Close()
343 ctl.xcheck(err, "closing account")
344 ctl.xwriteok()
345
346 case "setaccountpassword":
347 /* protocol:
348 > "setaccountpassword"
349 > account
350 > password
351 < "ok" or error
352 */
353
354 account := ctl.xread()
355 pw := ctl.xread()
356
357 acc, err := store.OpenAccount(ctl.log, account)
358 ctl.xcheck(err, "open account")
359 defer func() {
360 if acc != nil {
361 err := acc.Close()
362 log.Check(err, "closing account after setting password")
363 }
364 }()
365
366 err = acc.SetPassword(ctl.log, pw)
367 ctl.xcheck(err, "setting password")
368 err = acc.Close()
369 ctl.xcheck(err, "closing account")
370 acc = nil
371 ctl.xwriteok()
372
373 case "queue":
374 /* protocol:
375 > "queue"
376 < "ok"
377 < stream
378 */
379 qmsgs, err := queue.List(ctx)
380 ctl.xcheck(err, "listing queue")
381 ctl.xwriteok()
382
383 xw := ctl.writer()
384 fmt.Fprintln(xw, "queue:")
385 for _, qm := range qmsgs {
386 var lastAttempt string
387 if qm.LastAttempt != nil {
388 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
389 }
390 fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastError)
391 }
392 if len(qmsgs) == 0 {
393 fmt.Fprint(xw, "(empty)\n")
394 }
395 xw.xclose()
396
397 case "queuekick":
398 /* protocol:
399 > "queuekick"
400 > id
401 > todomain
402 > recipient
403 > transport // if empty, transport is left unchanged; in future, we may want to differtiate between "leave unchanged" and "set to empty string".
404 < count
405 < "ok" or error
406 */
407
408 idstr := ctl.xread()
409 todomain := ctl.xread()
410 recipient := ctl.xread()
411 transport := ctl.xread()
412 id, err := strconv.ParseInt(idstr, 10, 64)
413 if err != nil {
414 ctl.xwrite("0")
415 ctl.xcheck(err, "parsing id")
416 }
417
418 var xtransport *string
419 if transport != "" {
420 xtransport = &transport
421 }
422 count, err := queue.Kick(ctx, id, todomain, recipient, xtransport)
423 ctl.xcheck(err, "kicking queue")
424 ctl.xwrite(fmt.Sprintf("%d", count))
425 ctl.xwriteok()
426
427 case "queuedrop":
428 /* protocol:
429 > "queuedrop"
430 > id
431 > todomain
432 > recipient
433 < count
434 < "ok" or error
435 */
436
437 idstr := ctl.xread()
438 todomain := ctl.xread()
439 recipient := ctl.xread()
440 id, err := strconv.ParseInt(idstr, 10, 64)
441 if err != nil {
442 ctl.xwrite("0")
443 ctl.xcheck(err, "parsing id")
444 }
445
446 count, err := queue.Drop(ctx, ctl.log, id, todomain, recipient)
447 ctl.xcheck(err, "dropping messages from queue")
448 ctl.xwrite(fmt.Sprintf("%d", count))
449 ctl.xwriteok()
450
451 case "queuedump":
452 /* protocol:
453 > "queuedump"
454 > id
455 < "ok" or error
456 < stream
457 */
458
459 idstr := ctl.xread()
460 id, err := strconv.ParseInt(idstr, 10, 64)
461 if err != nil {
462 ctl.xcheck(err, "parsing id")
463 }
464 mr, err := queue.OpenMessage(ctx, id)
465 ctl.xcheck(err, "opening message")
466 defer func() {
467 err := mr.Close()
468 log.Check(err, "closing message from queue")
469 }()
470 ctl.xwriteok()
471 ctl.xstreamfrom(mr)
472
473 case "importmaildir", "importmbox":
474 mbox := cmd == "importmbox"
475 importctl(ctx, ctl, mbox)
476
477 case "domainadd":
478 /* protocol:
479 > "domainadd"
480 > domain
481 > account
482 > localpart
483 < "ok" or error
484 */
485 domain := ctl.xread()
486 account := ctl.xread()
487 localpart := ctl.xread()
488 d, err := dns.ParseDomain(domain)
489 ctl.xcheck(err, "parsing domain")
490 err = mox.DomainAdd(ctx, d, account, smtp.Localpart(localpart))
491 ctl.xcheck(err, "adding domain")
492 ctl.xwriteok()
493
494 case "domainrm":
495 /* protocol:
496 > "domainrm"
497 > domain
498 < "ok" or error
499 */
500 domain := ctl.xread()
501 d, err := dns.ParseDomain(domain)
502 ctl.xcheck(err, "parsing domain")
503 err = mox.DomainRemove(ctx, d)
504 ctl.xcheck(err, "removing domain")
505 ctl.xwriteok()
506
507 case "accountadd":
508 /* protocol:
509 > "accountadd"
510 > account
511 > address
512 < "ok" or error
513 */
514 account := ctl.xread()
515 address := ctl.xread()
516 err := mox.AccountAdd(ctx, account, address)
517 ctl.xcheck(err, "adding account")
518 ctl.xwriteok()
519
520 case "accountrm":
521 /* protocol:
522 > "accountrm"
523 > account
524 < "ok" or error
525 */
526 account := ctl.xread()
527 err := mox.AccountRemove(ctx, account)
528 ctl.xcheck(err, "removing account")
529 ctl.xwriteok()
530
531 case "addressadd":
532 /* protocol:
533 > "addressadd"
534 > address
535 > account
536 < "ok" or error
537 */
538 address := ctl.xread()
539 account := ctl.xread()
540 err := mox.AddressAdd(ctx, address, account)
541 ctl.xcheck(err, "adding address")
542 ctl.xwriteok()
543
544 case "addressrm":
545 /* protocol:
546 > "addressrm"
547 > address
548 < "ok" or error
549 */
550 address := ctl.xread()
551 err := mox.AddressRemove(ctx, address)
552 ctl.xcheck(err, "removing address")
553 ctl.xwriteok()
554
555 case "loglevels":
556 /* protocol:
557 > "loglevels"
558 < "ok"
559 < stream
560 */
561 ctl.xwriteok()
562 l := mox.Conf.LogLevels()
563 keys := []string{}
564 for k := range l {
565 keys = append(keys, k)
566 }
567 sort.Slice(keys, func(i, j int) bool {
568 return keys[i] < keys[j]
569 })
570 s := ""
571 for _, k := range keys {
572 ks := k
573 if ks == "" {
574 ks = "(default)"
575 }
576 s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
577 }
578 ctl.xstreamfrom(strings.NewReader(s))
579
580 case "setloglevels":
581 /* protocol:
582 > "setloglevels"
583 > pkg
584 > level (if empty, log level for pkg will be unset)
585 < "ok" or error
586 */
587 pkg := ctl.xread()
588 levelstr := ctl.xread()
589 if levelstr == "" {
590 mox.Conf.LogLevelRemove(ctl.log, pkg)
591 } else {
592 level, ok := mlog.Levels[levelstr]
593 if !ok {
594 ctl.xerror("bad level")
595 }
596 mox.Conf.LogLevelSet(ctl.log, pkg, level)
597 }
598 ctl.xwriteok()
599
600 case "retrain":
601 /* protocol:
602 > "retrain"
603 > account
604 < "ok" or error
605 */
606 account := ctl.xread()
607 acc, err := store.OpenAccount(ctl.log, account)
608 ctl.xcheck(err, "open account")
609 defer func() {
610 if acc != nil {
611 err := acc.Close()
612 log.Check(err, "closing account after retraining")
613 }
614 }()
615
616 acc.WithWLock(func() {
617 conf, _ := acc.Conf()
618 if conf.JunkFilter == nil {
619 ctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
620 }
621
622 // Remove existing junk filter files.
623 basePath := mox.DataDirPath("accounts")
624 dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
625 bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
626 err := os.Remove(dbPath)
627 log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
628 err = os.Remove(bloomPath)
629 log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
630
631 // Open junk filter, this creates new files.
632 jf, _, err := acc.OpenJunkFilter(ctx, ctl.log)
633 ctl.xcheck(err, "open new junk filter")
634 defer func() {
635 if jf == nil {
636 return
637 }
638 err := jf.Close()
639 log.Check(err, "closing junk filter during cleanup")
640 }()
641
642 // Read through messages with junk or nonjunk flag set, and train them.
643 var total, trained int
644 q := bstore.QueryDB[store.Message](ctx, acc.DB)
645 q.FilterEqual("Expunged", false)
646 err = q.ForEach(func(m store.Message) error {
647 total++
648 ok, err := acc.TrainMessage(ctx, ctl.log, jf, m)
649 if ok {
650 trained++
651 }
652 return err
653 })
654 ctl.xcheck(err, "training messages")
655 ctl.log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
656
657 // Close junk filter, marking success.
658 err = jf.Close()
659 jf = nil
660 ctl.xcheck(err, "closing junk filter")
661 })
662 ctl.xwriteok()
663
664 case "recalculatemailboxcounts":
665 /* protocol:
666 > "recalculatemailboxcounts"
667 > account
668 < "ok" or error
669 < stream
670 */
671 account := ctl.xread()
672 acc, err := store.OpenAccount(ctl.log, account)
673 ctl.xcheck(err, "open account")
674 defer func() {
675 if acc != nil {
676 err := acc.Close()
677 log.Check(err, "closing account after recalculating mailbox counts")
678 }
679 }()
680 ctl.xwriteok()
681
682 w := ctl.writer()
683
684 acc.WithWLock(func() {
685 var changes []store.Change
686 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
687 var totalSize int64
688 err := bstore.QueryTx[store.Mailbox](tx).ForEach(func(mb store.Mailbox) error {
689 mc, err := mb.CalculateCounts(tx)
690 if err != nil {
691 return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
692 }
693 totalSize += mc.Size
694
695 if !mb.HaveCounts || mc != mb.MailboxCounts {
696 _, err := fmt.Fprintf(w, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
697 ctl.xcheck(err, "write")
698 mb.HaveCounts = true
699 mb.MailboxCounts = mc
700 if err := tx.Update(&mb); err != nil {
701 return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
702 }
703 changes = append(changes, mb.ChangeCounts())
704 }
705 return nil
706 })
707 if err != nil {
708 return err
709 }
710
711 du := store.DiskUsage{ID: 1}
712 if err := tx.Get(&du); err != nil {
713 return fmt.Errorf("get disk usage: %v", err)
714 }
715 if du.MessageSize != totalSize {
716 _, err := fmt.Fprintf(w, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
717 ctl.xcheck(err, "write")
718 du.MessageSize = totalSize
719 if err := tx.Update(&du); err != nil {
720 return fmt.Errorf("update disk usage: %v", err)
721 }
722 }
723 return nil
724 })
725 ctl.xcheck(err, "write transaction for mailbox counts")
726
727 store.BroadcastChanges(acc, changes)
728 })
729 w.xclose()
730
731 case "fixmsgsize":
732 /* protocol:
733 > "fixmsgsize"
734 > account or empty
735 < "ok" or error
736 < stream
737 */
738
739 accountOpt := ctl.xread()
740 ctl.xwriteok()
741 w := ctl.writer()
742
743 var foundProblem bool
744 const batchSize = 10000
745
746 xfixmsgsize := func(accName string) {
747 acc, err := store.OpenAccount(ctl.log, accName)
748 ctl.xcheck(err, "open account")
749 defer func() {
750 err := acc.Close()
751 log.Check(err, "closing account after fixing message sizes")
752 }()
753
754 total := 0
755 var lastID int64
756 for {
757 var n int
758
759 acc.WithRLock(func() {
760 mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
761
762 // Don't process all message in one transaction, we could block the account for too long.
763 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
764 q := bstore.QueryTx[store.Message](tx)
765 q.FilterEqual("Expunged", false)
766 q.FilterGreater("ID", lastID)
767 q.Limit(batchSize)
768 q.SortAsc("ID")
769 return q.ForEach(func(m store.Message) error {
770 lastID = m.ID
771 n++
772
773 p := acc.MessagePath(m.ID)
774 st, err := os.Stat(p)
775 if err != nil {
776 mb := store.Mailbox{ID: m.MailboxID}
777 if xerr := tx.Get(&mb); xerr != nil {
778 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
779 ctl.xcheck(werr, "write")
780 }
781 _, werr := fmt.Fprintf(w, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
782 ctl.xcheck(werr, "write")
783 return nil
784 }
785 filesize := st.Size()
786 correctSize := int64(len(m.MsgPrefix)) + filesize
787 if m.Size == correctSize {
788 return nil
789 }
790
791 foundProblem = true
792
793 mb := store.Mailbox{ID: m.MailboxID}
794 if err := tx.Get(&mb); err != nil {
795 _, werr := fmt.Fprintf(w, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
796 ctl.xcheck(werr, "write")
797 }
798 _, err = fmt.Fprintf(w, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
799 ctl.xcheck(err, "write")
800
801 // We assume that the original message size was accounted as stored in the mailbox
802 // total size. If this isn't correct, the user can always run
803 // recalculatemailboxcounts.
804 mb.Size -= m.Size
805 mb.Size += correctSize
806 if err := tx.Update(&mb); err != nil {
807 return fmt.Errorf("update mailbox counts: %v", err)
808 }
809 mailboxCounts[mb.ID] = mb
810
811 m.Size = correctSize
812
813 mr := acc.MessageReader(m)
814 part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
815 if err != nil {
816 _, werr := fmt.Fprintf(w, "parsing message %d again: %v (continuing)\n", m.ID, err)
817 ctl.xcheck(werr, "write")
818 }
819 m.ParsedBuf, err = json.Marshal(part)
820 if err != nil {
821 return fmt.Errorf("marshal parsed message: %v", err)
822 }
823 total++
824 if err := tx.Update(&m); err != nil {
825 return fmt.Errorf("update message: %v", err)
826 }
827 return nil
828 })
829
830 })
831 ctl.xcheck(err, "find and fix wrong message sizes")
832
833 var changes []store.Change
834 for _, mb := range mailboxCounts {
835 changes = append(changes, mb.ChangeCounts())
836 }
837 store.BroadcastChanges(acc, changes)
838 })
839 if n < batchSize {
840 break
841 }
842 }
843 _, err = fmt.Fprintf(w, "%d message size(s) fixed for account %s\n", total, accName)
844 ctl.xcheck(err, "write")
845 }
846
847 if accountOpt != "" {
848 xfixmsgsize(accountOpt)
849 } else {
850 for i, accName := range mox.Conf.Accounts() {
851 var line string
852 if i > 0 {
853 line = "\n"
854 }
855 _, err := fmt.Fprintf(w, "%sFixing message sizes in account %s...\n", line, accName)
856 ctl.xcheck(err, "write")
857 xfixmsgsize(accName)
858 }
859 }
860 if foundProblem {
861 _, err := fmt.Fprintf(w, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
862 ctl.xcheck(err, "write")
863 }
864
865 w.xclose()
866
867 case "reparse":
868 /* protocol:
869 > "reparse"
870 > account or empty
871 < "ok" or error
872 < stream
873 */
874
875 accountOpt := ctl.xread()
876 ctl.xwriteok()
877 w := ctl.writer()
878
879 const batchSize = 100
880
881 xreparseAccount := func(accName string) {
882 acc, err := store.OpenAccount(ctl.log, accName)
883 ctl.xcheck(err, "open account")
884 defer func() {
885 err := acc.Close()
886 log.Check(err, "closing account after reparsing messages")
887 }()
888
889 total := 0
890 var lastID int64
891 for {
892 var n int
893 // Don't process all message in one transaction, we could block the account for too long.
894 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
895 q := bstore.QueryTx[store.Message](tx)
896 q.FilterEqual("Expunged", false)
897 q.FilterGreater("ID", lastID)
898 q.Limit(batchSize)
899 q.SortAsc("ID")
900 return q.ForEach(func(m store.Message) error {
901 lastID = m.ID
902 mr := acc.MessageReader(m)
903 p, err := message.EnsurePart(log.Logger, false, mr, m.Size)
904 if err != nil {
905 _, err := fmt.Fprintf(w, "parsing message %d: %v (continuing)\n", m.ID, err)
906 ctl.xcheck(err, "write")
907 }
908 m.ParsedBuf, err = json.Marshal(p)
909 if err != nil {
910 return fmt.Errorf("marshal parsed message: %v", err)
911 }
912 total++
913 n++
914 if err := tx.Update(&m); err != nil {
915 return fmt.Errorf("update message: %v", err)
916 }
917 return nil
918 })
919
920 })
921 ctl.xcheck(err, "update messages with parsed mime structure")
922 if n < batchSize {
923 break
924 }
925 }
926 _, err = fmt.Fprintf(w, "%d message(s) reparsed for account %s\n", total, accName)
927 ctl.xcheck(err, "write")
928 }
929
930 if accountOpt != "" {
931 xreparseAccount(accountOpt)
932 } else {
933 for i, accName := range mox.Conf.Accounts() {
934 var line string
935 if i > 0 {
936 line = "\n"
937 }
938 _, err := fmt.Fprintf(w, "%sReparsing account %s...\n", line, accName)
939 ctl.xcheck(err, "write")
940 xreparseAccount(accName)
941 }
942 }
943 w.xclose()
944
945 case "reassignthreads":
946 /* protocol:
947 > "reassignthreads"
948 > account or empty
949 < "ok" or error
950 < stream
951 */
952
953 accountOpt := ctl.xread()
954 ctl.xwriteok()
955 w := ctl.writer()
956
957 xreassignThreads := func(accName string) {
958 acc, err := store.OpenAccount(ctl.log, accName)
959 ctl.xcheck(err, "open account")
960 defer func() {
961 err := acc.Close()
962 log.Check(err, "closing account after reassigning threads")
963 }()
964
965 // We don't want to step on an existing upgrade process.
966 err = acc.ThreadingWait(ctl.log)
967 ctl.xcheck(err, "waiting for threading upgrade to finish")
968 // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
969
970 // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
971 const batchSize = 50000
972 total, err := acc.ResetThreading(ctx, ctl.log, batchSize, true)
973 ctl.xcheck(err, "resetting threading fields")
974 _, err = fmt.Fprintf(w, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
975 ctl.xcheck(err, "write")
976
977 // Assign threads again. Ideally we would do this in a single transaction, but
978 // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
979 err = acc.AssignThreads(ctx, ctl.log, nil, 0, 50000, w)
980 ctl.xcheck(err, "reassign threads")
981
982 _, err = fmt.Fprintf(w, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
983 ctl.xcheck(err, "write")
984 }
985
986 if accountOpt != "" {
987 xreassignThreads(accountOpt)
988 } else {
989 for i, accName := range mox.Conf.Accounts() {
990 var line string
991 if i > 0 {
992 line = "\n"
993 }
994 _, err := fmt.Fprintf(w, "%sReassigning threads for account %s...\n", line, accName)
995 ctl.xcheck(err, "write")
996 xreassignThreads(accName)
997 }
998 }
999 w.xclose()
1000
1001 case "backup":
1002 backupctl(ctx, ctl)
1003
1004 default:
1005 log.Info("unrecognized command", slog.String("cmd", cmd))
1006 ctl.xwrite("unrecognized command")
1007 return
1008 }
1009}
1010