1package queue
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "net"
10 "os"
11 "time"
12
13 "github.com/mjl-/mox/config"
14 "github.com/mjl-/mox/dns"
15 "github.com/mjl-/mox/dsn"
16 "github.com/mjl-/mox/mlog"
17 "github.com/mjl-/mox/mox-"
18 "github.com/mjl-/mox/sasl"
19 "github.com/mjl-/mox/smtp"
20 "github.com/mjl-/mox/smtpclient"
21 "github.com/mjl-/mox/store"
22)
23
24// todo: reuse connection? do fewer concurrently (other than with direct delivery).
25
26// deliver via another SMTP server, e.g. relaying to a smart host, possibly
27// with authentication (submission).
28func deliverSubmit(cid int64, qlog *mlog.Log, resolver dns.Resolver, dialer smtpclient.Dialer, m Msg, backoff time.Duration, transportName string, transport *config.TransportSMTP, dialTLS bool, defaultPort int) {
29 // todo: configurable timeouts
30
31 port := transport.Port
32 if port == 0 {
33 port = defaultPort
34 }
35
36 tlsMode := smtpclient.TLSRequiredStartTLS
37 tlsPKIX := true
38 if dialTLS {
39 tlsMode = smtpclient.TLSImmediate
40 } else if transport.STARTTLSInsecureSkipVerify {
41 tlsMode = smtpclient.TLSOpportunistic
42 tlsPKIX = false
43 } else if transport.NoSTARTTLS {
44 tlsMode = smtpclient.TLSSkip
45 tlsPKIX = false
46 }
47 start := time.Now()
48 var deliveryResult string
49 var permanent bool
50 var secodeOpt string
51 var errmsg string
52 var success bool
53 defer func() {
54 metricDelivery.WithLabelValues(fmt.Sprintf("%d", m.Attempts), transportName, string(tlsMode), deliveryResult).Observe(float64(time.Since(start)) / float64(time.Second))
55 qlog.Debug("queue deliversubmit result", mlog.Field("host", transport.DNSHost), mlog.Field("port", port), mlog.Field("attempt", m.Attempts), mlog.Field("permanent", permanent), mlog.Field("secodeopt", secodeOpt), mlog.Field("errmsg", errmsg), mlog.Field("ok", success), mlog.Field("duration", time.Since(start)))
56 }()
57
58 // todo: SMTP-DANE should be used when relaying on port 25.
59 // ../rfc/7672:1261
60
61 // todo: for submission, understand SRV records, and even DANE.
62
63 // If submit was done with REQUIRETLS extension for SMTP, we must verify TLS
64 // certificates. If our submission connection is not configured that way, abort.
65 requireTLS := m.RequireTLS != nil && *m.RequireTLS
66 if requireTLS && (tlsMode != smtpclient.TLSRequiredStartTLS && tlsMode != smtpclient.TLSImmediate || !tlsPKIX) {
67 errmsg = fmt.Sprintf("transport %s: message requires verified tls but transport does not verify tls", transportName)
68 fail(qlog, m, backoff, true, dsn.NameIP{}, smtp.SePol7MissingReqTLS, errmsg)
69 return
70 }
71
72 dialctx, dialcancel := context.WithTimeout(context.Background(), 30*time.Second)
73 defer dialcancel()
74 if m.DialedIPs == nil {
75 m.DialedIPs = map[string][]net.IP{}
76 }
77 _, _, _, ips, _, err := smtpclient.GatherIPs(dialctx, qlog, resolver, dns.IPDomain{Domain: transport.DNSHost}, m.DialedIPs)
78 var conn net.Conn
79 if err == nil {
80 if m.DialedIPs == nil {
81 m.DialedIPs = map[string][]net.IP{}
82 }
83 conn, _, err = smtpclient.Dial(dialctx, qlog, dialer, dns.IPDomain{Domain: transport.DNSHost}, ips, port, m.DialedIPs)
84 }
85 addr := net.JoinHostPort(transport.Host, fmt.Sprintf("%d", port))
86 var result string
87 switch {
88 case err == nil:
89 result = "ok"
90 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
91 result = "timeout"
92 case errors.Is(err, context.Canceled):
93 result = "canceled"
94 default:
95 result = "error"
96 }
97 metricConnection.WithLabelValues(result).Inc()
98 if err != nil {
99 if conn != nil {
100 err := conn.Close()
101 qlog.Check(err, "closing connection")
102 }
103 qlog.Errorx("dialing for submission", err, mlog.Field("remote", addr))
104 errmsg = fmt.Sprintf("transport %s: dialing %s for submission: %v", transportName, addr, err)
105 fail(qlog, m, backoff, false, dsn.NameIP{}, "", errmsg)
106 return
107 }
108 dialcancel()
109
110 var auth []sasl.Client
111 if transport.Auth != nil {
112 a := transport.Auth
113 for _, mech := range a.EffectiveMechanisms {
114 switch mech {
115 case "PLAIN":
116 auth = append(auth, sasl.NewClientPlain(a.Username, a.Password))
117 case "CRAM-MD5":
118 auth = append(auth, sasl.NewClientCRAMMD5(a.Username, a.Password))
119 case "SCRAM-SHA-1":
120 auth = append(auth, sasl.NewClientSCRAMSHA1(a.Username, a.Password))
121 case "SCRAM-SHA-256":
122 auth = append(auth, sasl.NewClientSCRAMSHA256(a.Username, a.Password))
123 default:
124 // Should not happen.
125 qlog.Error("missing smtp authentication mechanisms implementation", mlog.Field("mechanism", mech))
126 errmsg = fmt.Sprintf("transport %s: authentication mechanisms %q not implemented", transportName, mech)
127 fail(qlog, m, backoff, false, dsn.NameIP{}, "", errmsg)
128 return
129 }
130 }
131 }
132 clientctx, clientcancel := context.WithTimeout(context.Background(), 60*time.Second)
133 defer clientcancel()
134 opts := smtpclient.Opts{
135 Auth: auth,
136 RootCAs: mox.Conf.Static.TLS.CertPool,
137 }
138 client, err := smtpclient.New(clientctx, qlog, conn, tlsMode, tlsPKIX, mox.Conf.Static.HostnameDomain, transport.DNSHost, opts)
139 if err != nil {
140 smtperr, ok := err.(smtpclient.Error)
141 var remoteMTA dsn.NameIP
142 if ok {
143 remoteMTA.Name = transport.Host
144 }
145 qlog.Errorx("establishing smtp session for submission", err, mlog.Field("remote", addr))
146 errmsg = fmt.Sprintf("transport %s: establishing smtp session with %s for submission: %v", transportName, addr, err)
147 secodeOpt = smtperr.Secode
148 fail(qlog, m, backoff, false, remoteMTA, secodeOpt, errmsg)
149 return
150 }
151 defer func() {
152 err := client.Close()
153 qlog.Check(err, "closing smtp client after delivery")
154 }()
155 clientcancel()
156
157 var msgr io.ReadCloser
158 var size int64
159 var req8bit, reqsmtputf8 bool
160 if len(m.DSNUTF8) > 0 && client.SupportsSMTPUTF8() {
161 msgr = io.NopCloser(bytes.NewReader(m.DSNUTF8))
162 reqsmtputf8 = true
163 size = int64(len(m.DSNUTF8))
164 } else {
165 req8bit = m.Has8bit // todo: not require this, but just try to submit?
166 size = m.Size
167
168 p := m.MessagePath()
169 f, err := os.Open(p)
170 if err != nil {
171 qlog.Errorx("opening message for delivery", err, mlog.Field("remote", addr), mlog.Field("path", p))
172 errmsg = fmt.Sprintf("transport %s: opening message file for submission: %v", transportName, err)
173 fail(qlog, m, backoff, false, dsn.NameIP{}, "", errmsg)
174 return
175 }
176 msgr = store.FileMsgReader(m.MsgPrefix, f)
177 defer func() {
178 err := msgr.Close()
179 qlog.Check(err, "closing message after delivery attempt")
180 }()
181 }
182
183 deliverctx, delivercancel := context.WithTimeout(context.Background(), time.Duration(60+size/(1024*1024))*time.Second)
184 defer delivercancel()
185 err = client.Deliver(deliverctx, m.Sender().String(), m.Recipient().String(), size, msgr, req8bit, reqsmtputf8, requireTLS)
186 if err != nil {
187 qlog.Infox("delivery failed", err)
188 }
189 var cerr smtpclient.Error
190 switch {
191 case err == nil:
192 deliveryResult = "ok"
193 success = true
194 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
195 deliveryResult = "timeout"
196 case errors.Is(err, context.Canceled):
197 deliveryResult = "canceled"
198 case errors.As(err, &cerr):
199 deliveryResult = "temperror"
200 if cerr.Permanent {
201 deliveryResult = "permerror"
202 }
203 default:
204 deliveryResult = "error"
205 }
206 if err != nil {
207 smtperr, ok := err.(smtpclient.Error)
208 var remoteMTA dsn.NameIP
209 if ok {
210 remoteMTA.Name = transport.Host
211 }
212 qlog.Errorx("submitting email", err, mlog.Field("remote", addr))
213 permanent = smtperr.Permanent
214 secodeOpt = smtperr.Secode
215 errmsg = fmt.Sprintf("transport %s: submitting email to %s: %v", transportName, addr, err)
216 fail(qlog, m, backoff, permanent, remoteMTA, secodeOpt, errmsg)
217 return
218 }
219 qlog.Info("delivered from queue with transport")
220 if err := queueDelete(context.Background(), m.ID); err != nil {
221 qlog.Errorx("deleting message from queue after delivery", err)
222 }
223}
224