Skip to content

Commit

Permalink
transport: unexport transport opt field
Browse files Browse the repository at this point in the history
  • Loading branch information
IrineSistiana committed Oct 26, 2023
1 parent 963a2ff commit 74ab9cc
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 48 deletions.
27 changes: 11 additions & 16 deletions pkg/upstream/transport/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ import (
// PipelineTransport will pipeline queries as RFC 7766 6.2.1.1 suggested.
// It also can reuse udp socket. Since dns over udp is some kind of "pipeline".
type PipelineTransport struct {
PipelineOpts

m sync.Mutex // protect following fields
closed bool
conns map[*lazyDnsConn]struct{}

logger *zap.Logger // not nil
dialFunc func(ctx context.Context) (DnsConn, error)
dialTimeout time.Duration
maxLazyConnQueue int
logger *zap.Logger // not nil
}

type PipelineOpts struct {
Expand All @@ -59,14 +60,13 @@ type PipelineOpts struct {

func NewPipelineTransport(opt PipelineOpts) *PipelineTransport {
t := &PipelineTransport{
PipelineOpts: opt,
conns: make(map[*lazyDnsConn]struct{}),
}
if opt.Logger != nil {
t.logger = opt.Logger
} else {
t.logger = nopLogger
conns: make(map[*lazyDnsConn]struct{}),
}
t.dialFunc = opt.DialContext
setDefaultGZ(&t.dialTimeout, opt.DialTimeout, defaultDialTimeout)
setDefaultGZ(&t.maxLazyConnQueue, opt.MaxConcurrentQueryWhileDialing, defaultMaxLazyConnQueue)
setNonNilLogger(&t.logger, opt.Logger)

return t
}

Expand Down Expand Up @@ -108,11 +108,6 @@ func (t *PipelineTransport) Close() error {
}

func (t *PipelineTransport) getReservedExchanger() (_ ReservedExchanger, isNewConn bool, err error) {
maxConcurrentQueryWhileDialing := t.MaxConcurrentQueryWhileDialing
if maxConcurrentQueryWhileDialing <= 0 {
maxConcurrentQueryWhileDialing = defaultLazyConnMaxConcurrentQuery
}

t.m.Lock()
if t.closed {
err = ErrClosedTransport
Expand Down Expand Up @@ -141,7 +136,7 @@ func (t *PipelineTransport) getReservedExchanger() (_ ReservedExchanger, isNewCo

// Dial a new connection
if rxc == nil {
c := newLazyDnsConn(t.DialContext, t.DialTimeout, t.MaxConcurrentQueryWhileDialing, t.logger)
c := newLazyDnsConn(t.dialFunc, t.dialTimeout, t.maxLazyConnQueue, t.logger)
rxc, _ = c.ReserveNewQuery() // ignore the closed error for new lazy connection
isNewConn = true
t.conns[c] = struct{}{}
Expand Down
45 changes: 18 additions & 27 deletions pkg/upstream/transport/reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (

// ReuseConnTransport is for old tcp protocol. (no pipelining)
type ReuseConnTransport struct {
ReuseConnOpts

logger *zap.Logger // non-nil
ctx context.Context
ctxCancel context.CancelCauseFunc
dialFunc func(ctx context.Context) (NetConn, error)
dialTimeout time.Duration
idleTimeout time.Duration
logger *zap.Logger // non-nil
ctx context.Context
ctxCancel context.CancelCauseFunc

m sync.Mutex // protect following fields
closed bool
Expand All @@ -61,17 +62,16 @@ type ReuseConnOpts struct {
func NewReuseConnTransport(opt ReuseConnOpts) *ReuseConnTransport {
ctx, cancel := context.WithCancelCause(context.Background())
t := &ReuseConnTransport{
ctx: ctx,
ctxCancel: cancel,
ReuseConnOpts: opt,
idleConns: make(map[*reusableConn]struct{}),
conns: make(map[*reusableConn]struct{}),
}
if opt.Logger != nil {
t.logger = opt.Logger
} else {
t.logger = nopLogger
ctx: ctx,
ctxCancel: cancel,
idleConns: make(map[*reusableConn]struct{}),
conns: make(map[*reusableConn]struct{}),
}
t.dialFunc = opt.DialContext
setDefaultGZ(&t.dialTimeout, opt.DialTimeout, defaultDialTimeout)
setDefaultGZ(&t.idleTimeout, opt.IdleTimeout, defaultIdleTimeout)
setNonNilLogger(&t.logger, opt.Logger)

return t
}

Expand Down Expand Up @@ -136,25 +136,16 @@ func (t *ReuseConnTransport) wantNewConn(ctx context.Context) (*reusableConn, er

dialChan := make(chan dialRes)
go func() {
dialTimeout := t.DialTimeout
if dialTimeout <= 0 {
dialTimeout = defaultDialTimeout
}
idleTimeout := t.IdleTimeout
if idleTimeout <= 0 {
idleTimeout = defaultIdleTimeout
}

ctx, cancel := context.WithTimeout(t.ctx, dialTimeout)
ctx, cancel := context.WithTimeout(t.ctx, t.dialTimeout)
defer cancel()

var rc *reusableConn
c, err := t.DialContext(ctx)
c, err := t.dialFunc(ctx)
if err != nil {
t.logger.Check(zap.WarnLevel, "fail to dial reusable conn").Write(zap.Error(err))
}
if c != nil {
rc = newReusableConn(c, idleTimeout)
rc = newReusableConn(c, t.idleTimeout)
t.trackNewReusableConn(rc)
}

Expand Down
7 changes: 2 additions & 5 deletions pkg/upstream/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/IrineSistiana/mosdns/v5/pkg/pool"
"go.uber.org/zap"
)

var (
Expand All @@ -36,8 +35,6 @@ var (
ErrLazyConnCannotReserveQueryExchanger = errors.New("lazy connection failed to reserve query exchanger")
)

var nopLogger = zap.NewNop()

func ReleaseResp(b *[]byte) {
pool.ReleaseBuf(b)
}
Expand All @@ -55,8 +52,8 @@ const (
// something goes wrong with the connection or the server. The connection will be closed.
waitingReplyTimeout = time.Second * 10

defaultTdcMaxConcurrentQuery = 32
defaultLazyConnMaxConcurrentQuery = 16
defaultTdcMaxConcurrentQuery = 32
defaultMaxLazyConnQueue = 16
)

// One method MUST be called in ReservedExchanger.
Expand Down
20 changes: 20 additions & 0 deletions pkg/upstream/transport/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/IrineSistiana/mosdns/v5/pkg/pool"
"github.com/miekg/dns"
"go.uber.org/zap"
"golang.org/x/exp/constraints"
)

const (
Expand Down Expand Up @@ -87,3 +89,21 @@ readAgain:
*payload = (*payload)[:n]
return payload, err
}

func setDefaultGZ[T constraints.Float | constraints.Integer](i *T, s, d T) {
if s > 0 {
*i = s
} else {
*i = d
}
}

var nopLogger = zap.NewNop()

func setNonNilLogger(i **zap.Logger, s *zap.Logger) {
if s != nil {
*i = s
} else {
*i = nopLogger
}
}

0 comments on commit 74ab9cc

Please sign in to comment.