Skip to content

Commit

Permalink
worker: add HEAD object endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Mar 5, 2024
1 parent 13351a5 commit f51fc98
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 45 deletions.
66 changes: 65 additions & 1 deletion api/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,24 @@ type (
Object *Object `json:"object,omitempty"`
}

// GetObjectResponse is the response type for the /worker/object endpoint.
// GetObjectResponse is the response type for the GET /worker/object endpoint.
GetObjectResponse struct {
Content io.ReadCloser `json:"content"`
ContentType string `json:"contentType"`
LastModified string `json:"lastModified"`
Range *DownloadRange `json:"range,omitempty"`
Size int64 `json:"size"`
Metadata ObjectUserMetadata `json:"metadata"`
// NOTE: keep HeadObjectResponse in sync with this type
}

// HeadObjectResponse is the response type for the HEAD /worker/object endpoint.
HeadObjectResponse struct {
ContentType string `json:"contentType"`
LastModified string `json:"lastModified"`
Range *DownloadRange `json:"range,omitempty"`
Size int64 `json:"size"`
Metadata ObjectUserMetadata `json:"metadata"`
}

// ObjectsDeleteRequest is the request type for the /bus/objects/list endpoint.
Expand Down Expand Up @@ -135,6 +145,46 @@ type (
}
)

func ParseObjectHeadResponseFrom(header http.Header) (HeadObjectResponse, error) {
// parse size
var size int64
_, err := fmt.Sscan(header.Get("Content-Length"), &size)
if err != nil {
return HeadObjectResponse{}, err
}

// parse range
var r *DownloadRange
if cr := header.Get("Content-Range"); cr != "" {
dr, err := ParseDownloadRange(cr)
if err != nil {
return HeadObjectResponse{}, err
}
r = &dr

// if a range is set, the size is the size extracted from the range
// since Content-Length will then only be the length of the returned
// range.
size = dr.Size
}

// parse headers
headers := make(map[string]string)
for k, v := range header {
if len(v) > 0 {
headers[k] = v[0]
}
}

return HeadObjectResponse{
ContentType: header.Get("Content-Type"),
LastModified: header.Get("Last-Modified"),
Range: r,
Size: size,
Metadata: ExtractObjectUserMetadataFrom(headers),
}, nil
}

