For cached connections ignore late replies after read timeout

Signed-off-by: Santhosh Manohar <santhosh@docker.com>
This commit is contained in:
Santhosh Manohar 2016-06-13 22:56:59 -07:00
parent 0122e632e3
commit 54f694c913
1 changed files with 92 additions and 6 deletions

View File

@ -62,6 +62,21 @@ type extDNSEntry struct {
extOnce sync.Once
}
type sboxQuery struct {
sboxID string
dnsID uint16
}
type clientConnGC struct {
toDelete bool
client clientConn
}
var (
queryGCMutex sync.Mutex
queryGC map[sboxQuery]*clientConnGC
)
// resolver implements the Resolver interface
type resolver struct {
sb *sandbox
@ -79,6 +94,21 @@ type resolver struct {
func init() {
rand.Seed(time.Now().Unix())
queryGC = make(map[sboxQuery]*clientConnGC)
go func() {
ticker := time.NewTicker(1 * time.Minute)
for range ticker.C {
queryGCMutex.Lock()
for query, conn := range queryGC {
if !conn.toDelete {
conn.toDelete = true
continue
}
delete(queryGC, query)
}
queryGCMutex.Unlock()
}
}()
}
// NewResolver creates a new instance of the Resolver
@ -370,6 +400,7 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
writer = w
} else {
queryID := query.Id
extQueryLoop:
for i := 0; i < maxExtDNS; i++ {
extDNS := &r.extDNSList[i]
if extDNS.ipStr == "" {
@ -435,14 +466,26 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
log.Debugf("Send to DNS server failed, %s", err)
continue
}
for {
// If a reply comes after a read timeout it will remain in the socket buffer
// and will be read after sending next query. To ignore such stale replies
// save the query context in a GC queue when read timesout. On the next reply
// if the context is present in the GC queue its a old reply. Ignore it and
// read again
resp, err = co.ReadMsg()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
r.addQueryToGC(w, query)
}
r.forwardQueryEnd(w, query)
log.Debugf("Read from DNS server failed, %s", err)
continue extQueryLoop
}
resp, err = co.ReadMsg()
if err != nil {
r.forwardQueryEnd(w, query)
log.Debugf("Read from DNS server failed, %s", err)
continue
if !r.checkRespInGC(w, resp) {
break
}
}
// Retrieves the context for the forwarded query and returns the client connection
// to send the reply to
writer = r.forwardQueryEnd(w, resp)
@ -501,6 +544,49 @@ func (r *resolver) forwardQueryStart(w dns.ResponseWriter, msg *dns.Msg, queryID
return true
}
func (r *resolver) addQueryToGC(w dns.ResponseWriter, msg *dns.Msg) {
if w.LocalAddr().Network() != "udp" {
return
}
r.queryLock.Lock()
cc, ok := r.client[msg.Id]
r.queryLock.Unlock()
if !ok {
return
}
query := sboxQuery{
sboxID: r.sb.ID(),
dnsID: msg.Id,
}
clientGC := &clientConnGC{
client: cc,
}
queryGCMutex.Lock()
queryGC[query] = clientGC
queryGCMutex.Unlock()
}
func (r *resolver) checkRespInGC(w dns.ResponseWriter, msg *dns.Msg) bool {
if w.LocalAddr().Network() != "udp" {
return false
}
query := sboxQuery{
sboxID: r.sb.ID(),
dnsID: msg.Id,
}
queryGCMutex.Lock()
defer queryGCMutex.Unlock()
if _, ok := queryGC[query]; ok {
delete(queryGC, query)
return true
}
return false
}
func (r *resolver) forwardQueryEnd(w dns.ResponseWriter, msg *dns.Msg) dns.ResponseWriter {
var (
cc clientConn