forked from google/goblet
-
Notifications
You must be signed in to change notification settings - Fork 2
/
goblet.go
201 lines (163 loc) · 6.74 KB
/
goblet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// Copyright 2021 Canva Inc
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package goblet
import (
"io"
"log"
"net/http"
"net/url"
"os"
"path"
"strings"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/oauth2"
)
// OpenCensus tag keys and measures exported by this package. The tag keys
// label recorded measurements; the measures are int64 values, with the
// time-based ones declared in milliseconds and the counters dimensionless.
var (
// CommandTypeKey is the tag key for the command type ("ls-refs", "fetch",
// "not-a-command").
CommandTypeKey = tag.MustNewKey("github.com/google/goblet/command-type")
// CommandCacheStateKey is the tag key that tells whether the command
// response was served from the cache or not ("locally-served",
// "queried-upstream").
CommandCacheStateKey = tag.MustNewKey("github.com/google/goblet/command-cache-state")
// CommandCanonicalStatusKey is the tag key that tells whether the command
// succeeded or not ("OK", "Unauthenticated").
CommandCanonicalStatusKey = tag.MustNewKey("github.com/google/goblet/command-status")
// InboundCommandProcessingTime is the processing time of inbound commands,
// in milliseconds.
InboundCommandProcessingTime = stats.Int64("github.com/google/goblet/inbound-command-processing-time", "processing time of inbound commands", stats.UnitMilliseconds)
// OutboundCommandProcessingTime is the processing time of outbound
// commands, in milliseconds.
OutboundCommandProcessingTime = stats.Int64("github.com/google/goblet/outbound-command-processing-time", "processing time of outbound commands", stats.UnitMilliseconds)
// UpstreamFetchWaitingTime is the duration, in milliseconds, that a fetch
// request waited for the upstream.
UpstreamFetchWaitingTime = stats.Int64("github.com/google/goblet/upstream-fetch-waiting-time", "waiting time of upstream fetch command", stats.UnitMilliseconds)
// InboundCommandCount is the number of inbound commands.
InboundCommandCount = stats.Int64("github.com/google/goblet/inbound-command-count", "number of inbound commands", stats.UnitDimensionless)
// OutboundCommandCount is the number of outbound commands.
OutboundCommandCount = stats.Int64("github.com/google/goblet/outbound-command-count", "number of outbound commands", stats.UnitDimensionless)
)
// ServerConfig groups the settings and callbacks that are handed to
// HTTPHandler, OpenManagedRepository and FetchManagedRepositoryAsync.
//
// NOTE(review): the code that reads these fields is not in this file, so the
// per-field notes marked "presumably" are inferred from names and types and
// should be confirmed against the consumers.
type ServerConfig struct {
// LocalDiskCacheRoot is presumably the directory under which cached
// repositories are stored — TODO confirm.
LocalDiskCacheRoot string
// URLCanonicalizer maps a URL to another URL or returns an error.
// DefaultURLCanonicalizer has this signature.
URLCanonicalizer func(*url.URL) (*url.URL, error)
// RequestAuthorizer inspects a request and returns an error.
// NoOpRequestAuthorizer has this signature. Presumably a non-nil error
// rejects the request — TODO confirm.
RequestAuthorizer func(*http.Request) error
// TokenSource supplies OAuth2 tokens; presumably used to authenticate
// against the upstream — TODO confirm.
TokenSource oauth2.TokenSource
// ErrorReporter is a callback that receives a request and an error.
ErrorReporter func(*http.Request, error)
// RequestLogger is a callback that receives a request together with its
// status, request size, response size and latency.
RequestLogger func(r *http.Request, status int, requestSize, responseSize int64, latency time.Duration)
// LongRunningOperationLogger returns a RunningOperation for a string and a
// URL; presumably an operation name and the repository URL — TODO confirm.
LongRunningOperationLogger func(string, *url.URL) RunningOperation
// PackObjectsHook and PackObjectsCache presumably configure a git
// pack-objects hook and its cache location — TODO confirm.
PackObjectsHook string
PackObjectsCache string
}
// RunningOperation is the value returned by
// ServerConfig.LongRunningOperationLogger.
type RunningOperation interface {
// Printf takes a fmt-style format string and its arguments; presumably it
// records a progress message for the operation — TODO confirm.
Printf(format string, a ...interface{})
// Done takes an error; presumably it marks the operation as finished, with
// nil meaning success — TODO confirm.
Done(error)
}
// ManagedRepository is the interface through which OpenManagedRepository
// exposes a repository to callers.
type ManagedRepository interface {
// UpstreamURL returns a URL; presumably the upstream the repository is
// fetched from — TODO confirm.
UpstreamURL() *url.URL
// LastUpdateTime returns a timestamp; presumably the time of the last
// successful update from upstream — TODO confirm.
LastUpdateTime() time.Time
// RecoverFromBundle takes a string and may fail; presumably the string is
// the path of a git bundle to restore from — TODO confirm.
RecoverFromBundle(string) error
// WriteBundle takes a writer and may fail; presumably it writes a git
// bundle of the repository to the writer — TODO confirm.
WriteBundle(io.Writer) error
}
// HTTPHandler wraps config in an httpProxyServer and returns it as an
// http.Handler. The config pointer is stored as given rather than copied.
func HTTPHandler(config *ServerConfig) http.Handler {
	var handler http.Handler = &httpProxyServer{config}
	return handler
}
// RunEvery runs f repeatedly on a background goroutine, waiting delay between
// the end of one invocation and the start of the next. f receives the time at
// which the wait elapsed.
//
// The returned function cancels the schedule. Its first call blocks until the
// goroutine has picked up the request (if f is running at that moment, that
// happens only after f returns), so no invocation of f starts once it has
// returned. It may be called any number of times, also concurrently: calls
// made after the goroutine has stopped return immediately instead of blocking
// forever. Interrupting an invocation of f that is already in flight is the
// caller's responsibility.
func RunEvery(delay time.Duration, f func(t time.Time)) func() {
	stop := make(chan struct{})
	// done is closed when the goroutine exits; it lets repeated or late
	// cancel calls return instead of waiting for a receiver that is gone.
	done := make(chan struct{})

	go func() {
		defer close(done)

		// One timer is reused for every cycle and released on exit.
		timer := time.NewTimer(delay)
		defer timer.Stop()

		for {
			select {
			case t := <-timer.C:
				f(t)
				// The timer has fired and its channel was just drained,
				// so it can be re-armed directly. Re-arming after f keeps
				// the gap between invocations at delay.
				timer.Reset(delay)
			case <-stop:
				return
			}
		}
	}()

	return func() {
		select {
		case stop <- struct{}{}:
		case <-done:
		}
	}
}
// OpenManagedRepository opens the managed repository for u by delegating to
// openManagedRepository and exposes the result as a ManagedRepository.
//
// When opening fails, the returned ManagedRepository is a true nil interface
// and the error is passed through unchanged.
func OpenManagedRepository(config *ServerConfig, u *url.URL) (ManagedRepository, error) {
	repo, err := openManagedRepository(config, u)
	if err != nil {
		// openManagedRepository returns a concrete type. Converting a nil
		// pointer of that type to the interface would yield a value that
		// compares unequal to nil, so return a literal nil instead.
		return nil, err
	}
	return repo, nil
}
// FetchManagedRepositoryAsync opens the repository for u and submits an
// upstream fetch to the repository's fetch pool.
//
// One value is sent on errorChan per outcome: the error from opening the
// repository, nil when the fetch is skipped, or the result of fetchUpstream.
// The sends are plain blocking sends, so the caller has to receive from
// errorChan (or give it a buffer).
//
// Unless mustFetch is set, the fetch is skipped when the pool already has
// waiting tasks or when the repository was updated less than 15 minutes ago.
// The age check is repeated when the submitted task starts running. After the
// task has reported its result, the content of a gc.log file in the
// repository directory is logged if that file can be read.
func FetchManagedRepositoryAsync(config *ServerConfig, u *url.URL, mustFetch bool, errorChan chan<- error) {
	// A repository updated more recently than this does not need a fetch.
	const freshFor = 15 * time.Minute

	repo, err := openManagedRepository(config, u)
	if err != nil {
		errorChan <- err
		return
	}

	if !mustFetch {
		if queued := repo.fetchUpstreamPool.WaitingTasks(); queued > 0 {
			log.Printf("FetchManagedRepository skipped since there are %d pending fetches (%s)\n", queued, repo.localDiskPath)
			errorChan <- nil
			return
		}
		if age := time.Since(repo.LastUpdateTime()); age < freshFor {
			log.Printf("FetchManagedRepository skipped since repo was updated %s ago (%s)\n", age, repo.localDiskPath)
			errorChan <- nil
			return
		}
	}

	submittedAt := time.Now()
	repo.fetchUpstreamPool.Submit(func() {
		logElapsed("FetchManagedRepository queuing", submittedAt, time.Minute, repo.localDiskPath)

		// Decide whether to fetch and which "must" tag to report.
		mustTag, fetch := "must:1", true
		if mustFetch {
			log.Printf("FetchManagedRepository required since mustFetch is set (%s)\n", repo.localDiskPath)
		} else if age := time.Since(repo.LastUpdateTime()); age < freshFor {
			// The repository may have been refreshed while this task was
			// waiting in the pool.
			log.Printf("FetchManagedRepository skipped since repo was updated %s ago (%s)\n", age, repo.localDiskPath)
			fetch = false
		} else {
			log.Printf("FetchManagedRepository required since repo was not updated for %s (%s)\n", age, repo.localDiskPath)
			mustTag = "must:0"
		}

		if fetch {
			StatsdClient.Incr("goblet.operation.count", []string{"dir:" + repo.localDiskPath, "op:background_fetch", mustTag}, 1)
			errorChan <- repo.fetchUpstream(nil)
		} else {
			errorChan <- nil
		}

		// Surface gc.log, if present, as a single log line.
		gcLog, readErr := os.ReadFile(path.Join(repo.localDiskPath, "gc.log"))
		if readErr == nil {
			log.Printf("Found git gc log file (content:%s, dir:%s)\n", strings.ReplaceAll(string(gcLog), "\n", " "), repo.localDiskPath)
		}
	})
}
// DefaultURLCanonicalizer is a URLCanonicalizer implementation that is
// agnostic to any Git hosting provider.
//
// The result always uses the "https" scheme, carries the lower-cased host of
// u, and has the path of u with at most one Git endpoint suffix
// ("/info/refs", "/git-upload-pack" or "/git-receive-pack") removed, followed
// by removal of a trailing ".git". No other component of u is copied. The
// returned error is always nil.
func DefaultURLCanonicalizer(u *url.URL) (*url.URL, error) {
	repoPath := u.Path

	// Git endpoint suffixes; only the first one that matches is stripped.
	for _, endpoint := range []string{"/info/refs", "/git-upload-pack", "/git-receive-pack"} {
		if strings.HasSuffix(repoPath, endpoint) {
			repoPath = repoPath[:len(repoPath)-len(endpoint)]
			break
		}
	}

	canonical := &url.URL{
		Scheme: "https",
		Host:   strings.ToLower(u.Host),
		Path:   strings.TrimSuffix(repoPath, ".git"),
	}
	return canonical, nil
}
// NoOpRequestAuthorizer accepts every request: it never looks at request and
// always returns a nil error. Its signature matches
// ServerConfig.RequestAuthorizer.
func NoOpRequestAuthorizer(request *http.Request) error {
	return nil
}