func ExtractObjectUserMetadataFrom(metadata map[string]string) ObjectUserMetadata {
oum := make(map[string]string)
for k, v := range metadata {
Expand Down Expand Up @@ -206,6 +256,10 @@ type (
Batch bool
}

HeadObjectOptions struct {
Range DownloadRange
}

DownloadObjectOptions struct {
GetObjectOptions
Range DownloadRange
Expand Down Expand Up @@ -301,6 +355,16 @@ func (opts DeleteObjectOptions) Apply(values url.Values) {
}
}

func (opts HeadObjectOptions) ApplyHeaders(h http.Header) {
if opts.Range != (DownloadRange{}) {
if opts.Range.Length == -1 {
h.Set("Range", fmt.Sprintf("bytes=%v-", opts.Range.Offset))
} else {
h.Set("Range", fmt.Sprintf("bytes=%v-%v", opts.Range.Offset, opts.Range.Offset+opts.Range.Length-1))
}
}
}

func (opts GetObjectOptions) Apply(values url.Values) {
if opts.Prefix != "" {
values.Set("prefix", opts.Prefix)
Expand Down
35 changes: 25 additions & 10 deletions internal/test/e2e/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,42 @@ func TestObjectMetadata(t *testing.T) {
}

// upload the object
_, err := w.UploadObject(context.Background(), bytes.NewReader([]byte(t.Name())), api.DefaultBucketName, t.Name(), opts)
data := []byte(t.Name())
_, err := w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, t.Name(), opts)
if err != nil {
t.Fatal(err)
}

// get the object from the bus and assert it has the metadata
ress, err := b.Object(context.Background(), api.DefaultBucketName, t.Name(), api.GetObjectOptions{})
or, err := b.Object(context.Background(), api.DefaultBucketName, t.Name(), api.GetObjectOptions{})
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(ress.Object.Metadata, opts.Metadata) {
t.Fatal("metadata mismatch", ress.Object.Metadata)
if !reflect.DeepEqual(or.Object.Metadata, opts.Metadata) {
t.Fatal("metadata mismatch", or.Object.Metadata)
}

// get the object from the worker and assert it has the metadata
res, err := w.GetObject(context.Background(), api.DefaultBucketName, t.Name(), api.DownloadObjectOptions{})
gor, err := w.GetObject(context.Background(), api.DefaultBucketName, t.Name(), api.DownloadObjectOptions{})
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(res.Metadata, opts.Metadata) {
t.Fatal("metadata mismatch", res.Metadata)
if !reflect.DeepEqual(gor.Metadata, opts.Metadata) {
t.Fatal("metadata mismatch", gor.Metadata)
}

// perform a HEAD request and assert the headers are all present
hor, err := w.HeadObject(context.Background(), api.DefaultBucketName, t.Name(), api.HeadObjectOptions{Range: api.DownloadRange{Offset: 1, Length: 1}})
if err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(hor, &api.HeadObjectResponse{
ContentType: or.Object.ContentType(),
LastModified: or.Object.LastModified(),
Range: &api.DownloadRange{Offset: 1, Length: 1, Size: int64(len(data))},
Size: int64(len(data)),
Metadata: gor.Metadata,
}) {
t.Fatalf("unexpected response: %+v", hor)
}

// re-upload the object
Expand All @@ -63,11 +78,11 @@ func TestObjectMetadata(t *testing.T) {
}

// assert metadata was removed
res, err = w.GetObject(context.Background(), api.DefaultBucketName, t.Name(), api.DownloadObjectOptions{})
gor, err = w.GetObject(context.Background(), api.DefaultBucketName, t.Name(), api.DownloadObjectOptions{})
if err != nil {
t.Fatal(err)
}
if len(res.Metadata) > 0 {
t.Fatal("unexpected metadata", res.Metadata)
if len(gor.Metadata) > 0 {
t.Fatal("unexpected metadata", gor.Metadata)
}
}
73 changes: 43 additions & 30 deletions worker/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,49 @@ func (c *Client) DownloadStats() (resp api.DownloadStatsResponse, err error) {
return
}

// HeadObject returns the metadata of the object at the given path.
func (c *Client) HeadObject(ctx context.Context, bucket, path string, opts api.HeadObjectOptions) (*api.HeadObjectResponse, error) {
c.c.Custom("HEAD", fmt.Sprintf("/objects/%s", path), nil, nil)

if strings.HasSuffix(path, "/") {
return nil, errors.New("the given path is a directory, HEAD can only be performed on objects")
}

values := url.Values{}
values.Set("bucket", url.QueryEscape(bucket))
path += "?" + values.Encode()

// TODO: support HEAD in jape client
req, err := http.NewRequestWithContext(ctx, "HEAD", fmt.Sprintf("%s/objects/%s", c.c.BaseURL, path), nil)
if err != nil {
panic(err)
}
req.SetBasicAuth("", c.c.WithContext(ctx).Password)
opts.ApplyHeaders(req.Header)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != 200 && resp.StatusCode != 206 {
err, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
return nil, errors.New(string(err))
}

head, err := api.ParseObjectHeadResponseFrom(resp.Header)
if err != nil {
return nil, err
}
return &head, nil
}

// GetObject returns the object at given path alongside its metadata.
func (c *Client) GetObject(ctx context.Context, bucket, path string, opts api.DownloadObjectOptions) (*api.GetObjectResponse, error) {
if strings.HasSuffix(path, "/") {
return nil, errors.New("the given path is a directory, use ObjectEntries instead")
}

// Start download.
path = api.ObjectPathEscape(path)
body, header, err := c.object(ctx, bucket, path, opts)
if err != nil {
Expand All @@ -96,41 +132,18 @@ func (c *Client) GetObject(ctx context.Context, bucket, path string, opts api.Do
}
}()

// Parse header.
var size int64
_, err = fmt.Sscan(header.Get("Content-Length"), &size)
head, err := api.ParseObjectHeadResponseFrom(header)
if err != nil {
return nil, err
}
var r *api.DownloadRange
if cr := header.Get("Content-Range"); cr != "" {
dr, err := api.ParseDownloadRange(cr)
if err != nil {
return nil, err
}
r = &dr

// If a range is set, the size is the size extracted from the range
// since Content-Length will then only be the length of the returned
// range.
size = dr.Size
}

// Parse headers.
headers := make(map[string]string)
for k, v := range header {
if len(v) > 0 {
headers[k] = v[0]
}
}

return &api.GetObjectResponse{
Content: body,
ContentType: header.Get("Content-Type"),
LastModified: header.Get("Last-Modified"),
Range: r,
Size: size,
Metadata: api.ExtractObjectUserMetadataFrom(headers),
ContentType: head.ContentType,
LastModified: head.LastModified,
Range: head.Range,
Size: head.Size,
Metadata: head.Metadata,
}, nil
}

Expand Down
8 changes: 4 additions & 4 deletions worker/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ func serveContent(rw http.ResponseWriter, req *http.Request, obj api.Object, dow
}
}()

// create a content reader
rs := newContentReader(pr, obj, offset)

// fetch the content type, if not set and we can't infer it from object's
// name we default to application/octet-stream, that is important because we
// have to avoid http.ServeContent to sniff the content type as it would
Expand All @@ -87,17 +84,20 @@ func serveContent(rw http.ResponseWriter, req *http.Request, obj api.Object, dow
if contentType == "" {
contentType = "application/octet-stream"
}
rw.Header().Set("Content-Type", contentType)

// set the response headers, no need to set Last-Modified header as
// serveContent does that for us
rw.Header().Set("ETag", api.FormatETag(obj.ETag))
rw.Header().Set("Content-Type", contentType)

// set the user metadata headers
for k, v := range obj.Metadata {
rw.Header().Set(fmt.Sprintf("%s%s", api.ObjectMetadataPrefix, k), v)
}

// create a content reader
rs := newContentReader(pr, obj, offset)

http.ServeContent(rw, req, obj.Name, obj.ModTime.Std(), rs)
return http.StatusOK, nil
}
Expand Down
41 changes: 41 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,46 @@ func (w *worker) uploadsStatsHandlerGET(jc jape.Context) {
})
}

func (w *worker) objectsHandlerHEAD(jc jape.Context) {
// parse bucket
bucket := api.DefaultBucketName
if jc.DecodeForm("bucket", &bucket) != nil {
return
}

// parse path
path := jc.PathParam("path")
if path == "" || strings.HasSuffix(path, "/") {
jc.Error(fmt.Errorf("directories are not accepted"), http.StatusBadRequest)
return
}

// fetch object metadata
res, err := w.bus.Object(jc.Request.Context(), bucket, path, api.GetObjectOptions{
OnlyMetadata: true,
})
if errors.Is(err, api.ErrObjectNotFound) {
jc.Error(err, http.StatusNotFound)
return
} else if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
} else if res.Object == nil {
jc.Error(api.ErrObjectNotFound, http.StatusInternalServerError) // should never happen but checking because we deref. later
return
}

// serve the content
status, err := serveContent(jc.ResponseWriter, jc.Request, *res.Object, func(io.Writer, int64, int64) error { return nil })
if errors.Is(err, http_range.ErrInvalid) || errors.Is(err, errMultiRangeNotSupported) {
jc.Error(err, http.StatusBadRequest)
} else if errors.Is(err, http_range.ErrNoOverlap) {
jc.Error(err, http.StatusRequestedRangeNotSatisfiable)
} else if err != nil {
jc.Error(err, status)
}
}

func (w *worker) objectsHandlerGET(jc jape.Context) {
jc.Custom(nil, []api.ObjectMetadata{})

Expand Down Expand Up @@ -1366,6 +1406,7 @@ func (w *worker) Handler() http.Handler {
"GET /stats/uploads": w.uploadsStatsHandlerGET,
"POST /slab/migrate": w.slabMigrateHandler,

"HEAD /objects/*path": w.objectsHandlerHEAD,
"GET /objects/*path": w.objectsHandlerGET,
"PUT /objects/*path": w.objectsHandlerPUT,
"DELETE /objects/*path": w.objectsHandlerDELETE,
Expand Down

0 comments on commit f51fc98

Please sign in to comment.