1// Package queue is in charge of outgoing messages, queueing them when submitted,
 
2// attempting a first delivery over SMTP, retrying with backoff and sending DSNs
 
3// for delayed or failed deliveries.
 
21	"golang.org/x/net/proxy"
 
23	"github.com/prometheus/client_golang/prometheus"
 
24	"github.com/prometheus/client_golang/prometheus/promauto"
 
26	"github.com/mjl-/bstore"
 
28	"github.com/mjl-/mox/config"
 
29	"github.com/mjl-/mox/dns"
 
30	"github.com/mjl-/mox/dsn"
 
31	"github.com/mjl-/mox/metrics"
 
32	"github.com/mjl-/mox/mlog"
 
33	"github.com/mjl-/mox/mox-"
 
34	"github.com/mjl-/mox/moxio"
 
35	"github.com/mjl-/mox/smtp"
 
36	"github.com/mjl-/mox/smtpclient"
 
37	"github.com/mjl-/mox/store"
 
38	"github.com/mjl-/mox/tlsrpt"
 
39	"github.com/mjl-/mox/tlsrptdb"
 
40	"github.com/mjl-/mox/webapi"
 
41	"github.com/mjl-/mox/webhook"
 
// ErrFromID indicates a fromid was present when adding a message to the queue, but
 
46var ErrFromID = errors.New("fromid not unique")
 
