1// Package queue is in charge of outgoing messages, queueing them when submitted,
2// attempting a first delivery over SMTP, retrying with backoff and sending DSNs
3// for delayed or failed deliveries.
18 "golang.org/x/net/proxy"
20 "github.com/prometheus/client_golang/prometheus"
21 "github.com/prometheus/client_golang/prometheus/promauto"
23 "github.com/mjl-/bstore"
25 "github.com/mjl-/mox/config"
26 "github.com/mjl-/mox/dns"
27 "github.com/mjl-/mox/dsn"
28 "github.com/mjl-/mox/metrics"
29 "github.com/mjl-/mox/mlog"
30 "github.com/mjl-/mox/mox-"
31 "github.com/mjl-/mox/moxio"
32 "github.com/mjl-/mox/smtp"
33 "github.com/mjl-/mox/store"
36var xlog = mlog.New("queue")
39 metricConnection = promauto.NewCounterVec(
40 prometheus.CounterOpts{
41 Name: "mox_queue_connection_total",
42 Help: "Queue client connections, outgoing.",
45 "result", // "ok", "timeout", "canceled", "error"
48 metricDelivery = promauto.NewHistogramVec(
49 prometheus.HistogramOpts{
50 Name: "mox_queue_delivery_duration_seconds",
51 Help: "SMTP client delivery attempt to single host.",
52 Buckets: []float64{0.01, 0.05, 0.100, 0.5, 1, 5, 10, 20, 30, 60, 120},
55 "attempt", // Number of attempts.
56 "transport", // empty for default direct delivery.
57 "tlsmode", // strict, opportunistic, skip
58 "result", // ok, timeout, canceled, temperror, permerror, error
// contextDialer is the minimal dialing interface needed for outgoing
// connections. *net.Dialer satisfies it; a SOCKS5 transport can supply a
// different implementation.
type contextDialer interface {
	DialContext(ctx context.Context, network, addr string) (c net.Conn, err error)
}

// dial opens a TCP connection to addr, for delivery to a remote SMTP server.
// It is a variable so tests can override it.
//
// If dialer is a *net.Dialer (the typical case), a copy of it is used with
// Timeout set to timeout and LocalAddr set to laddr, so the caller's dialer is
// left unmodified. Any other dialer (e.g. for SOCKS5) is used as is; timeout
// and laddr are not applied to it.
var dial = func(ctx context.Context, dialer contextDialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
	// If this is a net.Dialer, use its settings and add the timeout and localaddr.
	// This is the typical case, but SOCKS5 support can use a different dialer.
	if d, ok := dialer.(*net.Dialer); ok {
		// Work on a copy, the dialer may be shared.
		nd := *d
		nd.Timeout = timeout
		nd.LocalAddr = laddr
		return nd.DialContext(ctx, "tcp", addr)
	}
	return dialer.DialContext(ctx, "tcp", addr)
}
81var jitter = mox.NewRand()
83var DBTypes = []any{Msg{}} // Types stored in DB.
84var DB *bstore.DB // Exported for making backups.
86// Set for mox localserve, to prevent queueing.
89// Msg is a message in the queue.
92 Queued time.Time `bstore:"default now"`
93 SenderAccount string // Failures are delivered back to this local account. Also used for routing.
94 SenderLocalpart smtp.Localpart // Should be a local user and domain.
95 SenderDomain dns.IPDomain
96 RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
97 RecipientDomain dns.IPDomain
98 RecipientDomainStr string // For filtering.
99 Attempts int // Next attempt is based on last attempt and exponential back off based on attempts.
100 DialedIPs map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
101 NextAttempt time.Time // For scheduling.
102 LastAttempt *time.Time
104 Has8bit bool // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
105 SMTPUTF8 bool // Whether message requires use of SMTPUTF8.
106 Size int64 // Full size of message, combined MsgPrefix with contents of message file.
107 MessageID string // Used when composing a DSN, in its References header.
110 // If set, this message is a DSN and this is a version using utf-8, for the case
111 // the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
115 // If non-empty, the transport to use for this message. Can be set through cli or
116 // admin interface. If empty (the default for a submitted message), regular routing
121// Sender of message as used in MAIL FROM.
122func (m Msg) Sender() smtp.Path {
123 return smtp.Path{Localpart: m.SenderLocalpart, IPDomain: m.SenderDomain}
126// Recipient of message as used in RCPT TO.
127func (m Msg) Recipient() smtp.Path {
128 return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
131// MessagePath returns the path where the message is stored.
132func (m Msg) MessagePath() string {
133 return mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
136// Init opens the queue database without starting delivery.
138 qpath := mox.DataDirPath("queue/index.db")
139 os.MkdirAll(filepath.Dir(qpath), 0770)
141 if _, err := os.Stat(qpath); err != nil && os.IsNotExist(err) {
146 DB, err = bstore.Open(mox.Shutdown, qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, DBTypes...)
151 return fmt.Errorf("open queue database: %s", err)
156// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
159 xlog.Check(err, "closing queue db")
163// List returns all messages in the delivery queue.
164// Ordered by earliest delivery attempt first.
165func List(ctx context.Context) ([]Msg, error) {
166 qmsgs, err := bstore.QueryDB[Msg](ctx, DB).List()
170 sort.Slice(qmsgs, func(i, j int) bool {
173 la := a.LastAttempt != nil
174 lb := b.LastAttempt != nil
177 } else if la && !lb {
180 if !la && !lb || a.LastAttempt.Equal(*b.LastAttempt) {
183 return a.LastAttempt.Before(*b.LastAttempt)
188// Count returns the number of messages in the delivery queue.
189func Count(ctx context.Context) (int, error) {
190 return bstore.QueryDB[Msg](ctx, DB).Count()
193// Add a new message to the queue. The queue is kicked immediately to start a
194// first delivery attempt.
196// If consumeFile is true, it is removed as part of delivery (by rename or copy
197// and remove). msgFile is never closed by Add.
// dsnutf8Opt is a utf8-version of the message, to be used only for DSNs. If set,
200// this data is used as the message when delivering the DSN and the remote SMTP
201// server supports SMTPUTF8. If the remote SMTP server does not support SMTPUTF8,
202// the regular non-utf8 message is delivered.
203func Add(ctx context.Context, log *mlog.Log, senderAccount string, mailFrom, rcptTo smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, msgPrefix []byte, msgFile *os.File, dsnutf8Opt []byte, consumeFile bool) (int64, error) {
204 // todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once.
	// ../rfc/5321:3759
207 if senderAccount == "" {
208 return 0, fmt.Errorf("cannot queue with localserve without local account")
210 acc, err := store.OpenAccount(senderAccount)
212 return 0, fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
216 log.Check(err, "closing account")
218 m := store.Message{Size: size, MsgPrefix: msgPrefix}
219 conf, _ := acc.Conf()
220 dest := conf.Destinations[mailFrom.String()]
221 acc.WithWLock(func() {
222 err = acc.DeliverDestination(log, dest, &m, msgFile, consumeFile)
225 return 0, fmt.Errorf("delivering message: %v", err)
227 log.Debug("immediately delivered from queue to sender")
231 tx, err := DB.Begin(ctx, true)
233 return 0, fmt.Errorf("begin transaction: %w", err)
237 if err := tx.Rollback(); err != nil {
238 log.Errorx("rollback for queue", err)
244 qm := Msg{0, now, senderAccount, mailFrom.Localpart, mailFrom.IPDomain, rcptTo.Localpart, rcptTo.IPDomain, formatIPDomain(rcptTo.IPDomain), 0, nil, now, nil, "", has8bit, smtputf8, size, messageID, msgPrefix, dsnutf8Opt, ""}
246 if err := tx.Insert(&qm); err != nil {
250 dst := qm.MessagePath()
253 err := os.Remove(dst)
254 log.Check(err, "removing destination message file for queue", mlog.Field("path", dst))
257 dstDir := filepath.Dir(dst)
258 os.MkdirAll(dstDir, 0770)
260 if err := os.Rename(msgFile.Name(), dst); err != nil {
261 // Could be due to cross-filesystem rename. Users shouldn't configure their systems that way.
262 return 0, fmt.Errorf("move message into queue dir: %w", err)
264 } else if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
265 return 0, fmt.Errorf("linking/copying message to new file: %s", err)
266 } else if err := moxio.SyncDir(dstDir); err != nil {
267 return 0, fmt.Errorf("sync directory: %v", err)
270 if err := tx.Commit(); err != nil {
271 return 0, fmt.Errorf("commit transaction: %s", err)
280func formatIPDomain(d dns.IPDomain) string {
282 return "[" + d.IP.String() + "]"
284 return d.Domain.Name()
288 kick = make(chan struct{}, 1)
289 deliveryResult = make(chan string, 1)
294 case kick <- struct{}{}:
299// Kick sets the NextAttempt for messages matching all filter parameters (ID,
300// toDomain, recipient) that are nonzero, and kicks the queue, attempting delivery
301// of those messages. If all parameters are zero, all messages are kicked. If
302// transport is set, the delivery attempts for the matching messages will use the
303// transport. An empty string is the default transport, i.e. direct delivery.
304// Returns number of messages queued for immediate delivery.
305func Kick(ctx context.Context, ID int64, toDomain, recipient string, transport *string) (int, error) {
306 q := bstore.QueryDB[Msg](ctx, DB)
311 q.FilterEqual("RecipientDomainStr", toDomain)
314 q.FilterFn(func(qm Msg) bool {
315 return qm.Recipient().XString(true) == recipient
318 up := map[string]any{"NextAttempt": time.Now()}
319 if transport != nil {
320 if *transport != "" {
321 _, ok := mox.Conf.Static.Transports[*transport]
323 return 0, fmt.Errorf("unknown transport %q", *transport)
326 up["Transport"] = *transport
328 n, err := q.UpdateFields(up)
330 return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
336// Drop removes messages from the queue that match all nonzero parameters.
337// If all parameters are zero, all messages are removed.
338// Returns number of messages removed.
339func Drop(ctx context.Context, ID int64, toDomain string, recipient string) (int, error) {
340 q := bstore.QueryDB[Msg](ctx, DB)
345 q.FilterEqual("RecipientDomainStr", toDomain)
348 q.FilterFn(func(qm Msg) bool {
349 return qm.Recipient().XString(true) == recipient
356 return 0, fmt.Errorf("selecting and deleting messages from queue: %v", err)
358 for _, m := range msgs {
360 if err := os.Remove(p); err != nil {
361 xlog.WithContext(ctx).Errorx("removing queue message from file system", err, mlog.Field("queuemsgid", m.ID), mlog.Field("path", p))
367type ReadReaderAtCloser interface {
372// OpenMessage opens a message present in the queue.
373func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
375 err := DB.Get(ctx, &qm)
379 f, err := os.Open(qm.MessagePath())
381 return nil, fmt.Errorf("open message file: %s", err)
383 r := store.FileMsgReader(qm.MsgPrefix, f)
387const maxConcurrentDeliveries = 10
389// Start opens the database by calling Init, then starts the delivery process.
390func Start(resolver dns.Resolver, done chan struct{}) error {
391 if err := Init(); err != nil {
397 // Map keys are either dns.Domain.Name()'s, or string-formatted IP addresses.
398 busyDomains := map[string]struct{}{}
400 timer := time.NewTimer(0)
404 case <-mox.Shutdown.Done():
409 case domain := <-deliveryResult:
410 delete(busyDomains, domain)
413 if len(busyDomains) >= maxConcurrentDeliveries {
417 launchWork(resolver, busyDomains)
418 timer.Reset(nextWork(mox.Shutdown, busyDomains))
424func nextWork(ctx context.Context, busyDomains map[string]struct{}) time.Duration {
425 q := bstore.QueryDB[Msg](ctx, DB)
426 if len(busyDomains) > 0 {
428 for d := range busyDomains {
429 doms = append(doms, d)
431 q.FilterNotEqual("RecipientDomainStr", doms...)
433 q.SortAsc("NextAttempt")
436 if err == bstore.ErrAbsent {
437 return 24 * time.Hour
438 } else if err != nil {
439 xlog.Errorx("finding time for next delivery attempt", err)
440 return 1 * time.Minute
442 return time.Until(qm.NextAttempt)
445func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
446 q := bstore.QueryDB[Msg](mox.Shutdown, DB)
447 q.FilterLessEqual("NextAttempt", time.Now())
448 q.SortAsc("NextAttempt")
449 q.Limit(maxConcurrentDeliveries)
450 if len(busyDomains) > 0 {
452 for d := range busyDomains {
453 doms = append(doms, d)
455 q.FilterNotEqual("RecipientDomainStr", doms...)
457 msgs, err := q.List()
459 xlog.Errorx("querying for work in queue", err)
460 mox.Sleep(mox.Shutdown, 1*time.Second)
464 for _, m := range msgs {
465 busyDomains[formatIPDomain(m.RecipientDomain)] = struct{}{}
466 go deliver(resolver, m)
471// Remove message from queue in database and file system.
472func queueDelete(ctx context.Context, msgID int64) error {
473 if err := DB.Delete(ctx, &Msg{ID: msgID}); err != nil {
476 // If removing from database fails, we'll also leave the file in the file system.
478 p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(msgID)))
479 if err := os.Remove(p); err != nil {
480 return fmt.Errorf("removing queue message from file system: %v", err)
486// deliver attempts to deliver a message.
487// The queue is updated, either by removing a delivered or permanently failed
488// message, or updating the time for the next attempt. A DSN may be sent.
489func deliver(resolver dns.Resolver, m Msg) {
491 qlog := xlog.WithCid(cid).Fields(mlog.Field("from", m.Sender()), mlog.Field("recipient", m.Recipient()), mlog.Field("attempts", m.Attempts), mlog.Field("msgid", m.ID))
494 deliveryResult <- formatIPDomain(m.RecipientDomain)
498 qlog.Error("deliver panic", mlog.Field("panic", x))
500 metrics.PanicInc(metrics.Queue)
504 // We register this attempt by setting last_attempt, and already next_attempt time
	// in the future with exponential backoff. If we run into trouble during delivery below,
506 // at least we won't be bothering the receiving server with our problems.
507 // Delivery attempts: immediately, 7.5m, 15m, 30m, 1h, 2h (send delayed DSN), 4h,
508 // 8h, 16h (send permanent failure DSN).
511 backoff := time.Duration(7*60+30+jitter.Intn(10)-5) * time.Second
512 for i := 0; i < m.Attempts; i++ {
513 backoff *= time.Duration(2)
518 m.NextAttempt = now.Add(backoff)
519 qup := bstore.QueryDB[Msg](mox.Shutdown, DB)
521 update := Msg{Attempts: m.Attempts, NextAttempt: m.NextAttempt, LastAttempt: m.LastAttempt}
522 if _, err := qup.UpdateNonzero(update); err != nil {
523 qlog.Errorx("storing delivery attempt", err)
527 // Find route for transport to use for delivery attempt.
528 var transport config.Transport
529 var transportName string
530 if m.Transport != "" {
532 transport, ok = mox.Conf.Static.Transports[m.Transport]
535 fail(qlog, m, backoff, false, remoteMTA, "", fmt.Sprintf("cannot find transport %q", m.Transport))
538 transportName = m.Transport
540 route := findRoute(m.Attempts-1, m)
541 transport = route.ResolvedTransport
542 transportName = route.Transport
545 if transportName != "" {
546 qlog = qlog.Fields(mlog.Field("transport", transportName))
547 qlog.Debug("delivering with transport", mlog.Field("transport", transportName))
550 var dialer contextDialer = &net.Dialer{}
551 if transport.Submissions != nil {
552 deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.Submissions, true, 465)
553 } else if transport.Submission != nil {
554 deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.Submission, false, 587)
555 } else if transport.SMTP != nil {
556 deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.SMTP, false, 25)
558 ourHostname := mox.Conf.Static.HostnameDomain
559 if transport.Socks != nil {
560 socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
562 fail(qlog, m, backoff, false, dsn.NameIP{}, "", fmt.Sprintf("socks dialer: %v", err))
564 } else if d, ok := socksdialer.(contextDialer); !ok {
565 fail(qlog, m, backoff, false, dsn.NameIP{}, "", "socks dialer is not a contextdialer")
570 ourHostname = transport.Socks.Hostname
572 deliverDirect(cid, qlog, resolver, dialer, ourHostname, transportName, m, backoff)
576func findRoute(attempt int, m Msg) config.Route {
577 routesAccount, routesDomain, routesGlobal := mox.Conf.Routes(m.SenderAccount, m.SenderDomain.Domain)
578 if r, ok := findRouteInList(attempt, m, routesAccount); ok {
581 if r, ok := findRouteInList(attempt, m, routesDomain); ok {
584 if r, ok := findRouteInList(attempt, m, routesGlobal); ok {
587 return config.Route{}
590func findRouteInList(attempt int, m Msg, routes []config.Route) (config.Route, bool) {
591 for _, r := range routes {
592 if routeMatch(attempt, m, r) {
596 return config.Route{}, false
599func routeMatch(attempt int, m Msg, r config.Route) bool {
600 return attempt >= r.MinimumAttempts && routeMatchDomain(r.FromDomainASCII, m.SenderDomain.Domain) && routeMatchDomain(r.ToDomainASCII, m.RecipientDomain.Domain)
603func routeMatchDomain(l []string, d dns.Domain) bool {
607 for _, e := range l {
608 if d.ASCII == e || strings.HasPrefix(e, ".") && (d.ASCII == e[1:] || strings.HasSuffix(d.ASCII, e)) {
// dialHost dials host for delivering Msg, taking previous attempts into account.
616// If the previous attempt used IPv4, this attempt will use IPv6 (in case one of the IPs is in a DNSBL).
// For the second attempt for an address family, we prefer the same IP as earlier, to increase our chances if remote is doing greylisting.
618// dialHost updates m with the dialed IP and m should be saved in case of failure.
619// If we have fully specified local smtp listen IPs, we set those for the outgoing
620// connection. The admin probably configured these same IPs in SPF, but others
622func dialHost(ctx context.Context, log *mlog.Log, resolver dns.Resolver, dialer contextDialer, host dns.IPDomain, port int, m *Msg) (conn net.Conn, ip net.IP, dualstack bool, rerr error) {
624 if len(host.IP) > 0 {
625 ips = []net.IP{host.IP}
627 // todo: The Go resolver automatically follows CNAMEs, which is not allowed for
629 name := host.Domain.ASCII + "."
630 ipaddrs, err := resolver.LookupIPAddr(ctx, name)
631 if err != nil || len(ipaddrs) == 0 {
632 return nil, nil, false, fmt.Errorf("looking up %q: %v", name, err)
634 var have4, have6 bool
635 for _, ipaddr := range ipaddrs {
636 ips = append(ips, ipaddr.IP)
637 if ipaddr.IP.To4() == nil {
643 dualstack = have4 && have6
644 prevIPs := m.DialedIPs[host.String()]
645 if len(prevIPs) > 0 {
646 prevIP := prevIPs[len(prevIPs)-1]
647 prevIs4 := prevIP.To4() != nil
649 for _, ip := range prevIPs {
650 is4 := ip.To4() != nil
655 preferPrev := sameFamily == 1
656 // We use stable sort so any preferred/randomized listing from DNS is kept intact.
657 sort.SliceStable(ips, func(i, j int) bool {
658 aIs4 := ips[i].To4() != nil
659 bIs4 := ips[j].To4() != nil
661 // Prefer "i" if it is not same address family.
662 return aIs4 != prevIs4
664 // Prefer "i" if it is the same as last and we should be preferring it.
665 return preferPrev && ips[i].Equal(prevIP)
667 log.Debug("ordered ips for dialing", mlog.Field("ips", ips))
671 var timeout time.Duration
672 deadline, ok := ctx.Deadline()
674 timeout = 30 * time.Second
676 timeout = time.Until(deadline) / time.Duration(len(ips))
681 for _, ip := range ips {
682 addr := net.JoinHostPort(ip.String(), fmt.Sprintf("%d", port))
683 log.Debug("dialing remote host for delivery", mlog.Field("addr", addr))
685 for _, lip := range mox.Conf.Static.SpecifiedSMTPListenIPs {
686 ipIs4 := ip.To4() != nil
687 lipIs4 := lip.To4() != nil
689 laddr = &net.TCPAddr{IP: lip}
693 conn, err := dial(ctx, dialer, timeout, addr, laddr)
695 log.Debug("connected for smtp delivery", mlog.Field("host", host), mlog.Field("addr", addr), mlog.Field("laddr", laddr))
696 if m.DialedIPs == nil {
697 m.DialedIPs = map[string][]net.IP{}
699 name := host.String()
700 m.DialedIPs[name] = append(m.DialedIPs[name], ip)
701 return conn, ip, dualstack, nil
703 log.Debugx("connection attempt for smtp delivery", err, mlog.Field("host", host), mlog.Field("addr", addr), mlog.Field("laddr", laddr))
707 return nil, lastIP, dualstack, lastErr