1package main
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "encoding/json"
8 "errors"
9 "fmt"
10 "io"
11 "log"
12 "log/slog"
13 "maps"
14 "net"
15 "os"
16 "path/filepath"
17 "runtime/debug"
18 "slices"
19 "strconv"
20 "strings"
21 "time"
22
23 "github.com/mjl-/bstore"
24
25 "github.com/mjl-/mox/admin"
26 "github.com/mjl-/mox/config"
27 "github.com/mjl-/mox/dns"
28 "github.com/mjl-/mox/imapserver"
29 "github.com/mjl-/mox/message"
30 "github.com/mjl-/mox/metrics"
31 "github.com/mjl-/mox/mlog"
32 "github.com/mjl-/mox/mox-"
33 "github.com/mjl-/mox/queue"
34 "github.com/mjl-/mox/smtp"
35 "github.com/mjl-/mox/store"
36 "github.com/mjl-/mox/webapi"
37)
38
// ctl represents a connection to the ctl unix domain socket of a running mox
// instance. It provides methods to read and write the newline-terminated
// commands and responses, and the data streams, of the ctl protocol.
//
// How errors are handled depends on x: if x is nil, errors end the program
// through log.Fatal, otherwise they are logged to log and raised with panic(x).
type ctl struct {
	cmd  string        // Set for server-side of commands.
	conn net.Conn      // Connection to the ctl socket.
	r    *bufio.Reader // Buffered reader on conn, created lazily on the first read.
	x    any           // If set, errors are handled by calling panic(x) instead of log.Fatal.
	log  mlog.Log      // If set, along with x, logging is done here.
}
48
49// xctl opens a ctl connection.
50func xctl() *ctl {
51 p := mox.DataDirPath("ctl")
52 conn, err := net.Dial("unix", p)
53 if err != nil {
54 log.Fatalf("connecting to control socket at %q: %v", p, err)
55 }
56 ctl := &ctl{conn: conn}
57 version := ctl.xread()
58 if version != "ctlv0" {
59 log.Fatalf("ctl protocol mismatch, got %q, expected ctlv0", version)
60 }
61 return ctl
62}
63
64// Interpret msg as an error.
65// If ctl.x is set, the string is also written to the ctl to be interpreted as error by the other party.
66func (c *ctl) xerror(msg string) {
67 if c.x == nil {
68 log.Fatalln(msg)
69 }
70 c.log.Debugx("ctl error", fmt.Errorf("%s", msg), slog.String("cmd", c.cmd))
71 c.xwrite(msg)
72 panic(c.x)
73}
74
75// Check if err is not nil. If so, handle error through ctl.x or log.Fatal. If
76// ctl.x is set, the error string is written to ctl, to be interpreted as an error
77// by the command reading from ctl.
78func (c *ctl) xcheck(err error, msg string) {
79 if err == nil {
80 return
81 }
82 if c.x == nil {
83 log.Fatalf("%s: %s", msg, err)
84 }
85 c.log.Debugx(msg, err, slog.String("cmd", c.cmd))
86 fmt.Fprintf(c.conn, "%s: %s\n", msg, err)
87 panic(c.x)
88}
89
90// Read a line and return it without trailing newline.
91func (c *ctl) xread() string {
92 if c.r == nil {
93 c.r = bufio.NewReader(c.conn)
94 }
95 line, err := c.r.ReadString('\n')
96 c.xcheck(err, "read from ctl")
97 return strings.TrimSuffix(line, "\n")
98}
99
100// Read a line. If not "ok", the string is interpreted as an error.
101func (c *ctl) xreadok() {
102 line := c.xread()
103 if line != "ok" {
104 c.xerror(line)
105 }
106}
107
108// Write a string, typically a command or parameter.
109func (c *ctl) xwrite(text string) {
110 _, err := fmt.Fprintln(c.conn, text)
111 c.xcheck(err, "write")
112}
113
// xwriteok writes the line "ok" to the ctl connection to indicate success to the
// other party. Write errors are handled by xwrite.
func (c *ctl) xwriteok() {
	c.xwrite("ok")
}
118
119// Copy data from a stream from ctl to dst.
120func (c *ctl) xstreamto(dst io.Writer) {
121 _, err := io.Copy(dst, c.reader())
122 c.xcheck(err, "reading message")
123}
124
125// Copy data from src to a stream to ctl.
126func (c *ctl) xstreamfrom(src io.Reader) {
127 xw := c.writer()
128 _, err := io.Copy(xw, src)
129 c.xcheck(err, "copying")
130 xw.xclose()
131}
132
133// Writer returns an io.Writer for a data stream to ctl.
134// When done writing, caller must call xclose to signal the end of the stream.
135// Behaviour of "x" is copied from ctl.
136func (c *ctl) writer() *ctlwriter {
137 return &ctlwriter{cmd: c.cmd, conn: c.conn, x: c.x, log: c.log}
138}
139
140// Reader returns an io.Reader for a data stream from ctl.
141// Behaviour of "x" is copied from ctl.
142func (c *ctl) reader() *ctlreader {
143 if c.r == nil {
144 c.r = bufio.NewReader(c.conn)
145 }
146 return &ctlreader{cmd: c.cmd, conn: c.conn, r: c.r, x: c.x, log: c.log}
147}
148
149/*
Ctlwriter and ctlreader implement the writing and reading of a data stream. They
implement the io.Writer and io.Reader interfaces. In the protocol below each
non-data message ends with a newline that is typically stripped when
interpreting.
154
155Zero or more data transactions:
156
157 > "123" (for data size) or an error message
158 > data, 123 bytes
159 < "ok" or an error message
160
Followed by an end of stream, indicated by a zero data bytes message:
162
163 > "0"
164*/
165
// ctlwriter writes a data stream to a ctl connection. It implements io.Writer.
// After the last Write, xclose must be called to signal the end of the stream.
type ctlwriter struct {
	cmd  string   // Set for server-side of commands.
	conn net.Conn // Ctl socket to which data is written, and from which the response to each write is read.
	buf  []byte   // Scratch buffer, for reading response.
	x    any      // If not nil, errors in Write and xcheck are handled with panic(x), otherwise with a log.Fatal.
	log  mlog.Log // Used for logging errors when x is not nil.
}
173
174// Write implements io.Writer. Errors other than EOF are handled through behaviour
175// for s.x, either a panic or log.Fatal.
176func (s *ctlwriter) Write(buf []byte) (int, error) {
177 _, err := fmt.Fprintf(s.conn, "%d\n", len(buf))
178 s.xcheck(err, "write count")
179 _, err = s.conn.Write(buf)
180 s.xcheck(err, "write data")
181 if s.buf == nil {
182 s.buf = make([]byte, 512)
183 }
184 n, err := s.conn.Read(s.buf)
185 s.xcheck(err, "reading response to write")
186 line := strings.TrimSuffix(string(s.buf[:n]), "\n")
187 if line != "ok" {
188 s.xerror(line)
189 }
190 return len(buf), nil
191}
192
193func (s *ctlwriter) xerror(msg string) {
194 if s.x == nil {
195 log.Fatalln(msg)
196 } else {
197 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
198 panic(s.x)
199 }
200}
201
202func (s *ctlwriter) xcheck(err error, msg string) {
203 if err == nil {
204 return
205 }
206 if s.x == nil {
207 log.Fatalf("%s: %s", msg, err)
208 } else {
209 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
210 panic(s.x)
211 }
212}
213
214func (s *ctlwriter) xclose() {
215 _, err := fmt.Fprintf(s.conn, "0\n")
216 s.xcheck(err, "write eof")
217}
218
// ctlreader reads a data stream from a ctl connection. It implements io.Reader,
// returning io.EOF once the end of stream message has been read.
type ctlreader struct {
	cmd      string        // Set for server-side of command.
	conn     net.Conn      // For writing "ok" after reading.
	r        *bufio.Reader // Buffered ctl socket.
	err      error         // If set, returned for each read. can also be io.EOF.
	npending int           // Number of bytes that can still be read until a new count line must be read.
	x        any           // If set, errors are handled with panic(x) instead of log.Fatal.
	log      mlog.Log      // If x is set, logging goes to log.
}
228
229// Read implements io.Reader. Errors other than EOF are handled through behaviour
230// for s.x, either a panic or log.Fatal.
231func (s *ctlreader) Read(buf []byte) (N int, Err error) {
232 if s.err != nil {
233 return 0, s.err
234 }
235 if s.npending == 0 {
236 line, err := s.r.ReadString('\n')
237 s.xcheck(err, "reading count")
238 line = strings.TrimSuffix(line, "\n")
239 n, err := strconv.ParseInt(line, 10, 32)
240 if err != nil {
241 s.xerror(line)
242 }
243 if n == 0 {
244 s.err = io.EOF
245 return 0, s.err
246 }
247 s.npending = int(n)
248 }
249 rn := min(len(buf), s.npending)
250 n, err := s.r.Read(buf[:rn])
251 s.xcheck(err, "read from ctl")
252 s.npending -= n
253 if s.npending == 0 {
254 _, err = fmt.Fprintln(s.conn, "ok")
255 s.xcheck(err, "writing ok after reading")
256 }
257 return n, err
258}
259
260func (s *ctlreader) xerror(msg string) {
261 if s.x == nil {
262 log.Fatalln(msg)
263 } else {
264 s.log.Debugx("error", fmt.Errorf("%s", msg), slog.String("cmd", s.cmd))
265 panic(s.x)
266 }
267}
268
269func (s *ctlreader) xcheck(err error, msg string) {
270 if err == nil {
271 return
272 }
273 if s.x == nil {
274 log.Fatalf("%s: %s", msg, err)
275 } else {
276 s.log.Debugx(msg, err, slog.String("cmd", s.cmd))
277 panic(s.x)
278 }
279}
280
281// servectl handles requests on the unix domain socket "ctl", e.g. for graceful shutdown, local mail delivery.
282func servectl(ctx context.Context, cid int64, log mlog.Log, conn net.Conn, shutdown func()) {
283 log.Debug("ctl connection")
284
285 var stop = struct{}{} // Sentinel value for panic and recover.
286 xctl := &ctl{conn: conn, x: stop, log: log}
287 defer func() {
288 x := recover()
289 if x == nil || x == stop {
290 return
291 }
292 log.Error("servectl panic", slog.Any("err", x), slog.String("cmd", xctl.cmd))
293 debug.PrintStack()
294 metrics.PanicInc(metrics.Ctl)
295 }()
296
297 defer func() {
298 err := conn.Close()
299 log.Check(err, "close ctl connection")
300 }()
301
302 xctl.xwrite("ctlv0")
303 for {
304 servectlcmd(ctx, xctl, cid, shutdown)
305 }
306}
307
308func xparseJSON(xctl *ctl, s string, v any) {
309 dec := json.NewDecoder(strings.NewReader(s))
310 dec.DisallowUnknownFields()
311 err := dec.Decode(v)
312 xctl.xcheck(err, "parsing from ctl as json")
313}
314
315func servectlcmd(ctx context.Context, xctl *ctl, cid int64, shutdown func()) {
316 log := xctl.log
317 cmd := xctl.xread()
318 xctl.cmd = cmd
319 log.Info("ctl command", slog.String("cmd", cmd))
320 switch cmd {
321 case "stop":
322 shutdown()
323 os.Exit(0)
324
325 case "deliver":
326 /* The protocol, double quoted are literals.
327
328 > "deliver"
329 > address
330 < "ok"
331 > stream
332 < "ok"
333 */
334
335 to := xctl.xread()
336 a, _, addr, err := store.OpenEmail(log, to, false)
337 xctl.xcheck(err, "lookup destination address")
338
339 msgFile, err := store.CreateMessageTemp(log, "ctl-deliver")
340 xctl.xcheck(err, "creating temporary message file")
341 defer store.CloseRemoveTempFile(log, msgFile, "deliver message")
342 mw := message.NewWriter(msgFile)
343 xctl.xwriteok()
344
345 xctl.xstreamto(mw)
346 err = msgFile.Sync()
347 xctl.xcheck(err, "syncing message to storage")
348
349 m := store.Message{
350 Received: time.Now(),
351 Size: mw.Size,
352 }
353
354 a.WithWLock(func() {
355 err := a.DeliverDestination(log, addr, &m, msgFile)
356 xctl.xcheck(err, "delivering message")
357 log.Info("message delivered through ctl", slog.Any("to", to))
358 })
359
360 err = a.Close()
361 xctl.xcheck(err, "closing account")
362 xctl.xwriteok()
363
364 case "setaccountpassword":
365 /* protocol:
366 > "setaccountpassword"
367 > account
368 > password
369 < "ok" or error
370 */
371
372 account := xctl.xread()
373 pw := xctl.xread()
374
375 acc, err := store.OpenAccount(log, account, false)
376 xctl.xcheck(err, "open account")
377 defer func() {
378 if acc != nil {
379 err := acc.Close()
380 log.Check(err, "closing account after setting password")
381 }
382 }()
383
384 err = acc.SetPassword(log, pw)
385 xctl.xcheck(err, "setting password")
386 err = acc.Close()
387 xctl.xcheck(err, "closing account")
388 acc = nil
389 xctl.xwriteok()
390
391 case "queueholdruleslist":
392 /* protocol:
393 > "queueholdruleslist"
394 < "ok"
395 < stream
396 */
397 l, err := queue.HoldRuleList(ctx)
398 xctl.xcheck(err, "listing hold rules")
399 xctl.xwriteok()
400 xw := xctl.writer()
401 fmt.Fprintln(xw, "hold rules:")
402 for _, hr := range l {
403 var elems []string
404 if hr.Account != "" {
405 elems = append(elems, fmt.Sprintf("account %q", hr.Account))
406 }
407 var zerodom dns.Domain
408 if hr.SenderDomain != zerodom {
409 elems = append(elems, fmt.Sprintf("sender domain %q", hr.SenderDomain.Name()))
410 }
411 if hr.RecipientDomain != zerodom {
412 elems = append(elems, fmt.Sprintf("sender domain %q", hr.RecipientDomain.Name()))
413 }
414 if len(elems) == 0 {
415 fmt.Fprintf(xw, "id %d: all messages\n", hr.ID)
416 } else {
417 fmt.Fprintf(xw, "id %d: %s\n", hr.ID, strings.Join(elems, ", "))
418 }
419 }
420 if len(l) == 0 {
421 fmt.Fprint(xw, "(none)\n")
422 }
423 xw.xclose()
424
425 case "queueholdrulesadd":
426 /* protocol:
427 > "queueholdrulesadd"
428 > account
429 > senderdomainstr
430 > recipientdomainstr
431 < "ok" or error
432 */
433 var hr queue.HoldRule
434 hr.Account = xctl.xread()
435 senderdomstr := xctl.xread()
436 rcptdomstr := xctl.xread()
437 var err error
438 hr.SenderDomain, err = dns.ParseDomain(senderdomstr)
439 xctl.xcheck(err, "parsing sender domain")
440 hr.RecipientDomain, err = dns.ParseDomain(rcptdomstr)
441 xctl.xcheck(err, "parsing recipient domain")
442 hr, err = queue.HoldRuleAdd(ctx, log, hr)
443 xctl.xcheck(err, "add hold rule")
444 xctl.xwriteok()
445
446 case "queueholdrulesremove":
447 /* protocol:
448 > "queueholdrulesremove"
449 > id
450 < "ok" or error
451 */
452 idstr := xctl.xread()
453 id, err := strconv.ParseInt(idstr, 10, 64)
454 xctl.xcheck(err, "parsing id")
455 err = queue.HoldRuleRemove(ctx, log, id)
456 xctl.xcheck(err, "remove hold rule")
457 xctl.xwriteok()
458
459 case "queuelist":
460 /* protocol:
461 > "queuelist"
462 > filters as json
463 > sort as json
464 < "ok"
465 < stream
466 */
467 filterline := xctl.xread()
468 sortline := xctl.xread()
469 var f queue.Filter
470 xparseJSON(xctl, filterline, &f)
471 var s queue.Sort
472 xparseJSON(xctl, sortline, &s)
473 qmsgs, err := queue.List(ctx, f, s)
474 xctl.xcheck(err, "listing queue")
475 xctl.xwriteok()
476
477 xw := xctl.writer()
478 fmt.Fprintln(xw, "messages:")
479 for _, qm := range qmsgs {
480 var lastAttempt string
481 if qm.LastAttempt != nil {
482 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
483 }
484 fmt.Fprintf(xw, "%5d %s from:%s to:%s next %s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), qm.Sender().LogString(), qm.Recipient().LogString(), -time.Since(qm.NextAttempt).Round(time.Second), lastAttempt, qm.LastResult().Error)
485 }
486 if len(qmsgs) == 0 {
487 fmt.Fprint(xw, "(none)\n")
488 }
489 xw.xclose()
490
491 case "queueholdset":
492 /* protocol:
493 > "queueholdset"
494 > queuefilters as json
495 > "true" or "false"
496 < "ok" or error
497 < count
498 */
499
500 filterline := xctl.xread()
501 hold := xctl.xread() == "true"
502 var f queue.Filter
503 xparseJSON(xctl, filterline, &f)
504 count, err := queue.HoldSet(ctx, f, hold)
505 xctl.xcheck(err, "setting on hold status for messages")
506 xctl.xwriteok()
507 xctl.xwrite(fmt.Sprintf("%d", count))
508
509 case "queueschedule":
510 /* protocol:
511 > "queueschedule"
512 > queuefilters as json
513 > relative to now
514 > duration
515 < "ok" or error
516 < count
517 */
518
519 filterline := xctl.xread()
520 relnow := xctl.xread()
521 duration := xctl.xread()
522 var f queue.Filter
523 xparseJSON(xctl, filterline, &f)
524 d, err := time.ParseDuration(duration)
525 xctl.xcheck(err, "parsing duration for next delivery attempt")
526 var count int
527 if relnow == "" {
528 count, err = queue.NextAttemptAdd(ctx, f, d)
529 } else {
530 count, err = queue.NextAttemptSet(ctx, f, time.Now().Add(d))
531 }
532 xctl.xcheck(err, "setting next delivery attempts in queue")
533 xctl.xwriteok()
534 xctl.xwrite(fmt.Sprintf("%d", count))
535
536 case "queuetransport":
537 /* protocol:
538 > "queuetransport"
539 > queuefilters as json
540 > transport
541 < "ok" or error
542 < count
543 */
544
545 filterline := xctl.xread()
546 transport := xctl.xread()
547 var f queue.Filter
548 xparseJSON(xctl, filterline, &f)
549 count, err := queue.TransportSet(ctx, f, transport)
550 xctl.xcheck(err, "adding to next delivery attempts in queue")
551 xctl.xwriteok()
552 xctl.xwrite(fmt.Sprintf("%d", count))
553
554 case "queuerequiretls":
555 /* protocol:
556 > "queuerequiretls"
557 > queuefilters as json
558 > reqtls (empty string, "true" or "false")
559 < "ok" or error
560 < count
561 */
562
563 filterline := xctl.xread()
564 reqtls := xctl.xread()
565 var req *bool
566 switch reqtls {
567 case "":
568 case "true":
569 v := true
570 req = &v
571 case "false":
572 v := false
573 req = &v
574 default:
575 xctl.xcheck(fmt.Errorf("unknown value %q", reqtls), "parsing value")
576 }
577 var f queue.Filter
578 xparseJSON(xctl, filterline, &f)
579 count, err := queue.RequireTLSSet(ctx, f, req)
580 xctl.xcheck(err, "setting tls requirements on messages in queue")
581 xctl.xwriteok()
582 xctl.xwrite(fmt.Sprintf("%d", count))
583
584 case "queuefail":
585 /* protocol:
586 > "queuefail"
587 > queuefilters as json
588 < "ok" or error
589 < count
590 */
591
592 filterline := xctl.xread()
593 var f queue.Filter
594 xparseJSON(xctl, filterline, &f)
595 count, err := queue.Fail(ctx, log, f)
596 xctl.xcheck(err, "marking messages from queue as failed")
597 xctl.xwriteok()
598 xctl.xwrite(fmt.Sprintf("%d", count))
599
600 case "queuedrop":
601 /* protocol:
602 > "queuedrop"
603 > queuefilters as json
604 < "ok" or error
605 < count
606 */
607
608 filterline := xctl.xread()
609 var f queue.Filter
610 xparseJSON(xctl, filterline, &f)
611 count, err := queue.Drop(ctx, log, f)
612 xctl.xcheck(err, "dropping messages from queue")
613 xctl.xwriteok()
614 xctl.xwrite(fmt.Sprintf("%d", count))
615
616 case "queuedump":
617 /* protocol:
618 > "queuedump"
619 > id
620 < "ok" or error
621 < stream
622 */
623
624 idstr := xctl.xread()
625 id, err := strconv.ParseInt(idstr, 10, 64)
626 if err != nil {
627 xctl.xcheck(err, "parsing id")
628 }
629 mr, err := queue.OpenMessage(ctx, id)
630 xctl.xcheck(err, "opening message")
631 defer func() {
632 err := mr.Close()
633 log.Check(err, "closing message from queue")
634 }()
635 xctl.xwriteok()
636 xctl.xstreamfrom(mr)
637
638 case "queueretiredlist":
639 /* protocol:
640 > "queueretiredlist"
641 > filters as json
642 > sort as json
643 < "ok"
644 < stream
645 */
646 filterline := xctl.xread()
647 sortline := xctl.xread()
648 var f queue.RetiredFilter
649 xparseJSON(xctl, filterline, &f)
650 var s queue.RetiredSort
651 xparseJSON(xctl, sortline, &s)
652 qmsgs, err := queue.RetiredList(ctx, f, s)
653 xctl.xcheck(err, "listing retired queue")
654 xctl.xwriteok()
655
656 xw := xctl.writer()
657 fmt.Fprintln(xw, "retired messages:")
658 for _, qm := range qmsgs {
659 var lastAttempt string
660 if qm.LastAttempt != nil {
661 lastAttempt = time.Since(*qm.LastAttempt).Round(time.Second).String()
662 }
663 result := "failure"
664 if qm.Success {
665 result = "success"
666 }
667 sender, err := qm.Sender()
668 xcheckf(err, "parsing sender")
669 fmt.Fprintf(xw, "%5d %s %s from:%s to:%s last %s error %q\n", qm.ID, qm.Queued.Format(time.RFC3339), result, sender.LogString(), qm.Recipient().LogString(), lastAttempt, qm.LastResult().Error)
670 }
671 if len(qmsgs) == 0 {
672 fmt.Fprint(xw, "(none)\n")
673 }
674 xw.xclose()
675
676 case "queueretiredprint":
677 /* protocol:
678 > "queueretiredprint"
679 > id
680 < "ok"
681 < stream
682 */
683 idstr := xctl.xread()
684 id, err := strconv.ParseInt(idstr, 10, 64)
685 if err != nil {
686 xctl.xcheck(err, "parsing id")
687 }
688 l, err := queue.RetiredList(ctx, queue.RetiredFilter{IDs: []int64{id}}, queue.RetiredSort{})
689 xctl.xcheck(err, "getting retired messages")
690 if len(l) == 0 {
691 xctl.xcheck(errors.New("not found"), "getting retired message")
692 }
693 m := l[0]
694 xctl.xwriteok()
695 xw := xctl.writer()
696 enc := json.NewEncoder(xw)
697 enc.SetIndent("", "\t")
698 err = enc.Encode(m)
699 xctl.xcheck(err, "encode retired message")
700 xw.xclose()
701
702 case "queuehooklist":
703 /* protocol:
704 > "queuehooklist"
705 > filters as json
706 > sort as json
707 < "ok"
708 < stream
709 */
710 filterline := xctl.xread()
711 sortline := xctl.xread()
712 var f queue.HookFilter
713 xparseJSON(xctl, filterline, &f)
714 var s queue.HookSort
715 xparseJSON(xctl, sortline, &s)
716 hooks, err := queue.HookList(ctx, f, s)
717 xctl.xcheck(err, "listing webhooks")
718 xctl.xwriteok()
719
720 xw := xctl.writer()
721 fmt.Fprintln(xw, "webhooks:")
722 for _, h := range hooks {
723 var lastAttempt string
724 if len(h.Results) > 0 {
725 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
726 }
727 fmt.Fprintf(xw, "%5d %s account:%s next %s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), h.Account, time.Until(h.NextAttempt).Round(time.Second), lastAttempt, h.LastResult().Error, h.URL)
728 }
729 if len(hooks) == 0 {
730 fmt.Fprint(xw, "(none)\n")
731 }
732 xw.xclose()
733
734 case "queuehookschedule":
735 /* protocol:
736 > "queuehookschedule"
737 > hookfilters as json
738 > relative to now
739 > duration
740 < "ok" or error
741 < count
742 */
743
744 filterline := xctl.xread()
745 relnow := xctl.xread()
746 duration := xctl.xread()
747 var f queue.HookFilter
748 xparseJSON(xctl, filterline, &f)
749 d, err := time.ParseDuration(duration)
750 xctl.xcheck(err, "parsing duration for next delivery attempt")
751 var count int
752 if relnow == "" {
753 count, err = queue.HookNextAttemptAdd(ctx, f, d)
754 } else {
755 count, err = queue.HookNextAttemptSet(ctx, f, time.Now().Add(d))
756 }
757 xctl.xcheck(err, "setting next delivery attempts in queue")
758 xctl.xwriteok()
759 xctl.xwrite(fmt.Sprintf("%d", count))
760
761 case "queuehookcancel":
762 /* protocol:
763 > "queuehookcancel"
764 > hookfilters as json
765 < "ok" or error
766 < count
767 */
768
769 filterline := xctl.xread()
770 var f queue.HookFilter
771 xparseJSON(xctl, filterline, &f)
772 count, err := queue.HookCancel(ctx, log, f)
773 xctl.xcheck(err, "canceling webhooks in queue")
774 xctl.xwriteok()
775 xctl.xwrite(fmt.Sprintf("%d", count))
776
777 case "queuehookprint":
778 /* protocol:
779 > "queuehookprint"
780 > id
781 < "ok"
782 < stream
783 */
784 idstr := xctl.xread()
785 id, err := strconv.ParseInt(idstr, 10, 64)
786 if err != nil {
787 xctl.xcheck(err, "parsing id")
788 }
789 l, err := queue.HookList(ctx, queue.HookFilter{IDs: []int64{id}}, queue.HookSort{})
790 xctl.xcheck(err, "getting webhooks")
791 if len(l) == 0 {
792 xctl.xcheck(errors.New("not found"), "getting webhook")
793 }
794 h := l[0]
795 xctl.xwriteok()
796 xw := xctl.writer()
797 enc := json.NewEncoder(xw)
798 enc.SetIndent("", "\t")
799 err = enc.Encode(h)
800 xctl.xcheck(err, "encode webhook")
801 xw.xclose()
802
803 case "queuehookretiredlist":
804 /* protocol:
805 > "queuehookretiredlist"
806 > filters as json
807 > sort as json
808 < "ok"
809 < stream
810 */
811 filterline := xctl.xread()
812 sortline := xctl.xread()
813 var f queue.HookRetiredFilter
814 xparseJSON(xctl, filterline, &f)
815 var s queue.HookRetiredSort
816 xparseJSON(xctl, sortline, &s)
817 l, err := queue.HookRetiredList(ctx, f, s)
818 xctl.xcheck(err, "listing retired webhooks")
819 xctl.xwriteok()
820
821 xw := xctl.writer()
822 fmt.Fprintln(xw, "retired webhooks:")
823 for _, h := range l {
824 var lastAttempt string
825 if len(h.Results) > 0 {
826 lastAttempt = time.Since(h.LastResult().Start).Round(time.Second).String()
827 }
828 result := "success"
829 if !h.Success {
830 result = "failure"
831 }
832 fmt.Fprintf(xw, "%5d %s %s account:%s last %s error %q url %s\n", h.ID, h.Submitted.Format(time.RFC3339), result, h.Account, lastAttempt, h.LastResult().Error, h.URL)
833 }
834 if len(l) == 0 {
835 fmt.Fprint(xw, "(none)\n")
836 }
837 xw.xclose()
838
839 case "queuehookretiredprint":
840 /* protocol:
841 > "queuehookretiredprint"
842 > id
843 < "ok"
844 < stream
845 */
846 idstr := xctl.xread()
847 id, err := strconv.ParseInt(idstr, 10, 64)
848 if err != nil {
849 xctl.xcheck(err, "parsing id")
850 }
851 l, err := queue.HookRetiredList(ctx, queue.HookRetiredFilter{IDs: []int64{id}}, queue.HookRetiredSort{})
852 xctl.xcheck(err, "getting retired webhooks")
853 if len(l) == 0 {
854 xctl.xcheck(errors.New("not found"), "getting retired webhook")
855 }
856 h := l[0]
857 xctl.xwriteok()
858 xw := xctl.writer()
859 enc := json.NewEncoder(xw)
860 enc.SetIndent("", "\t")
861 err = enc.Encode(h)
862 xctl.xcheck(err, "encode retired webhook")
863 xw.xclose()
864
865 case "queuesuppresslist":
866 /* protocol:
867 > "queuesuppresslist"
868 > account (or empty)
869 < "ok" or error
870 < stream
871 */
872
873 account := xctl.xread()
874 l, err := queue.SuppressionList(ctx, account)
875 xctl.xcheck(err, "listing suppressions")
876 xctl.xwriteok()
877 xw := xctl.writer()
878 fmt.Fprintln(xw, "suppressions (account, address, manual, time added, base adddress, reason):")
879 for _, sup := range l {
880 manual := "No"
881 if sup.Manual {
882 manual = "Yes"
883 }
884 fmt.Fprintf(xw, "%q\t%q\t%s\t%s\t%q\t%q\n", sup.Account, sup.OriginalAddress, manual, sup.Created.Round(time.Second), sup.BaseAddress, sup.Reason)
885 }
886 if len(l) == 0 {
887 fmt.Fprintln(xw, "(none)")
888 }
889 xw.xclose()
890
891 case "queuesuppressadd":
892 /* protocol:
893 > "queuesuppressadd"
894 > account
895 > address
896 < "ok" or error
897 */
898
899 account := xctl.xread()
900 address := xctl.xread()
901 _, ok := mox.Conf.Account(account)
902 if !ok {
903 xctl.xcheck(errors.New("unknown account"), "looking up account")
904 }
905 addr, err := smtp.ParseAddress(address)
906 xctl.xcheck(err, "parsing address")
907 sup := webapi.Suppression{
908 Account: account,
909 Manual: true,
910 Reason: "added through mox cli",
911 }
912 err = queue.SuppressionAdd(ctx, addr.Path(), &sup)
913 xctl.xcheck(err, "adding suppression")
914 xctl.xwriteok()
915
916 case "queuesuppressremove":
917 /* protocol:
918 > "queuesuppressremove"
919 > account
920 > address
921 < "ok" or error
922 */
923
924 account := xctl.xread()
925 address := xctl.xread()
926 addr, err := smtp.ParseAddress(address)
927 xctl.xcheck(err, "parsing address")
928 err = queue.SuppressionRemove(ctx, account, addr.Path())
929 xctl.xcheck(err, "removing suppression")
930 xctl.xwriteok()
931
932 case "queuesuppresslookup":
933 /* protocol:
934 > "queuesuppresslookup"
935 > account or empty
936 > address
937 < "ok" or error
938 < stream
939 */
940
941 account := xctl.xread()
942 address := xctl.xread()
943 if account != "" {
944 _, ok := mox.Conf.Account(account)
945 if !ok {
946 xctl.xcheck(errors.New("unknown account"), "looking up account")
947 }
948 }
949 addr, err := smtp.ParseAddress(address)
950 xctl.xcheck(err, "parsing address")
951 sup, err := queue.SuppressionLookup(ctx, account, addr.Path())
952 xctl.xcheck(err, "looking up suppression")
953 xctl.xwriteok()
954 xw := xctl.writer()
955 if sup == nil {
956 fmt.Fprintln(xw, "not present")
957 } else {
958 manual := "no"
959 if sup.Manual {
960 manual = "yes"
961 }
962 fmt.Fprintf(xw, "present\nadded: %s\nmanual: %s\nbase address: %s\nreason: %q\n", sup.Created.Round(time.Second), manual, sup.BaseAddress, sup.Reason)
963 }
964 xw.xclose()
965
966 case "importmaildir", "importmbox":
967 mbox := cmd == "importmbox"
968 ximportctl(ctx, xctl, mbox)
969
970 case "domainadd":
971 /* protocol:
972 > "domainadd"
973 > disabled as "true" or "false"
974 > domain
975 > account
976 > localpart
977 < "ok" or error
978 */
979 var disabled bool
980 switch s := xctl.xread(); s {
981 case "true":
982 disabled = true
983 case "false":
984 disabled = false
985 default:
986 xctl.xcheck(fmt.Errorf("invalid value %q", s), "parsing disabled boolean")
987 }
988
989 domain := xctl.xread()
990 account := xctl.xread()
991 localpart := xctl.xread()
992 d, err := dns.ParseDomain(domain)
993 xctl.xcheck(err, "parsing domain")
994 err = admin.DomainAdd(ctx, disabled, d, account, smtp.Localpart(localpart))
995 xctl.xcheck(err, "adding domain")
996 xctl.xwriteok()
997
998 case "domainrm":
999 /* protocol:
1000 > "domainrm"
1001 > domain
1002 < "ok" or error
1003 */
1004 domain := xctl.xread()
1005 d, err := dns.ParseDomain(domain)
1006 xctl.xcheck(err, "parsing domain")
1007 err = admin.DomainRemove(ctx, d)
1008 xctl.xcheck(err, "removing domain")
1009 xctl.xwriteok()
1010
1011 case "domaindisabled":
1012 /* protocol:
1013 > "domaindisabled"
1014 > "true" or "false"
1015 > domain
1016 < "ok" or error
1017 */
1018 domain := xctl.xread()
1019 var disabled bool
1020 switch s := xctl.xread(); s {
1021 case "true":
1022 disabled = true
1023 case "false":
1024 disabled = false
1025 default:
1026 xctl.xerror("bad boolean value")
1027 }
1028 err := admin.DomainSave(ctx, domain, func(d *config.Domain) error {
1029 d.Disabled = disabled
1030 return nil
1031 })
1032 xctl.xcheck(err, "saving domain")
1033 xctl.xwriteok()
1034
1035 case "accountadd":
1036 /* protocol:
1037 > "accountadd"
1038 > account
1039 > address
1040 < "ok" or error
1041 */
1042 account := xctl.xread()
1043 address := xctl.xread()
1044 err := admin.AccountAdd(ctx, account, address)
1045 xctl.xcheck(err, "adding account")
1046 xctl.xwriteok()
1047
1048 case "accountrm":
1049 /* protocol:
1050 > "accountrm"
1051 > account
1052 < "ok" or error
1053 */
1054 account := xctl.xread()
1055 err := admin.AccountRemove(ctx, account)
1056 xctl.xcheck(err, "removing account")
1057 xctl.xwriteok()
1058
1059 case "accountlist":
1060 /* protocol:
1061 > "accountlist"
1062 < "ok" or error
1063 < stream
1064 */
1065 xctl.xwriteok()
1066 xw := xctl.writer()
1067 all, disabled := mox.Conf.AccountsDisabled()
1068 slices.Sort(all)
1069 for _, account := range all {
1070 var extra string
1071 if slices.Contains(disabled, account) {
1072 extra += "\t(disabled)"
1073 }
1074 fmt.Fprintf(xw, "%s%s\n", account, extra)
1075 }
1076 xw.xclose()
1077
1078 case "accountdisabled":
1079 /* protocol:
1080 > "accountdisabled"
1081 > account
1082 > message (if empty, then enabled)
1083 < "ok" or error
1084 */
1085 account := xctl.xread()
1086 message := xctl.xread()
1087
1088 acc, err := store.OpenAccount(log, account, false)
1089 xctl.xcheck(err, "open account")
1090 defer func() {
1091 err := acc.Close()
1092 log.Check(err, "closing account")
1093 }()
1094
1095 err = admin.AccountSave(ctx, account, func(acc *config.Account) {
1096 acc.LoginDisabled = message
1097 })
1098 xctl.xcheck(err, "saving account")
1099
1100 err = acc.SessionsClear(ctx, xctl.log)
1101 xctl.xcheck(err, "clearing active web sessions")
1102
1103 xctl.xwriteok()
1104
1105 case "accountenable":
1106 /* protocol:
1107 > "accountenable"
1108 > account
1109 < "ok" or error
1110 */
1111 account := xctl.xread()
1112 err := admin.AccountSave(ctx, account, func(acc *config.Account) {
1113 acc.LoginDisabled = ""
1114 })
1115 xctl.xcheck(err, "enabling account")
1116 xctl.xwriteok()
1117
1118 case "tlspubkeylist":
1119 /* protocol:
1120 > "tlspubkeylist"
1121 > account (or empty)
1122 < "ok" or error
1123 < stream
1124 */
1125 accountOpt := xctl.xread()
1126 tlspubkeys, err := store.TLSPublicKeyList(ctx, accountOpt)
1127 xctl.xcheck(err, "list tls public keys")
1128 xctl.xwriteok()
1129 xw := xctl.writer()
1130 fmt.Fprintf(xw, "# fingerprint, type, name, account, login address, no imap preauth (%d)\n", len(tlspubkeys))
1131 for _, k := range tlspubkeys {
1132 fmt.Fprintf(xw, "%s\t%s\t%q\t%s\t%s\t%v\n", k.Fingerprint, k.Type, k.Name, k.Account, k.LoginAddress, k.NoIMAPPreauth)
1133 }
1134 xw.xclose()
1135
1136 case "tlspubkeyget":
1137 /* protocol:
1138 > "tlspubkeyget"
1139 > fingerprint
1140 < "ok" or error
1141 < type
1142 < name
1143 < account
1144 < address
1145 < noimappreauth (true/false)
1146 < stream (certder)
1147 */
1148 fp := xctl.xread()
1149 tlspubkey, err := store.TLSPublicKeyGet(ctx, fp)
1150 xctl.xcheck(err, "looking tls public key")
1151 xctl.xwriteok()
1152 xctl.xwrite(tlspubkey.Type)
1153 xctl.xwrite(tlspubkey.Name)
1154 xctl.xwrite(tlspubkey.Account)
1155 xctl.xwrite(tlspubkey.LoginAddress)
1156 xctl.xwrite(fmt.Sprintf("%v", tlspubkey.NoIMAPPreauth))
1157 xctl.xstreamfrom(bytes.NewReader(tlspubkey.CertDER))
1158
1159 case "tlspubkeyadd":
1160 /* protocol:
1161 > "tlspubkeyadd"
1162 > loginaddress
1163 > name (or empty)
1164 > noimappreauth (true/false)
1165 > stream (certder)
1166 < "ok" or error
1167 */
1168 loginAddress := xctl.xread()
1169 name := xctl.xread()
1170 noimappreauth := xctl.xread()
1171 if noimappreauth != "true" && noimappreauth != "false" {
1172 xctl.xcheck(fmt.Errorf("bad value %q", noimappreauth), "parsing noimappreauth")
1173 }
1174 var b bytes.Buffer
1175 xctl.xstreamto(&b)
1176 tlspubkey, err := store.ParseTLSPublicKeyCert(b.Bytes())
1177 xctl.xcheck(err, "parsing certificate")
1178 if name != "" {
1179 tlspubkey.Name = name
1180 }
1181 acc, _, _, err := store.OpenEmail(xctl.log, loginAddress, false)
1182 xctl.xcheck(err, "open account for address")
1183 defer func() {
1184 err := acc.Close()
1185 xctl.log.Check(err, "close account")
1186 }()
1187 tlspubkey.Account = acc.Name
1188 tlspubkey.LoginAddress = loginAddress
1189 tlspubkey.NoIMAPPreauth = noimappreauth == "true"
1190
1191 err = store.TLSPublicKeyAdd(ctx, &tlspubkey)
1192 xctl.xcheck(err, "adding tls public key")
1193 xctl.xwriteok()
1194
1195 case "tlspubkeyrm":
1196 /* protocol:
1197 > "tlspubkeyrm"
1198 > fingerprint
1199 < "ok" or error
1200 */
1201 fp := xctl.xread()
1202 err := store.TLSPublicKeyRemove(ctx, fp)
1203 xctl.xcheck(err, "removing tls public key")
1204 xctl.xwriteok()
1205
1206 case "addressadd":
1207 /* protocol:
1208 > "addressadd"
1209 > address
1210 > account
1211 < "ok" or error
1212 */
1213 address := xctl.xread()
1214 account := xctl.xread()
1215 err := admin.AddressAdd(ctx, address, account)
1216 xctl.xcheck(err, "adding address")
1217 xctl.xwriteok()
1218
1219 case "addressrm":
1220 /* protocol:
1221 > "addressrm"
1222 > address
1223 < "ok" or error
1224 */
1225 address := xctl.xread()
1226 err := admin.AddressRemove(ctx, address)
1227 xctl.xcheck(err, "removing address")
1228 xctl.xwriteok()
1229
1230 case "aliaslist":
1231 /* protocol:
1232 > "aliaslist"
1233 > domain
1234 < "ok" or error
1235 < stream
1236 */
1237 domain := xctl.xread()
1238 d, err := dns.ParseDomain(domain)
1239 xctl.xcheck(err, "parsing domain")
1240 dc, ok := mox.Conf.Domain(d)
1241 if !ok {
1242 xctl.xcheck(errors.New("no such domain"), "listing aliases")
1243 }
1244 xctl.xwriteok()
1245 xw := xctl.writer()
1246 for _, a := range dc.Aliases {
1247 lp, err := smtp.ParseLocalpart(a.LocalpartStr)
1248 xctl.xcheck(err, "parsing alias localpart")
1249 fmt.Fprintln(xw, smtp.NewAddress(lp, a.Domain).Pack(true))
1250 }
1251 xw.xclose()
1252
1253 case "aliasprint":
1254 /* protocol:
1255 > "aliasprint"
1256 > address
1257 < "ok" or error
1258 < stream
1259 */
1260 address := xctl.xread()
1261 _, alias, ok := mox.Conf.AccountDestination(address)
1262 if !ok {
1263 xctl.xcheck(errors.New("no such address"), "looking up alias")
1264 } else if alias == nil {
1265 xctl.xcheck(errors.New("address not an alias"), "looking up alias")
1266 }
1267 xctl.xwriteok()
1268 xw := xctl.writer()
1269 fmt.Fprintf(xw, "# postpublic %v\n", alias.PostPublic)
1270 fmt.Fprintf(xw, "# listmembers %v\n", alias.ListMembers)
1271 fmt.Fprintf(xw, "# allowmsgfrom %v\n", alias.AllowMsgFrom)
1272 fmt.Fprintln(xw, "# members:")
1273 for _, a := range alias.Addresses {
1274 fmt.Fprintln(xw, a)
1275 }
1276 xw.xclose()
1277
1278 case "aliasadd":
1279 /* protocol:
1280 > "aliasadd"
1281 > address
1282 > json alias
1283 < "ok" or error
1284 */
1285 address := xctl.xread()
1286 line := xctl.xread()
1287 addr, err := smtp.ParseAddress(address)
1288 xctl.xcheck(err, "parsing address")
1289 var alias config.Alias
1290 xparseJSON(xctl, line, &alias)
1291 err = admin.AliasAdd(ctx, addr, alias)
1292 xctl.xcheck(err, "adding alias")
1293 xctl.xwriteok()
1294
1295 case "aliasupdate":
1296 /* protocol:
1297 > "aliasupdate"
1298 > alias
1299 > "true" or "false" for postpublic
1300 > "true" or "false" for listmembers
1301 > "true" or "false" for allowmsgfrom
1302 < "ok" or error
1303 */
1304 address := xctl.xread()
1305 postpublic := xctl.xread()
1306 listmembers := xctl.xread()
1307 allowmsgfrom := xctl.xread()
1308 addr, err := smtp.ParseAddress(address)
1309 xctl.xcheck(err, "parsing address")
1310 err = admin.DomainSave(ctx, addr.Domain.Name(), func(d *config.Domain) error {
1311 a, ok := d.Aliases[addr.Localpart.String()]
1312 if !ok {
1313 return fmt.Errorf("alias does not exist")
1314 }
1315
1316 switch postpublic {
1317 case "false":
1318 a.PostPublic = false
1319 case "true":
1320 a.PostPublic = true
1321 }
1322 switch listmembers {
1323 case "false":
1324 a.ListMembers = false
1325 case "true":
1326 a.ListMembers = true
1327 }
1328 switch allowmsgfrom {
1329 case "false":
1330 a.AllowMsgFrom = false
1331 case "true":
1332 a.AllowMsgFrom = true
1333 }
1334
1335 d.Aliases = maps.Clone(d.Aliases)
1336 d.Aliases[addr.Localpart.String()] = a
1337 return nil
1338 })
1339 xctl.xcheck(err, "saving alias")
1340 xctl.xwriteok()
1341
1342 case "aliasrm":
1343 /* protocol:
1344 > "aliasrm"
1345 > alias
1346 < "ok" or error
1347 */
1348 address := xctl.xread()
1349 addr, err := smtp.ParseAddress(address)
1350 xctl.xcheck(err, "parsing address")
1351 err = admin.AliasRemove(ctx, addr)
1352 xctl.xcheck(err, "removing alias")
1353 xctl.xwriteok()
1354
1355 case "aliasaddaddr":
1356 /* protocol:
1357 > "aliasaddaddr"
1358 > alias
1359 > addresses as json
1360 < "ok" or error
1361 */
1362 address := xctl.xread()
1363 line := xctl.xread()
1364 addr, err := smtp.ParseAddress(address)
1365 xctl.xcheck(err, "parsing address")
1366 var addresses []string
1367 xparseJSON(xctl, line, &addresses)
1368 err = admin.AliasAddressesAdd(ctx, addr, addresses)
1369 xctl.xcheck(err, "adding addresses to alias")
1370 xctl.xwriteok()
1371
1372 case "aliasrmaddr":
1373 /* protocol:
1374 > "aliasrmaddr"
1375 > alias
1376 > addresses as json
1377 < "ok" or error
1378 */
1379 address := xctl.xread()
1380 line := xctl.xread()
1381 addr, err := smtp.ParseAddress(address)
1382 xctl.xcheck(err, "parsing address")
1383 var addresses []string
1384 xparseJSON(xctl, line, &addresses)
1385 err = admin.AliasAddressesRemove(ctx, addr, addresses)
1386 xctl.xcheck(err, "removing addresses to alias")
1387 xctl.xwriteok()
1388
1389 case "loglevels":
1390 /* protocol:
1391 > "loglevels"
1392 < "ok"
1393 < stream
1394 */
1395 xctl.xwriteok()
1396 l := mox.Conf.LogLevels()
1397 keys := []string{}
1398 for k := range l {
1399 keys = append(keys, k)
1400 }
1401 slices.Sort(keys)
1402 s := ""
1403 for _, k := range keys {
1404 ks := k
1405 if ks == "" {
1406 ks = "(default)"
1407 }
1408 s += ks + ": " + mlog.LevelStrings[l[k]] + "\n"
1409 }
1410 xctl.xstreamfrom(strings.NewReader(s))
1411
1412 case "setloglevels":
1413 /* protocol:
1414 > "setloglevels"
1415 > pkg
1416 > level (if empty, log level for pkg will be unset)
1417 < "ok" or error
1418 */
1419 pkg := xctl.xread()
1420 levelstr := xctl.xread()
1421 if levelstr == "" {
1422 mox.Conf.LogLevelRemove(log, pkg)
1423 } else {
1424 level, ok := mlog.Levels[levelstr]
1425 if !ok {
1426 xctl.xerror("bad level")
1427 }
1428 mox.Conf.LogLevelSet(log, pkg, level)
1429 }
1430 xctl.xwriteok()
1431
1432 case "retrain":
1433 /* protocol:
1434 > "retrain"
1435 > account or empty
1436 < "ok" or error
1437 */
1438 account := xctl.xread()
1439
1440 xretrain := func(name string) {
1441 acc, err := store.OpenAccount(log, name, false)
1442 xctl.xcheck(err, "open account")
1443 defer func() {
1444 if acc != nil {
1445 err := acc.Close()
1446 log.Check(err, "closing account after retraining")
1447 }
1448 }()
1449
1450 // todo: can we retrain an account without holding a write lock? perhaps by writing a junkfilter to a new location, and staying informed of message changes while we go through all messages in the account?
1451
1452 acc.WithWLock(func() {
1453 conf, _ := acc.Conf()
1454 if conf.JunkFilter == nil {
1455 xctl.xcheck(store.ErrNoJunkFilter, "looking for junk filter")
1456 }
1457
1458 // Remove existing junk filter files.
1459 basePath := mox.DataDirPath("accounts")
1460 dbPath := filepath.Join(basePath, acc.Name, "junkfilter.db")
1461 bloomPath := filepath.Join(basePath, acc.Name, "junkfilter.bloom")
1462 err := os.Remove(dbPath)
1463 log.Check(err, "removing old junkfilter database file", slog.String("path", dbPath))
1464 err = os.Remove(bloomPath)
1465 log.Check(err, "removing old junkfilter bloom filter file", slog.String("path", bloomPath))
1466
1467 // Open junk filter, this creates new files.
1468 jf, _, err := acc.OpenJunkFilter(ctx, log)
1469 xctl.xcheck(err, "open new junk filter")
1470 defer func() {
1471 if jf == nil {
1472 return
1473 }
1474 err := jf.CloseDiscard()
1475 log.Check(err, "closing junk filter during cleanup")
1476 }()
1477
1478 // Read through messages with either junk or nonjunk flag set, and train them.
1479 var total, trained int
1480 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1481 q := bstore.QueryTx[store.Message](tx)
1482 q.FilterEqual("Expunged", false)
1483 return q.ForEach(func(m store.Message) error {
1484 total++
1485 if m.Junk == m.Notjunk {
1486 return nil
1487 }
1488 ok, err := acc.TrainMessage(ctx, log, jf, m.Notjunk, m)
1489 if ok {
1490 trained++
1491 }
1492 if m.TrainedJunk == nil || *m.TrainedJunk != m.Junk {
1493 m.TrainedJunk = &m.Junk
1494 if err := tx.Update(&m); err != nil {
1495 return fmt.Errorf("marking message as trained: %v", err)
1496 }
1497 }
1498 return err
1499 })
1500 })
1501 xctl.xcheck(err, "training messages")
1502 log.Info("retrained messages", slog.Int("total", total), slog.Int("trained", trained))
1503
1504 // Close junk filter, marking success.
1505 err = jf.Close()
1506 jf = nil
1507 xctl.xcheck(err, "closing junk filter")
1508 })
1509 }
1510
1511 if account == "" {
1512 for _, name := range mox.Conf.Accounts() {
1513 xretrain(name)
1514 }
1515 } else {
1516 xretrain(account)
1517 }
1518 xctl.xwriteok()
1519
1520 case "recalculatemailboxcounts":
1521 /* protocol:
1522 > "recalculatemailboxcounts"
1523 > account
1524 < "ok" or error
1525 < stream
1526 */
1527 account := xctl.xread()
1528 acc, err := store.OpenAccount(log, account, false)
1529 xctl.xcheck(err, "open account")
1530 defer func() {
1531 if acc != nil {
1532 err := acc.Close()
1533 log.Check(err, "closing account after recalculating mailbox counts")
1534 }
1535 }()
1536 xctl.xwriteok()
1537
1538 xw := xctl.writer()
1539
1540 acc.WithWLock(func() {
1541 var changes []store.Change
1542 err = acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1543 var totalSize int64
1544 err := bstore.QueryTx[store.Mailbox](tx).FilterEqual("Expunged", false).ForEach(func(mb store.Mailbox) error {
1545 mc, err := mb.CalculateCounts(tx)
1546 if err != nil {
1547 return fmt.Errorf("calculating counts for mailbox %q: %w", mb.Name, err)
1548 }
1549 totalSize += mc.Size
1550
1551 if mc != mb.MailboxCounts {
1552 fmt.Fprintf(xw, "for %s setting new counts %s (was %s)\n", mb.Name, mc, mb.MailboxCounts)
1553 mb.HaveCounts = true
1554 mb.MailboxCounts = mc
1555 if err := tx.Update(&mb); err != nil {
1556 return fmt.Errorf("storing new counts for %q: %v", mb.Name, err)
1557 }
1558 changes = append(changes, mb.ChangeCounts())
1559 }
1560 return nil
1561 })
1562 if err != nil {
1563 return err
1564 }
1565
1566 du := store.DiskUsage{ID: 1}
1567 if err := tx.Get(&du); err != nil {
1568 return fmt.Errorf("get disk usage: %v", err)
1569 }
1570 if du.MessageSize != totalSize {
1571 fmt.Fprintf(xw, "setting new total message size %d (was %d)\n", totalSize, du.MessageSize)
1572 du.MessageSize = totalSize
1573 if err := tx.Update(&du); err != nil {
1574 return fmt.Errorf("update disk usage: %v", err)
1575 }
1576 }
1577 return nil
1578 })
1579 xctl.xcheck(err, "write transaction for mailbox counts")
1580
1581 store.BroadcastChanges(acc, changes)
1582 })
1583 xw.xclose()
1584
1585 case "fixmsgsize":
1586 /* protocol:
1587 > "fixmsgsize"
1588 > account or empty
1589 < "ok" or error
1590 < stream
1591 */
1592
1593 accountOpt := xctl.xread()
1594 xctl.xwriteok()
1595 xw := xctl.writer()
1596
1597 var foundProblem bool
1598 const batchSize = 10000
1599
1600 xfixmsgsize := func(accName string) {
1601 acc, err := store.OpenAccount(log, accName, false)
1602 xctl.xcheck(err, "open account")
1603 defer func() {
1604 err := acc.Close()
1605 log.Check(err, "closing account after fixing message sizes")
1606 }()
1607
1608 total := 0
1609 var lastID int64
1610 for {
1611 var n int
1612
1613 acc.WithRLock(func() {
1614 mailboxCounts := map[int64]store.Mailbox{} // For broadcasting.
1615
1616 // Don't process all messages in one transaction, we could block the account for too long.
1617 err := acc.DB.Write(ctx, func(tx *bstore.Tx) error {
1618 q := bstore.QueryTx[store.Message](tx)
1619 q.FilterEqual("Expunged", false)
1620 q.FilterGreater("ID", lastID)
1621 q.Limit(batchSize)
1622 q.SortAsc("ID")
1623 return q.ForEach(func(m store.Message) error {
1624 lastID = m.ID
1625 n++
1626
1627 p := acc.MessagePath(m.ID)
1628 st, err := os.Stat(p)
1629 if err != nil {
1630 mb := store.Mailbox{ID: m.MailboxID}
1631 if xerr := tx.Get(&mb); xerr != nil {
1632 fmt.Fprintf(xw, "get mailbox id %d for message with file error: %v\n", mb.ID, xerr)
1633 }
1634 fmt.Fprintf(xw, "checking file %s for message %d in mailbox %q (id %d): %v (continuing)\n", p, m.ID, mb.Name, mb.ID, err)
1635 return nil
1636 }
1637 filesize := st.Size()
1638 correctSize := int64(len(m.MsgPrefix)) + filesize
1639 if m.Size == correctSize {
1640 return nil
1641 }
1642
1643 foundProblem = true
1644
1645 mb := store.Mailbox{ID: m.MailboxID}
1646 if err := tx.Get(&mb); err != nil {
1647 fmt.Fprintf(xw, "get mailbox id %d for message with file size mismatch: %v\n", mb.ID, err)
1648 return nil
1649 }
1650 if mb.Expunged {
1651 fmt.Fprintf(xw, "message %d is in expunged mailbox %q (id %d) (continuing)\n", m.ID, mb.Name, mb.ID)
1652 }
1653 fmt.Fprintf(xw, "fixing message %d in mailbox %q (id %d) with incorrect size %d, should be %d (len msg prefix %d + on-disk file %s size %d)\n", m.ID, mb.Name, mb.ID, m.Size, correctSize, len(m.MsgPrefix), p, filesize)
1654
1655 // We assume that the original message size was accounted as stored in the mailbox
1656 // total size. If this isn't correct, the user can always run
1657 // recalculatemailboxcounts.
1658 mb.Size -= m.Size
1659 mb.Size += correctSize
1660 if err := tx.Update(&mb); err != nil {
1661 return fmt.Errorf("update mailbox counts: %v", err)
1662 }
1663 mailboxCounts[mb.ID] = mb
1664
1665 m.Size = correctSize
1666
1667 mr := acc.MessageReader(m)
1668 part, err := message.EnsurePart(log.Logger, false, mr, m.Size)
1669 if err != nil {
1670 fmt.Fprintf(xw, "parsing message %d again: %v (continuing)\n", m.ID, err)
1671 }
1672 m.ParsedBuf, err = json.Marshal(part)
1673 if err != nil {
1674 return fmt.Errorf("marshal parsed message: %v", err)
1675 }
1676 total++
1677 if err := tx.Update(&m); err != nil {
1678 return fmt.Errorf("update message: %v", err)
1679 }
1680 return nil
1681 })
1682 })
1683 xctl.xcheck(err, "find and fix wrong message sizes")
1684
1685 var changes []store.Change
1686 for _, mb := range mailboxCounts {
1687 changes = append(changes, mb.ChangeCounts())
1688 }
1689 store.BroadcastChanges(acc, changes)
1690 })
1691 if n < batchSize {
1692 break
1693 }
1694 }
1695 fmt.Fprintf(xw, "%d message size(s) fixed for account %s\n", total, accName)
1696 }
1697
1698 if accountOpt != "" {
1699 xfixmsgsize(accountOpt)
1700 } else {
1701 for i, accName := range mox.Conf.Accounts() {
1702 var line string
1703 if i > 0 {
1704 line = "\n"
1705 }
1706 fmt.Fprintf(xw, "%sFixing message sizes in account %s...\n", line, accName)
1707 xfixmsgsize(accName)
1708 }
1709 }
1710 if foundProblem {
1711 fmt.Fprintf(xw, "\nProblems were found and fixed. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1712 }
1713
1714 xw.xclose()
1715
1716 case "reparse":
1717 /* protocol:
1718 > "reparse"
1719 > account or empty
1720 < "ok" or error
1721 < stream
1722 */
1723
1724 accountOpt := xctl.xread()
1725 xctl.xwriteok()
1726 xw := xctl.writer()
1727
1728 xreparseAccount := func(accName string) {
1729 acc, err := store.OpenAccount(log, accName, false)
1730 xctl.xcheck(err, "open account")
1731 defer func() {
1732 err := acc.Close()
1733 log.Check(err, "closing account after reparsing messages")
1734 }()
1735
1736 start := time.Now()
1737 total, err := acc.ReparseMessages(ctx, log)
1738 xctl.xcheck(err, "reparse messages")
1739
1740 fmt.Fprintf(xw, "%d message(s) reparsed for account %s in %dms\n", total, accName, time.Since(start)/time.Millisecond)
1741 }
1742
1743 if accountOpt != "" {
1744 xreparseAccount(accountOpt)
1745 } else {
1746 for i, accName := range mox.Conf.Accounts() {
1747 var line string
1748 if i > 0 {
1749 line = "\n"
1750 }
1751 fmt.Fprintf(xw, "%sReparsing account %s...\n", line, accName)
1752 xreparseAccount(accName)
1753 }
1754 }
1755 xw.xclose()
1756
1757 case "reassignthreads":
1758 /* protocol:
1759 > "reassignthreads"
1760 > account or empty
1761 < "ok" or error
1762 < stream
1763 */
1764
1765 accountOpt := xctl.xread()
1766 xctl.xwriteok()
1767 xw := xctl.writer()
1768
1769 xreassignThreads := func(accName string) {
1770 acc, err := store.OpenAccount(log, accName, false)
1771 xctl.xcheck(err, "open account")
1772 defer func() {
1773 err := acc.Close()
1774 log.Check(err, "closing account after reassigning threads")
1775 }()
1776
1777 // We don't want to step on an existing upgrade process.
1778 err = acc.ThreadingWait(log)
1779 xctl.xcheck(err, "waiting for threading upgrade to finish")
1780 // todo: should we try to continue if the threading upgrade failed? only if there is a chance it will succeed this time...
1781
1782 // todo: reassigning isn't atomic (in a single transaction), ideally it would be (bstore would need to be able to handle large updates).
1783 const batchSize = 50000
1784 total, err := acc.ResetThreading(ctx, log, batchSize, true)
1785 xctl.xcheck(err, "resetting threading fields")
1786 fmt.Fprintf(xw, "New thread base subject assigned to %d message(s), starting to reassign threads...\n", total)
1787
1788 // Assign threads again. Ideally we would do this in a single transaction, but
1789 // bstore/boltdb cannot handle so many pending changes, so we set a high batchsize.
1790 err = acc.AssignThreads(ctx, log, nil, 0, 50000, xw)
1791 xctl.xcheck(err, "reassign threads")
1792
1793 fmt.Fprintf(xw, "Threads reassigned. You should invalidate messages stored at imap clients with the \"mox bumpuidvalidity account [mailbox]\" command.\n")
1794 }
1795
1796 if accountOpt != "" {
1797 xreassignThreads(accountOpt)
1798 } else {
1799 for i, accName := range mox.Conf.Accounts() {
1800 var line string
1801 if i > 0 {
1802 line = "\n"
1803 }
1804 fmt.Fprintf(xw, "%sReassigning threads for account %s...\n", line, accName)
1805 xreassignThreads(accName)
1806 }
1807 }
1808 xw.xclose()
1809
1810 case "backup":
1811 xbackupctl(ctx, xctl)
1812
1813 case "imapserve":
1814 /* protocol:
1815 > "imapserve"
1816 > address
1817 < "ok" or error
1818 imap protocol
1819 */
1820 address := xctl.xread()
1821 xctl.xwriteok()
1822 imapserver.ServeConnPreauth("(imapserve)", cid, xctl.conn, address)
1823 xctl.log.Debug("imap connection finished")
1824
1825 default:
1826 log.Info("unrecognized command", slog.String("cmd", cmd))
1827 xctl.xwrite("unrecognized command")
1828 return
1829 }
1830}
1831