49	metricConnection = promauto.NewCounterVec(
 
50		prometheus.CounterOpts{
 
51			Name: "mox_queue_connection_total",
 
52			Help: "Queue client connections, outgoing.",
 
55			"result", // "ok", "timeout", "canceled", "error"
 
58	metricDelivery = promauto.NewHistogramVec(
 
59		prometheus.HistogramOpts{
 
60			Name:    "mox_queue_delivery_duration_seconds",
 
61			Help:    "SMTP client delivery attempt to single host.",
 
62			Buckets: []float64{0.01, 0.05, 0.100, 0.5, 1, 5, 10, 20, 30, 60, 120},
 
65			"attempt",   // Number of attempts.
 
66			"transport", // empty for default direct delivery.
 
67			"tlsmode",   // immediate, requiredstarttls, opportunistic, skip (from smtpclient.TLSMode), with optional +mtasts and/or +dane.
 
68			"result",    // ok, timeout, canceled, temperror, permerror, error
 
71	metricHold = promauto.NewGauge(
 
73			Name: "mox_queue_hold",
 
74			Help: "Messages in queue that are on hold.",
 
79var jitter = mox.NewPseudoRand()
 
81var DBTypes = []any{Msg{}, HoldRule{}, MsgRetired{}, webapi.Suppression{}, Hook{}, HookRetired{}} // Types stored in DB.
 
82var DB *bstore.DB                                                                                 // Exported for making backups.
 
84// Allow requesting delivery starting from up to this interval from time of submission.
 
85const FutureReleaseIntervalMax = 60 * 24 * time.Hour
 
87// Set for mox localserve, to prevent queueing.
 
90// HoldRule is a set of conditions that cause a matching message to be marked as on
 
// hold when it is queued. All-empty conditions match all messages, effectively
 
92// pausing the entire queue.
 
96	SenderDomain       dns.Domain
 
97	RecipientDomain    dns.Domain
 
98	SenderDomainStr    string // Unicode.
 
99	RecipientDomainStr string // Unicode.
 
102func (pr HoldRule) All() bool {
 
104	return pr == HoldRule{}
 
107func (pr HoldRule) matches(m Msg) bool {
 
108	return pr.All() || pr.Account == m.SenderAccount || pr.SenderDomainStr == m.SenderDomainStr || pr.RecipientDomainStr == m.RecipientDomainStr
 
111// Msg is a message in the queue.
 
113// Use MakeMsg to make a message with fields that Add needs. Add will further set
 
114// queueing related fields.
 
118	// A message for multiple recipients will get a BaseID that is identical to the
 
119	// first Msg.ID queued. The message contents will be identical for each recipient,
 
120	// including MsgPrefix. If other properties are identical too, including recipient
 
121	// domain, multiple Msgs may be delivered in a single SMTP transaction. For
 
122	// messages with a single recipient, this field will be 0.
 
123	BaseID int64 `bstore:"index"`
 
125	Queued             time.Time      `bstore:"default now"`
 
126	Hold               bool           // If set, delivery won't be attempted.
 
127	SenderAccount      string         // Failures are delivered back to this local account. Also used for routing.
 
128	SenderLocalpart    smtp.Localpart // Should be a local user and domain.
 
129	SenderDomain       dns.IPDomain
 
130	SenderDomainStr    string         // For filtering, unicode.
 
131	FromID             string         // For transactional messages, used to match later DSNs.
 
132	RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
 
133	RecipientDomain    dns.IPDomain
 
134	RecipientDomainStr string              // For filtering, unicode domain. Can also contain ip enclosed in [].
 
135	Attempts           int                 // Next attempt is based on last attempt and exponential back off based on attempts.
 
136	MaxAttempts        int                 // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
 
137	DialedIPs          map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
 
138	NextAttempt        time.Time           // For scheduling.
 
139	LastAttempt        *time.Time
 
142	Has8bit       bool   // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
 
143	SMTPUTF8      bool   // Whether message requires use of SMTPUTF8.
 
144	IsDMARCReport bool   // Delivery failures for DMARC reports are handled differently.
 
145	IsTLSReport   bool   // Delivery failures for TLS reports are handled differently.
 
146	Size          int64  // Full size of message, combined MsgPrefix with contents of message file.
 
147	MessageID     string // Message-ID header, including <>. Used when composing a DSN, in its References header.
 
148	MsgPrefix     []byte // Data to send before the contents from the file, typically with headers like DKIM-Signature.
 
149	Subject       string // For context about delivery.
 
151	// If set, this message is a DSN and this is a version using utf-8, for the case
 
152	// the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
 
156	// If non-empty, the transport to use for this message. Can be set through cli or
 
157	// admin interface. If empty (the default for a submitted message), regular routing
 
161	// RequireTLS influences TLS verification during delivery.
 
163	// If nil, the recipient domain policy is followed (MTA-STS and/or DANE), falling
 
164	// back to optional opportunistic non-verified STARTTLS.
 
166	// If RequireTLS is true (through SMTP REQUIRETLS extension or webmail submit),
 
167	// MTA-STS or DANE is required, as well as REQUIRETLS support by the next hop
 
	// If RequireTLS is false (through message header "TLS-Required: No"), the recipient
 
171	// domain's policy is ignored if it does not lead to a successful TLS connection,
 
172	// i.e. falling back to SMTP delivery with unverified STARTTLS or plain text.
 
176	// For DSNs, where the original FUTURERELEASE value must be included as per-message
 
177	// field. This field should be of the form "for;" plus interval, or "until;" plus
 
179	FutureReleaseRequest string
 
182	Extra map[string]string // Extra information, for transactional email.
 
185// MsgResult is the result (or work in progress) of a delivery attempt.
 
186type MsgResult struct {
 
188	Duration time.Duration
 
193	// todo: store smtp trace for failed deliveries for debugging, perhaps also for successful deliveries.
 
196// Stored in MsgResult.Error while delivery is in progress. Replaced after success/error.
 
197const resultErrorDelivering = "delivering..."
 
199// markResult updates/adds a delivery result.
 
200func (m *Msg) markResult(code int, secode string, errmsg string, success bool) {
 
201	if len(m.Results) == 0 || m.Results[len(m.Results)-1].Error != resultErrorDelivering {
 
202		m.Results = append(m.Results, MsgResult{Start: time.Now()})
 
204	result := &m.Results[len(m.Results)-1]
 
205	result.Duration = time.Since(result.Start)
 
207	result.Secode = secode
 
208	result.Error = errmsg
 
209	result.Success = false
 
212// LastResult returns the last result entry, or an empty result.
 
213func (m *Msg) LastResult() MsgResult {
 
214	if len(m.Results) == 0 {
 
215		return MsgResult{Start: time.Now()}
 
217	return m.Results[len(m.Results)-1]
 
220// Sender of message as used in MAIL FROM.
 
221func (m Msg) Sender() smtp.Path {
 
222	return smtp.Path{Localpart: m.SenderLocalpart, IPDomain: m.SenderDomain}
 
225// Recipient of message as used in RCPT TO.
 
226func (m Msg) Recipient() smtp.Path {
 
227	return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
 
230// MessagePath returns the path where the message is stored.
 
231func (m Msg) MessagePath() string {
 
232	return mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
 
235// todo: store which transport (if any) was actually used in MsgResult, based on routes.
 
237// Retired returns a MsgRetired for the message, for history of deliveries.
 
238func (m Msg) Retired(success bool, t, keepUntil time.Time) MsgRetired {
 
243		SenderAccount:        m.SenderAccount,
 
244		SenderLocalpart:      m.SenderLocalpart,
 
245		SenderDomainStr:      m.SenderDomainStr,
 
247		RecipientLocalpart:   m.RecipientLocalpart,
 
248		RecipientDomain:      m.RecipientDomain,
 
249		RecipientDomainStr:   m.RecipientDomainStr,
 
250		Attempts:             m.Attempts,
 
251		MaxAttempts:          m.MaxAttempts,
 
252		DialedIPs:            m.DialedIPs,
 
253		LastAttempt:          m.LastAttempt,
 
256		SMTPUTF8:             m.SMTPUTF8,
 
257		IsDMARCReport:        m.IsDMARCReport,
 
258		IsTLSReport:          m.IsTLSReport,
 
260		MessageID:            m.MessageID,
 
262		Transport:            m.Transport,
 
263		RequireTLS:           m.RequireTLS,
 
264		FutureReleaseRequest: m.FutureReleaseRequest,
 
267		RecipientAddress: smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}.XString(true),
 
270		KeepUntil:        keepUntil,
 
274// MsgRetired is a message for which delivery completed, either successful,
 
275// failed/canceled. Retired messages are only stored if so configured, and will be
 
276// cleaned up after the configured period.
 
277type MsgRetired struct {
 
278	ID int64 // Same ID as it was as Msg.ID.
 
282	SenderAccount      string         // Failures are delivered back to this local account. Also used for routing.
 
283	SenderLocalpart    smtp.Localpart // Should be a local user and domain.
 
284	SenderDomainStr    string         // For filtering, unicode.
 
285	FromID             string         `bstore:"index"` // Used to match DSNs.
 
286	RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
 
287	RecipientDomain    dns.IPDomain
 
288	RecipientDomainStr string              // For filtering, unicode.
 
289	Attempts           int                 // Next attempt is based on last attempt and exponential back off based on attempts.
 
290	MaxAttempts        int                 // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
 
291	DialedIPs          map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
 
292	LastAttempt        *time.Time
 
295	Has8bit       bool   // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
 
296	SMTPUTF8      bool   // Whether message requires use of SMTPUTF8.
 
297	IsDMARCReport bool   // Delivery failures for DMARC reports are handled differently.
 
298	IsTLSReport   bool   // Delivery failures for TLS reports are handled differently.
 
299	Size          int64  // Full size of message, combined MsgPrefix with contents of message file.
 
300	MessageID     string // Used when composing a DSN, in its References header.
 
301	Subject       string // For context about delivery.
 
305	FutureReleaseRequest string
 
307	Extra map[string]string // Extra information, for transactional email.
 
309	LastActivity     time.Time `bstore:"index"`
 
310	RecipientAddress string    `bstore:"index RecipientAddress+LastActivity"`
 
311	Success          bool      // Whether delivery to next hop succeeded.
 
312	KeepUntil        time.Time `bstore:"index"`
 
315// Sender of message as used in MAIL FROM.
 
316func (m MsgRetired) Sender() (path smtp.Path, err error) {
 
317	path.Localpart = m.RecipientLocalpart
 
318	if strings.HasPrefix(m.SenderDomainStr, "[") && strings.HasSuffix(m.SenderDomainStr, "]") {
 
319		s := m.SenderDomainStr[1 : len(m.SenderDomainStr)-1]
 
320		path.IPDomain.IP = net.ParseIP(s)
 
321		if path.IPDomain.IP == nil {
 
322			err = fmt.Errorf("parsing ip address %q: %v", s, err)
 
325		path.IPDomain.Domain, err = dns.ParseDomain(m.SenderDomainStr)
 
330// Recipient of message as used in RCPT TO.
 
331func (m MsgRetired) Recipient() smtp.Path {
 
332	return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
 
335// LastResult returns the last result entry, or an empty result.
 
336func (m MsgRetired) LastResult() MsgResult {
 
337	if len(m.Results) == 0 {
 
340	return m.Results[len(m.Results)-1]
 
343// Init opens the queue database without starting delivery.
 
345	qpath := mox.DataDirPath(filepath.FromSlash("queue/index.db"))
 
346	os.MkdirAll(filepath.Dir(qpath), 0770)
 
348	if _, err := os.Stat(qpath); err != nil && os.IsNotExist(err) {
 
353	DB, err = bstore.Open(mox.Shutdown, qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, DBTypes...)
 
355		err = DB.Read(mox.Shutdown, func(tx *bstore.Tx) error {
 
356			return metricHoldUpdate(tx)
 
363		return fmt.Errorf("open queue database: %s", err)
 
368// When we update the gauge, we just get the full current value, not try to account
 
370func metricHoldUpdate(tx *bstore.Tx) error {
 
371	count, err := bstore.QueryTx[Msg](tx).FilterNonzero(Msg{Hold: true}).Count()
 
373		return fmt.Errorf("querying messages on hold for metric: %v", err)
 
375	metricHold.Set(float64(count))
 
379// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
 
383		mlog.New("queue", nil).Errorx("closing queue db", err)
 
388// todo: the filtering & sorting can use improvements. too much duplicated code (variants between {Msg,Hook}{,Retired}. Sort has pagination fields, some untyped.
 
390// Filter filters messages to list or operate on. Used by admin web interface
 
393// Only non-empty/non-zero values are applied to the filter. Leaving all fields
 
394// empty/zero matches all messages.
 
402	Submitted   string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
 
403	NextAttempt string // ">$duration" or "<$duration", also with "now" for duration.
 
407func (f Filter) apply(q *bstore.Query[Msg]) error {
 
411	applyTime := func(field string, s string) error {
 
414		if strings.HasPrefix(s, "<") {
 
416		} else if !strings.HasPrefix(s, ">") {
 
417			return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
 
419		s = strings.TrimSpace(s[1:])
 
423		} else if d, err := time.ParseDuration(s); err != nil {
 
424			return fmt.Errorf("parsing duration %q: %v", orig, err)
 
426			t = time.Now().Add(d)
 
429			q.FilterLess(field, t)
 
431			q.FilterGreater(field, t)
 
436		q.FilterEqual("Hold", *f.Hold)
 
438	if f.Submitted != "" {
 
439		if err := applyTime("Queued", f.Submitted); err != nil {
 
440			return fmt.Errorf("applying filter for submitted: %v", err)
 
443	if f.NextAttempt != "" {
 
444		if err := applyTime("NextAttempt", f.NextAttempt); err != nil {
 
445			return fmt.Errorf("applying filter for next attempt: %v", err)
 
449		q.FilterNonzero(Msg{SenderAccount: f.Account})
 
451	if f.Transport != nil {
 
452		q.FilterEqual("Transport", *f.Transport)
 
454	if f.From != "" || f.To != "" {
 
455		q.FilterFn(func(m Msg) bool {
 
456			return f.From != "" && strings.Contains(m.Sender().XString(true), f.From) || f.To != "" && strings.Contains(m.Recipient().XString(true), f.To)
 
466	Field  string // "Queued" or "NextAttempt"/"".
 
467	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
 
468	Last   any    // Value of Field for last object. Must be set iff LastID is set.
 
469	Asc    bool   // Ascending, or descending.
 
472func (s Sort) apply(q *bstore.Query[Msg]) error {
 
474	case "", "NextAttempt":
 
475		s.Field = "NextAttempt"
 
479		return fmt.Errorf("unknown sort order field %q", s.Field)
 
483		ls, ok := s.Last.(string)
 
485			return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
 
487		last, err := time.Parse(time.RFC3339Nano, ls)
 
489			last, err = time.Parse(time.RFC3339, ls)
 
492			return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
 
494		q.FilterNotEqual("ID", s.LastID)
 
495		var fieldEqual func(m Msg) bool
 
496		if s.Field == "NextAttempt" {
 
497			fieldEqual = func(m Msg) bool { return m.NextAttempt.Equal(last) }
 
499			fieldEqual = func(m Msg) bool { return m.Queued.Equal(last) }
 
502			q.FilterGreaterEqual(s.Field, last)
 
503			q.FilterFn(func(m Msg) bool {
 
504				return !fieldEqual(m) || m.ID > s.LastID
 
507			q.FilterLessEqual(s.Field, last)
 
508			q.FilterFn(func(m Msg) bool {
 
509				return !fieldEqual(m) || m.ID < s.LastID
 
514		q.SortAsc(s.Field, "ID")
 
516		q.SortDesc(s.Field, "ID")
 
521// List returns max 100 messages matching filter in the delivery queue.
 
522// By default, orders by next delivery attempt.
 
523func List(ctx context.Context, filter Filter, sort Sort) ([]Msg, error) {
 
524	q := bstore.QueryDB[Msg](ctx, DB)
 
525	if err := filter.apply(q); err != nil {
 
528	if err := sort.apply(q); err != nil {
 
531	qmsgs, err := q.List()
 
538// Count returns the number of messages in the delivery queue.
 
539func Count(ctx context.Context) (int, error) {
 
540	return bstore.QueryDB[Msg](ctx, DB).Count()
 
543// HoldRuleList returns all hold rules.
 
544func HoldRuleList(ctx context.Context) ([]HoldRule, error) {
 
545	return bstore.QueryDB[HoldRule](ctx, DB).List()
 
548// HoldRuleAdd adds a new hold rule causing newly submitted messages to be marked
 
549// as "on hold", and existing matching messages too.
 
550func HoldRuleAdd(ctx context.Context, log mlog.Log, hr HoldRule) (HoldRule, error) {
 
552	err := DB.Write(ctx, func(tx *bstore.Tx) error {
 
554		hr.SenderDomainStr = hr.SenderDomain.Name()
 
555		hr.RecipientDomainStr = hr.RecipientDomain.Name()
 
556		if err := tx.Insert(&hr); err != nil {
 
559		log.Info("adding hold rule", slog.Any("holdrule", hr))
 
561		q := bstore.QueryTx[Msg](tx)
 
564				SenderAccount:      hr.Account,
 
565				SenderDomainStr:    hr.SenderDomainStr,
 
566				RecipientDomainStr: hr.RecipientDomainStr,
 
570		n, err = q.UpdateField("Hold", true)
 
572			return fmt.Errorf("marking existing matching messages in queue on hold: %v", err)
 
574		return metricHoldUpdate(tx)
 
577		return HoldRule{}, err
 
579	log.Info("marked messages in queue as on hold", slog.Int("messages", n))
 
// HoldRuleRemove removes a hold rule. The Hold field of existing messages is not
 
586func HoldRuleRemove(ctx context.Context, log mlog.Log, holdRuleID int64) error {
 
587	return DB.Write(ctx, func(tx *bstore.Tx) error {
 
588		hr := HoldRule{ID: holdRuleID}
 
589		if err := tx.Get(&hr); err != nil {
 
592		log.Info("removing hold rule", slog.Any("holdrule", hr))
 
593		return tx.Delete(HoldRule{ID: holdRuleID})
 
597// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
 
598// messageID should include <>.
 
599func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool, next time.Time, subject string) Msg {
 
601		SenderLocalpart:    sender.Localpart,
 
602		SenderDomain:       sender.IPDomain,
 
603		RecipientLocalpart: recipient.Localpart,
 
604		RecipientDomain:    recipient.IPDomain,
 
608		MessageID:          messageID,
 
611		RequireTLS:         requireTLS,
 
617// Add one or more new messages to the queue. If the sender paths and MsgPrefix are
 
618// identical, they'll get the same BaseID, so they can be delivered in a single
 
619// SMTP transaction, with a single DATA command, but may be split into multiple
 
620// transactions if errors/limits are encountered. The queue is kicked immediately
 
621// to start a first delivery attempt.
 
// ID of the messages must be 0 and will be set after inserting in the queue.
 
625// Add sets derived fields like SenderDomainStr and RecipientDomainStr, and fields
 
626// related to queueing, such as Queued, NextAttempt.
 
627func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error {
 
629		return fmt.Errorf("must queue at least one message")
 
634	for i, qm := range qml {
 
636			return fmt.Errorf("id of queued messages must be 0")
 
638		// Sanity check, internal consistency.
 
639		qml[i].SenderDomainStr = formatIPDomain(qm.SenderDomain)
 
640		qml[i].RecipientDomainStr = formatIPDomain(qm.RecipientDomain)
 
641		if base && i > 0 && qm.Sender().String() != qml[0].Sender().String() || !bytes.Equal(qm.MsgPrefix, qml[0].MsgPrefix) {
 
646	tx, err := DB.Begin(ctx, true)
 
648		return fmt.Errorf("begin transaction: %w", err)
 
652			if err := tx.Rollback(); err != nil {
 
653				log.Errorx("rollback for queue", err)
 
658	// Mark messages Hold if they match a hold rule.
 
659	holdRules, err := bstore.QueryTx[HoldRule](tx).List()
 
661		return fmt.Errorf("getting queue hold rules")
 
664	// Insert messages into queue. If multiple messages are to be delivered in a single
 
665	// transaction, they all get a non-zero BaseID that is the Msg.ID of the first
 
669		// FromIDs must be unique if present. We don't have a unique index because values
 
670		// can be the empty string. We check in both Msg and MsgRetired, both are relevant
 
671		// for uniquely identifying a message sent in the past.
 
672		if fromID := qml[i].FromID; fromID != "" {
 
673			if exists, err := bstore.QueryTx[Msg](tx).FilterNonzero(Msg{FromID: fromID}).Exists(); err != nil {
 
674				return fmt.Errorf("looking up fromid: %v", err)
 
676				return fmt.Errorf("%w: fromid %q already present in message queue", ErrFromID, fromID)
 
678			if exists, err := bstore.QueryTx[MsgRetired](tx).FilterNonzero(MsgRetired{FromID: fromID}).Exists(); err != nil {
 
679				return fmt.Errorf("looking up fromid: %v", err)
 
681				return fmt.Errorf("%w: fromid %q already present in retired message queue", ErrFromID, fromID)
 
685		qml[i].SenderAccount = senderAccount
 
686		qml[i].BaseID = baseID
 
687		for _, hr := range holdRules {
 
688			if hr.matches(qml[i]) {
 
693		if err := tx.Insert(&qml[i]); err != nil {
 
696		if base && i == 0 && len(qml) > 1 {
 
698			qml[i].BaseID = baseID
 
699			if err := tx.Update(&qml[i]); err != nil {
 
707		for _, p := range paths {
 
709			log.Check(err, "removing destination message file for queue", slog.String("path", p))
 
713	for _, qm := range qml {
 
714		dst := qm.MessagePath()
 
715		paths = append(paths, dst)
 
716		dstDir := filepath.Dir(dst)
 
717		os.MkdirAll(dstDir, 0770)
 
718		if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
 
719			return fmt.Errorf("linking/copying message to new file: %s", err)
 
720		} else if err := moxio.SyncDir(log, dstDir); err != nil {
 
721			return fmt.Errorf("sync directory: %v", err)
 
725	for _, m := range qml {
 
727			if err := metricHoldUpdate(tx); err != nil {
 
734	if err := tx.Commit(); err != nil {
 
735		return fmt.Errorf("commit transaction: %s", err)
 
745func formatIPDomain(d dns.IPDomain) string {
 
747		return "[" + d.IP.String() + "]"
 
749	return d.Domain.Name()
 
753	msgqueue        = make(chan struct{}, 1)
 
754	deliveryResults = make(chan string, 1)
 
764	case msgqueue <- struct{}{}:
 
769// NextAttemptAdd adds a duration to the NextAttempt for all matching messages, and
 
771func NextAttemptAdd(ctx context.Context, filter Filter, d time.Duration) (affected int, err error) {
 
772	err = DB.Write(ctx, func(tx *bstore.Tx) error {
 
773		q := bstore.QueryTx[Msg](tx)
 
774		if err := filter.apply(q); err != nil {
 
777		msgs, err := q.List()
 
779			return fmt.Errorf("listing matching messages: %v", err)
 
781		for _, m := range msgs {
 
782			m.NextAttempt = m.NextAttempt.Add(d)
 
783			if err := tx.Update(&m); err != nil {
 
797// NextAttemptSet sets NextAttempt for all matching messages to a new time, and
 
799func NextAttemptSet(ctx context.Context, filter Filter, t time.Time) (affected int, err error) {
 
800	q := bstore.QueryDB[Msg](ctx, DB)
 
801	if err := filter.apply(q); err != nil {
 
804	n, err := q.UpdateNonzero(Msg{NextAttempt: t})
 
806		return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
 
812// HoldSet sets Hold for all matching messages and kicks the queue.
 
813func HoldSet(ctx context.Context, filter Filter, hold bool) (affected int, err error) {
 
814	err = DB.Write(ctx, func(tx *bstore.Tx) error {
 
815		q := bstore.QueryTx[Msg](tx)
 
816		if err := filter.apply(q); err != nil {
 
819		n, err := q.UpdateFields(map[string]any{"Hold": hold})
 
821			return fmt.Errorf("selecting and updating messages in queue: %v", err)
 
824		return metricHoldUpdate(tx)
 
833// TransportSet changes the transport to use for the matching messages.
 
834func TransportSet(ctx context.Context, filter Filter, transport string) (affected int, err error) {
 
835	q := bstore.QueryDB[Msg](ctx, DB)
 
836	if err := filter.apply(q); err != nil {
 
839	n, err := q.UpdateFields(map[string]any{"Transport": transport})
 
841		return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
 
847// Fail marks matching messages as failed for delivery, delivers a DSN to the
 
848// sender, and sends a webhook.
 
850// Returns number of messages removed, which can be non-zero even in case of an
 
852func Fail(ctx context.Context, log mlog.Log, f Filter) (affected int, err error) {
 
853	return failDrop(ctx, log, f, true)
 
856// Drop removes matching messages from the queue. Messages are added as retired
 
857// message, webhooks with the "canceled" event are queued.
 
859// Returns number of messages removed, which can be non-zero even in case of an
 
861func Drop(ctx context.Context, log mlog.Log, f Filter) (affected int, err error) {
 
862	return failDrop(ctx, log, f, false)
 
865func failDrop(ctx context.Context, log mlog.Log, filter Filter, fail bool) (affected int, err error) {
 
867	err = DB.Write(ctx, func(tx *bstore.Tx) error {
 
868		q := bstore.QueryTx[Msg](tx)
 
869		if err := filter.apply(q); err != nil {
 
875			return fmt.Errorf("getting messages to delete: %v", err)
 
883		var remoteMTA dsn.NameIP
 
884		for i := range msgs {
 
887				Error: "delivery canceled by admin",
 
889			msgs[i].Results = append(msgs[i].Results, result)
 
891				if msgs[i].LastAttempt == nil {
 
892					msgs[i].LastAttempt = &now
 
894				deliverDSNFailure(log, msgs[i], remoteMTA, "", result.Error, nil)
 
897		event := webhook.EventCanceled
 
899			event = webhook.EventFailed
 
901		if err := retireMsgs(log, tx, event, 0, "", nil, msgs...); err != nil {
 
902			return fmt.Errorf("removing queue messages from database: %w", err)
 
904		return metricHoldUpdate(tx)
 
910		if err := removeMsgsFS(log, msgs...); err != nil {
 
911			return len(msgs), fmt.Errorf("removing queue messages from file system: %w", err)
 
915	return len(msgs), nil
 
918// RequireTLSSet updates the RequireTLS field of matching messages.
 
919func RequireTLSSet(ctx context.Context, filter Filter, requireTLS *bool) (affected int, err error) {
 
920	q := bstore.QueryDB[Msg](ctx, DB)
 
921	if err := filter.apply(q); err != nil {
 
924	n, err := q.UpdateFields(map[string]any{"RequireTLS": requireTLS})
 
929// RetiredFilter filters messages to list or operate on. Used by admin web interface
 
932// Only non-empty/non-zero values are applied to the filter. Leaving all fields
 
933// empty/zero matches all messages.
 
934type RetiredFilter struct {
 
940	Submitted    string // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
 
941	LastActivity string // ">$duration" or "<$duration", also with "now" for duration.
 
946func (f RetiredFilter) apply(q *bstore.Query[MsgRetired]) error {
 
950	applyTime := func(field string, s string) error {
 
953		if strings.HasPrefix(s, "<") {
 
955		} else if !strings.HasPrefix(s, ">") {
 
956			return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
 
958		s = strings.TrimSpace(s[1:])
 
962		} else if d, err := time.ParseDuration(s); err != nil {
 
963			return fmt.Errorf("parsing duration %q: %v", orig, err)
 
965			t = time.Now().Add(d)
 
968			q.FilterLess(field, t)
 
970			q.FilterGreater(field, t)
 
974	if f.Submitted != "" {
 
975		if err := applyTime("Queued", f.Submitted); err != nil {
 
976			return fmt.Errorf("applying filter for submitted: %v", err)
 
979	if f.LastActivity != "" {
 
980		if err := applyTime("LastActivity", f.LastActivity); err != nil {
 
981			return fmt.Errorf("applying filter for last activity: %v", err)
 
985		q.FilterNonzero(MsgRetired{SenderAccount: f.Account})
 
987	if f.Transport != nil {
 
988		q.FilterEqual("Transport", *f.Transport)
 
990	if f.From != "" || f.To != "" {
 
991		q.FilterFn(func(m MsgRetired) bool {
 
992			return f.From != "" && strings.Contains(m.SenderLocalpart.String()+"@"+m.SenderDomainStr, f.From) || f.To != "" && strings.Contains(m.Recipient().XString(true), f.To)
 
995	if f.Success != nil {
 
996		q.FilterEqual("Success", *f.Success)
 
1004type RetiredSort struct {
 
1005	Field  string // "Queued" or "LastActivity"/"".
 
1006	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
 
1007	Last   any    // Value of Field for last object. Must be set iff LastID is set.
 
1008	Asc    bool   // Ascending, or descending.
 
1011func (s RetiredSort) apply(q *bstore.Query[MsgRetired]) error {
 
1013	case "", "LastActivity":
 
1014		s.Field = "LastActivity"
 
1018		return fmt.Errorf("unknown sort order field %q", s.Field)
 
1022		ls, ok := s.Last.(string)
 
1024			return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
 
1026		last, err := time.Parse(time.RFC3339Nano, ls)
 
1028			last, err = time.Parse(time.RFC3339, ls)
 
1031			return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
 
1033		q.FilterNotEqual("ID", s.LastID)
 
1034		var fieldEqual func(m MsgRetired) bool
 
1035		if s.Field == "LastActivity" {
 
1036			fieldEqual = func(m MsgRetired) bool { return m.LastActivity.Equal(last) }
 
1038			fieldEqual = func(m MsgRetired) bool { return m.Queued.Equal(last) }
 
1041			q.FilterGreaterEqual(s.Field, last)
 
1042			q.FilterFn(func(mr MsgRetired) bool {
 
1043				return !fieldEqual(mr) || mr.ID > s.LastID
 
1046			q.FilterLessEqual(s.Field, last)
 
1047			q.FilterFn(func(mr MsgRetired) bool {
 
1048				return !fieldEqual(mr) || mr.ID < s.LastID
 
1053		q.SortAsc(s.Field, "ID")
 
1055		q.SortDesc(s.Field, "ID")
 
1060// RetiredList returns retired messages.
 
1061func RetiredList(ctx context.Context, filter RetiredFilter, sort RetiredSort) ([]MsgRetired, error) {
 
1062	q := bstore.QueryDB[MsgRetired](ctx, DB)
 
1063	if err := filter.apply(q); err != nil {
 
1066	if err := sort.apply(q); err != nil {
 
1072type ReadReaderAtCloser interface {
 
1077// OpenMessage opens a message present in the queue.
 
1078func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
 
1080	err := DB.Get(ctx, &qm)
 
1084	f, err := os.Open(qm.MessagePath())
 
1086		return nil, fmt.Errorf("open message file: %s", err)
 
1088	r := store.FileMsgReader(qm.MsgPrefix, f)
 
1092const maxConcurrentDeliveries = 10
 
1093const maxConcurrentHookDeliveries = 10
 
1095// Start opens the database by calling Init, then starts the delivery and cleanup
 
1097func Start(resolver dns.Resolver, done chan struct{}) error {
 
1098	if err := Init(); err != nil {
 
1102	go startQueue(resolver, done)
 
1103	go startHookQueue(done)
 
1105	go cleanupMsgRetired(done)
 
1106	go cleanupHookRetired(done)
 
1111func cleanupMsgRetired(done chan struct{}) {
 
1112	log := mlog.New("queue", nil)
 
1117			log.Error("unhandled panic in cleanupMsgRetired", slog.Any("x", x))
 
1119			metrics.PanicInc(metrics.Queue)
 
1123	timer := time.NewTimer(3 * time.Second)
 
1126		case <-mox.Shutdown.Done():
 
1132		cleanupMsgRetiredSingle(log)
 
1133		timer.Reset(time.Hour)
 
1137func cleanupMsgRetiredSingle(log mlog.Log) {
 
1138	n, err := bstore.QueryDB[MsgRetired](mox.Shutdown, DB).FilterLess("KeepUntil", time.Now()).Delete()
 
1139	log.Check(err, "removing old retired messages")
 
1141		log.Debug("cleaned up retired messages", slog.Int("count", n))
 
1145func startQueue(resolver dns.Resolver, done chan struct{}) {
 
1147	log := mlog.New("queue", nil)
 
1149	// Map keys are either dns.Domain.Name()'s, or string-formatted IP addresses.
 
1150	busyDomains := map[string]struct{}{}
 
1152	timer := time.NewTimer(0)
 
1156		case <-mox.Shutdown.Done():
 
1157			for len(busyDomains) > 0 {
 
1158				domain := <-deliveryResults
 
1159				delete(busyDomains, domain)
 
1165		case domain := <-deliveryResults:
 
1166			delete(busyDomains, domain)
 
1169		if len(busyDomains) >= maxConcurrentDeliveries {
 
1173		launchWork(log, resolver, busyDomains)
 
1174		timer.Reset(nextWork(mox.Shutdown, log, busyDomains))
 
1178func nextWork(ctx context.Context, log mlog.Log, busyDomains map[string]struct{}) time.Duration {
 
1179	q := bstore.QueryDB[Msg](ctx, DB)
 
1180	if len(busyDomains) > 0 {
 
1182		for d := range busyDomains {
 
1183			doms = append(doms, d)
 
1185		q.FilterNotEqual("RecipientDomainStr", doms...)
 
1187	q.FilterEqual("Hold", false)
 
1188	q.SortAsc("NextAttempt")
 
1191	if err == bstore.ErrAbsent {
 
1192		return 24 * time.Hour
 
1193	} else if err != nil {
 
1194		log.Errorx("finding time for next delivery attempt", err)
 
1195		return 1 * time.Minute
 
1197	return time.Until(qm.NextAttempt)
 
1200func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]struct{}) int {
 
1201	q := bstore.QueryDB[Msg](mox.Shutdown, DB)
 
1202	q.FilterLessEqual("NextAttempt", time.Now())
 
1203	q.FilterEqual("Hold", false)
 
1204	q.SortAsc("NextAttempt")
 
1205	q.Limit(maxConcurrentDeliveries)
 
1206	if len(busyDomains) > 0 {
 
1208		for d := range busyDomains {
 
1209			doms = append(doms, d)
 
1211		q.FilterNotEqual("RecipientDomainStr", doms...)
 
1214	seen := map[string]bool{}
 
1215	err := q.ForEach(func(m Msg) error {
 
1216		dom := m.RecipientDomainStr
 
1217		if _, ok := busyDomains[dom]; !ok && !seen[dom] {
 
1219			msgs = append(msgs, m)
 
1224		log.Errorx("querying for work in queue", err)
 
1225		mox.Sleep(mox.Shutdown, 1*time.Second)
 
1229	for _, m := range msgs {
 
1230		busyDomains[m.RecipientDomainStr] = struct{}{}
 
1231		go deliver(log, resolver, m)
 
1236// todo future: we may consider keeping message files around for a while after retiring. especially for failures to deliver. to inspect what exactly wasn't delivered.
 
1238func removeMsgsFS(log mlog.Log, msgs ...Msg) error {
 
1240	for _, m := range msgs {
 
1241		p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
 
1242		if err := os.Remove(p); err != nil {
 
1243			errs = append(errs, fmt.Sprintf("%s: %v", p, err))
 
1247		return fmt.Errorf("removing message files from queue: %s", strings.Join(errs, "; "))
 
// Move one or more messages to the retired list, or remove them. Webhooks are scheduled.
 
1253// IDs of msgs in suppressedMsgIDs caused a suppression to be added.
 
1255// Callers should update Msg.Results before calling.
 
1257// Callers must remove the messages from the file system afterwards, see
 
1258// removeMsgsFS. Callers must also kick the message and webhook queues.
 
1259func retireMsgs(log mlog.Log, tx *bstore.Tx, event webhook.OutgoingEvent, code int, secode string, suppressedMsgIDs []int64, msgs ...Msg) error {
 
1264	accConf, ok := mox.Conf.Account(m0.SenderAccount)
 
1266	if accConf.OutgoingWebhook != nil {
 
1267		hookURL = accConf.OutgoingWebhook.URL
 
1269	log.Debug("retiring messages from queue", slog.Any("event", event), slog.String("account", m0.SenderAccount), slog.Bool("ok", ok), slog.String("webhookurl", hookURL))
 
1270	if hookURL != "" && (len(accConf.OutgoingWebhook.Events) == 0 || slices.Contains(accConf.OutgoingWebhook.Events, string(event))) {
 
1271		for _, m := range msgs {
 
1272			suppressing := slices.Contains(suppressedMsgIDs, m.ID)
 
1273			h, err := hookCompose(m, hookURL, accConf.OutgoingWebhook.Authorization, event, suppressing, code, secode)
 
1275				log.Errorx("composing webhooks while retiring messages from queue, not queueing hook for message", err, slog.Int64("msgid", m.ID), slog.Any("recipient", m.Recipient()))
 
1277				hooks = append(hooks, h)
 
1282	msgKeep := 24 * 7 * time.Hour
 
1283	hookKeep := 24 * 7 * time.Hour
 
1285		msgKeep = accConf.KeepRetiredMessagePeriod
 
1286		hookKeep = accConf.KeepRetiredWebhookPeriod
 
1289	for _, m := range msgs {
 
1290		if err := tx.Delete(&m); err != nil {
 
1295		for _, m := range msgs {
 
1296			rm := m.Retired(event == webhook.EventDelivered, now, now.Add(msgKeep))
 
1297			if err := tx.Insert(&rm); err != nil {
 
1303	for i := range hooks {
 
1304		if err := hookInsert(tx, &hooks[i], now, hookKeep); err != nil {
 
1305			return fmt.Errorf("enqueueing webhooks while retiring messages from queue: %v", err)
 
1310		for _, h := range hooks {
 
1311			log.Debug("queued webhook while retiring message from queue", h.attrs()...)
 
1318// deliver attempts to deliver a message.
 
1319// The queue is updated, either by removing a delivered or permanently failed
 
1320// message, or updating the time for the next attempt. A DSN may be sent.
 
1321func deliver(log mlog.Log, resolver dns.Resolver, m0 Msg) {
 
1324	qlog := log.WithCid(mox.Cid()).With(
 
1325		slog.Any("from", m0.Sender()),
 
1326		slog.Int("attempts", m0.Attempts))
 
1329		deliveryResults <- formatIPDomain(m0.RecipientDomain)
 
1333			qlog.Error("deliver panic", slog.Any("panic", x), slog.Int64("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
 
1335			metrics.PanicInc(metrics.Queue)
 
1339	// We'll use a single transaction for the various checks, committing as soon as
 
1340	// we're done with it.
 
1341	xtx, err := DB.Begin(mox.Shutdown, true)
 
1343		qlog.Errorx("transaction for gathering messages to deliver", err)
 
1348			err := xtx.Rollback()
 
1349			qlog.Check(err, "rolling back transaction after error delivering")
 
1353	// We register this attempt by setting LastAttempt, adding an empty Result, and
 
1354	// already setting NextAttempt in the future with exponential backoff. If we run
 
	// into trouble delivering below, at least we won't be bothering the receiving server
 
1356	// with our problems.
 
1357	// Delivery attempts: immediately, 7.5m, 15m, 30m, 1h, 2h (send delayed DSN), 4h,
 
1358	// 8h, 16h (send permanent failure DSN).
 
1362	var backoff time.Duration
 
1363	var origNextAttempt time.Time
 
1364	prepare := func() error {
 
1365		// Refresh message within transaction.
 
1367		if err := xtx.Get(&m0); err != nil {
 
1368			return fmt.Errorf("get message to be delivered: %v", err)
 
1371		backoff = time.Duration(7*60+30+jitter.Intn(10)-5) * time.Second
 
1372		for i := 0; i < m0.Attempts; i++ {
 
1373			backoff *= time.Duration(2)
 
1376		origNextAttempt = m0.NextAttempt
 
1377		m0.LastAttempt = &now
 
1378		m0.NextAttempt = now.Add(backoff)
 
1379		m0.Results = append(m0.Results, MsgResult{Start: now, Error: resultErrorDelivering})
 
1380		if err := xtx.Update(&m0); err != nil {
 
1381			return fmt.Errorf("update message to be delivered: %v", err)
 
1385	if err := prepare(); err != nil {
 
1386		qlog.Errorx("storing delivery attempt", err, slog.Int64("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
 
1392	// Check if recipient is on suppression list. If so, fail delivery.
 
1393	path := smtp.Path{Localpart: m0.RecipientLocalpart, IPDomain: m0.RecipientDomain}
 
1394	baseAddr := baseAddress(path).XString(true)
 
1395	qsup := bstore.QueryTx[webapi.Suppression](xtx)
 
1396	qsup.FilterNonzero(webapi.Suppression{Account: m0.SenderAccount, BaseAddress: baseAddr})
 
1397	exists, err := qsup.Exists()
 
1398	if err != nil || exists {
 
1400			qlog.Errorx("checking whether recipient address is in suppression list", err)
 
1402			err := fmt.Errorf("not delivering to recipient address %s: %w", path.XString(true), errSuppressed)
 
1403			err = smtpclient.Error{Permanent: true, Err: err}
 
1404			failMsgsTx(qlog, xtx, []*Msg{&m0}, m0.DialedIPs, backoff, remoteMTA, err)
 
1407		qlog.Check(err, "commit processing failure to deliver messages")
 
1413	resolveTransport := func(mm Msg) (string, config.Transport, bool) {
 
1414		if mm.Transport != "" {
 
1415			transport, ok := mox.Conf.Static.Transports[mm.Transport]
 
1417				return "", config.Transport{}, false
 
1419			return mm.Transport, transport, ok
 
1421		route := findRoute(mm.Attempts, mm)
 
1422		return route.Transport, route.ResolvedTransport, true
 
1425	// Find route for transport to use for delivery attempt.
 
1427	transportName, transport, transportOK := resolveTransport(m0)
 
1430		failMsgsTx(qlog, xtx, []*Msg{&m0}, m0.DialedIPs, backoff, remoteMTA, fmt.Errorf("cannot find transport %q", m0.Transport))
 
1432		qlog.Check(err, "commit processing failure to deliver messages")
 
1438	if transportName != "" {
 
1439		qlog = qlog.With(slog.String("transport", transportName))
 
1440		qlog.Debug("delivering with transport")
 
1443	// Attempt to gather more recipients for this identical message, only with the same
 
1444	// recipient domain, and under the same conditions (recipientdomain, attempts,
 
1448		gather := func() error {
 
1449			q := bstore.QueryTx[Msg](xtx)
 
1450			q.FilterNonzero(Msg{BaseID: m0.BaseID, RecipientDomainStr: m0.RecipientDomainStr, Attempts: m0.Attempts - 1})
 
1451			q.FilterNotEqual("ID", m0.ID)
 
1452			q.FilterLessEqual("NextAttempt", origNextAttempt)
 
1453			q.FilterEqual("Hold", false)
 
1454			err := q.ForEach(func(xm Msg) error {
 
1455				mrtls := m0.RequireTLS != nil
 
1456				xmrtls := xm.RequireTLS != nil
 
1457				if mrtls != xmrtls || mrtls && *m0.RequireTLS != *xm.RequireTLS {
 
1460				tn, _, ok := resolveTransport(xm)
 
1461				if ok && tn == transportName {
 
1462					msgs = append(msgs, &xm)
 
1467				return fmt.Errorf("looking up more recipients: %v", err)
 
1470			// Mark these additional messages as attempted too.
 
1471			for _, mm := range msgs[1:] {
 
1473				mm.NextAttempt = m0.NextAttempt
 
1474				mm.LastAttempt = m0.LastAttempt
 
1475				mm.Results = append(mm.Results, MsgResult{Start: now, Error: resultErrorDelivering})
 
1476				if err := xtx.Update(mm); err != nil {
 
1477					return fmt.Errorf("updating more message recipients for smtp transaction: %v", err)
 
1482		if err := gather(); err != nil {
 
1483			qlog.Errorx("error finding more recipients for message, will attempt to send to single recipient", err)
 
1488	if err := xtx.Commit(); err != nil {
 
1489		qlog.Errorx("commit of preparation to deliver", err, slog.Any("msgid", m0.ID))
 
1495		ids := make([]int64, len(msgs))
 
1496		rcpts := make([]smtp.Path, len(msgs))
 
1497		for i, m := range msgs {
 
1499			rcpts[i] = m.Recipient()
 
1501		qlog.Debug("delivering to multiple recipients", slog.Any("msgids", ids), slog.Any("recipients", rcpts))
 
1503		qlog.Debug("delivering to single recipient", slog.Any("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
 
1507		deliverLocalserve(ctx, qlog, msgs, backoff)
 
1511	// We gather TLS connection successes and failures during delivery, and we store
 
1512	// them in tlsrptdb. Every 24 hours we send an email with a report to the recipient
 
1513	// domains that opt in via a TLSRPT DNS record.  For us, the tricky part is
 
1514	// collecting all reporting information. We've got several TLS modes
 
1515	// (opportunistic, DANE and/or MTA-STS (PKIX), overrides due to Require TLS).
 
1516	// Failures can happen at various levels: MTA-STS policies (apply to whole delivery
 
1517	// attempt/domain), MX targets (possibly multiple per delivery attempt, both for
 
1518	// MTA-STS and DANE).
 
1520	// Once the SMTP client has tried a TLS handshake, we register success/failure,
 
1521	// regardless of what happens next on the connection. We also register failures
 
1522	// when they happen before we get to the SMTP client, but only if they are related
 
1523	// to TLS (and some DNSSEC).
 
1524	var recipientDomainResult tlsrpt.Result
 
1525	var hostResults []tlsrpt.Result
 
1527		if mox.Conf.Static.NoOutgoingTLSReports || m0.RecipientDomain.IsIP() {
 
1532		dayUTC := now.UTC().Format("20060102")
 
1534		// See if this contains a failure. If not, we'll mark TLS results for delivering
 
1535		// DMARC reports SendReport false, so we won't as easily get into a report sending
 
1538		for _, result := range hostResults {
 
1539			if result.Summary.TotalFailureSessionCount > 0 {
 
1544		if recipientDomainResult.Summary.TotalFailureSessionCount > 0 {
 
1548		results := make([]tlsrptdb.TLSResult, 0, 1+len(hostResults))
 
1549		tlsaPolicyDomains := map[string]bool{}
 
1550		addResult := func(r tlsrpt.Result, isHost bool) {
 
1551			var zerotype tlsrpt.PolicyType
 
1552			if r.Policy.Type == zerotype {
 
1556			// Ensure we store policy domain in unicode in database.
 
1557			policyDomain, err := dns.ParseDomain(r.Policy.Domain)
 
1559				qlog.Errorx("parsing policy domain for tls result", err, slog.String("policydomain", r.Policy.Domain))
 
1563			if r.Policy.Type == tlsrpt.TLSA {
 
1564				tlsaPolicyDomains[policyDomain.ASCII] = true
 
1567			tlsResult := tlsrptdb.TLSResult{
 
1568				PolicyDomain:    policyDomain.Name(),
 
1570				RecipientDomain: m0.RecipientDomain.Domain.Name(),
 
1572				SendReport:      !m0.IsTLSReport && (!m0.IsDMARCReport || failure),
 
1573				Results:         []tlsrpt.Result{r},
 
1575			results = append(results, tlsResult)
 
1577		for _, result := range hostResults {
 
1578			addResult(result, true)
 
1580		// If we were delivering to a mail host directly (not a domain with MX records), we
 
1581		// are more likely to get a TLSA policy than an STS policy. Don't potentially
 
1582		// confuse operators with both a tlsa and no-policy-found result.
 
1584		if recipientDomainResult.Policy.Type != tlsrpt.NoPolicyFound || !tlsaPolicyDomains[recipientDomainResult.Policy.Domain] {
 
1585			addResult(recipientDomainResult, false)
 
1588		if len(results) > 0 {
 
1589			err := tlsrptdb.AddTLSResults(context.Background(), results)
 
1590			qlog.Check(err, "adding tls results to database for upcoming tlsrpt report")
 
1594	var dialer smtpclient.Dialer = &net.Dialer{}
 
1595	if transport.Submissions != nil {
 
1596		deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submissions, true, 465)
 
1597	} else if transport.Submission != nil {
 
1598		deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submission, false, 587)
 
1599	} else if transport.SMTP != nil {
 
1600		// todo future: perhaps also gather tlsrpt results for submissions.
 
1601		deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.SMTP, false, 25)
 
1603		ourHostname := mox.Conf.Static.HostnameDomain
 
1604		if transport.Socks != nil {
 
1605			socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
 
1607				failMsgsDB(qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer: %v", err))
 
1609			} else if d, ok := socksdialer.(smtpclient.Dialer); !ok {
 
1610				failMsgsDB(qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer is not a contextdialer"))
 
1615			ourHostname = transport.Socks.Hostname
 
1617		recipientDomainResult, hostResults = deliverDirect(qlog, resolver, dialer, ourHostname, transportName, transport.Direct, msgs, backoff)
 
1621func findRoute(attempt int, m Msg) config.Route {
 
1622	routesAccount, routesDomain, routesGlobal := mox.Conf.Routes(m.SenderAccount, m.SenderDomain.Domain)
 
1623	if r, ok := findRouteInList(attempt, m, routesAccount); ok {
 
1626	if r, ok := findRouteInList(attempt, m, routesDomain); ok {
 
1629	if r, ok := findRouteInList(attempt, m, routesGlobal); ok {
 
1632	return config.Route{}
 
1635func findRouteInList(attempt int, m Msg, routes []config.Route) (config.Route, bool) {
 
1636	for _, r := range routes {
 
1637		if routeMatch(attempt, m, r) {
 
1641	return config.Route{}, false
 
1644func routeMatch(attempt int, m Msg, r config.Route) bool {
 
1645	return attempt >= r.MinimumAttempts && routeMatchDomain(r.FromDomainASCII, m.SenderDomain.Domain) && routeMatchDomain(r.ToDomainASCII, m.RecipientDomain.Domain)
 
1648func routeMatchDomain(l []string, d dns.Domain) bool {
 
1652	for _, e := range l {
 
1653		if d.ASCII == e || strings.HasPrefix(e, ".") && (d.ASCII == e[1:] || strings.HasSuffix(d.ASCII, e)) {
 
1660// Returns string representing delivery result for err, and number of delivered and
 
1663// Values: ok, okpartial, timeout, canceled, temperror, permerror, error.
 
1664func deliveryResult(err error, delivered, failed int) string {
 
1665	var cerr smtpclient.Error
 
1670		} else if failed > 0 {
 
1674	case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
 
1676	case errors.Is(err, context.Canceled):
 
1678	case errors.As(err, &cerr):