Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Add mode to read consolidated ZARR datasets #2992

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion libdispatch/ds3util.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,13 @@ NC_s3urlprocess(NCURI* url, NCS3INFO* s3, NCURI** newurlp)

/* Rebuild the URL to path format and get a usable region and optional bucket*/
if((stat = NC_s3urlrebuild(url,s3,&url2))) goto done;
s3->host = strdup(url2->host);
if(url2->port){
    /* Build "host:port". strndup() never allocates more than
       strlen(source)+1 bytes, so appending to its result with strcat()
       would write past the end of the buffer. Allocate the full size
       explicitly and format into it instead. */
    size_t hostlen = strlen(url2->host) + 1 + strlen(url2->port) + 1; /* ':' and trailing NUL */
    s3->host = malloc(hostlen);
    if(s3->host == NULL) {stat = NC_ENOMEM; goto done;}
    snprintf(s3->host,hostlen,"%s:%s",url2->host,url2->port);
}else{
    s3->host = strdup(url2->host);
}
/* construct the rootkey minus the leading bucket */
pathsegments = nclistnew();
if((stat = NC_split_delim(url2->path,'/',pathsegments))) goto done;
Expand Down
3 changes: 2 additions & 1 deletion libnczarr/zarr.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ ncz_open_dataset(NC_FILE_INFO_T* file, NClist* controls)
if (!(file->format_file_info = calloc(1, sizeof(NCZ_FILE_INFO_T))))
{stat = NC_ENOMEM; goto done;}
zinfo = file->format_file_info;

zinfo->consolidated = NULL;
/* Fill in NCZ_FILE_INFO_T */
zinfo->creating = 0;
zinfo->common.file = file;
Expand Down Expand Up @@ -279,6 +279,7 @@ applycontrols(NCZ_FILE_INFO_T* zinfo)
else if(strcasecmp(p,"zip")==0) zinfo->controls.mapimpl = NCZM_ZIP;
else if(strcasecmp(p,"file")==0) zinfo->controls.mapimpl = NCZM_FILE;
else if(strcasecmp(p,"s3")==0) zinfo->controls.mapimpl = NCZM_S3;
else if(strcasecmp(p,"consolidated")==0) zinfo->controls.flags |= FLAG_CONSOLIDATED;
}
/* Apply negative controls by turning off negative flags */
/* This is necessary to avoid order dependence of mode flags when both positive and negative flags are defined */
Expand Down
1 change: 1 addition & 0 deletions libnczarr/zclose.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ ncz_close_file(NC_FILE_INFO_T* file, int abort)
if((stat = nczmap_close(zinfo->map,(abort && zinfo->creating)?1:0)))
goto done;
nclistfreeall(zinfo->controllist);
NCJreclaim(zinfo->consolidated);
NC_authfree(zinfo->auth);
nullfree(zinfo);

Expand Down
27 changes: 16 additions & 11 deletions libnczarr/zinternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
# define NC_MAX_PATH 4096
# endif
#endif

#define ZMETAROOT "/.zgroup"
#define ZMETAATTR "/.zattrs"
#define ZGROUP ".zgroup"
#define ZATTRS ".zattrs"
#define ZARRAY ".zarray"
#define ZMETAROOT "/.zgroup"
#define ZMETAGROUP "/.zgroup"
#define ZMETAATTR "/.zattrs"
#define ZMETAARRAY "/.zarray"
#define ZMETADATA ".zmetadata"
#define ZGROUP ".zgroup"
#define ZATTRS ".zattrs"
#define ZARRAY ".zarray"

/* V2 Reserved Attributes */
/*
Expand Down Expand Up @@ -101,6 +103,7 @@ Inserted into any .zattrs
#define ncidforx(file,grpid) ((file)->controller->ext_ncid | (grpid))
#define ncidfor(var) ncidforx((var)->container->nc4_info,(var)->container->hdr.id)

/* Non-zero when FLAG_CONSOLIDATED is set in the controls flags of the given file info pointer */
#define isconsolidated(zinfo) ( (zinfo)->controls.flags & (FLAG_CONSOLIDATED) )
/* Misspelled name retained so that existing users of it keep compiling */
#define isconsolidaded(zinfo) isconsolidated(zinfo)
/**************************************************/
/* Forward */

Expand Down Expand Up @@ -134,13 +137,15 @@ typedef struct NCZ_FILE_INFO {
int creating; /* 1=> created 0=>open */
int native_endianness; /* NC_ENDIAN_LITTLE | NC_ENDIAN_BIG */
NClist* controllist; /* Envv format */
NCjson * consolidated;
struct Controls {
size64_t flags;
# define FLAG_PUREZARR 1
# define FLAG_SHOWFETCH 2
# define FLAG_LOGGING 4
# define FLAG_XARRAYDIMS 8
# define FLAG_NCZARR_KEY 16 /* _nczarr_xxx keys are stored in object and not in _nczarr_attrs */
# define FLAG_PUREZARR 1
# define FLAG_SHOWFETCH 2
# define FLAG_LOGGING 4
# define FLAG_XARRAYDIMS 8
# define FLAG_NCZARR_KEY 16 /* _nczarr_xxx keys are stored in object and not in _nczarr_attrs */
# define FLAG_CONSOLIDATED 32
NCZM_IMPL mapimpl;
} controls;
int default_maxstrlen; /* default max str size for variables of type string */
Expand Down
28 changes: 18 additions & 10 deletions libnczarr/zmap_s3sdk.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ zs3create(const char *path, int mode, size64_t flags, void* parameters, NCZMAP**
/* The problem with open is that there is
no obvious way to test for existence.
So, we assume that the dataset must have
some content. We look for that */
some content. We look for that,
unless flags contains FLAG_CONSOLIDATED:
in that case the metadata is expected in
a single, unified metadata file (.zmetadata).
*/
static int
zs3open(const char *path, int mode, size64_t flags, void* parameters, NCZMAP** mapp)
{
Expand Down Expand Up @@ -221,15 +225,19 @@ zs3open(const char *path, int mode, size64_t flags, void* parameters, NCZMAP** m
{stat = NC_EURL; goto done;}

z3map->s3client = NC_s3sdkcreateclient(&z3map->s3);

/* Search the root for content */
content = nclistnew();
if((stat = NC_s3sdkgetkeys(z3map->s3client,z3map->s3.bucket,z3map->s3.rootkey,&nkeys,NULL,&z3map->errmsg)))
goto done;
if(nkeys == 0) {
/* dataset does not actually exist; we choose to return ENOOBJECT instead of EEMPTY */
stat = NC_ENOOBJECT;
goto done;

if ((flags & FLAG_CONSOLIDATED) == 0){
    /* Not consolidated: search the root for content to check that the dataset exists.
       The parentheses around the mask test are required because `==` binds
       tighter than `&`; without them the condition is always false. */
    content = nclistnew();
    if((stat = NC_s3sdkgetkeys(z3map->s3client,z3map->s3.bucket,z3map->s3.rootkey,&nkeys,NULL,&z3map->errmsg)))
        goto done;
    if(nkeys == 0) {
        /* dataset does not actually exist; we choose to return ENOOBJECT instead of EEMPTY */
        stat = NC_ENOOBJECT;
        goto done;
    }
}else {
    /* Lazy open: at this stage we do not need to make sure that there is
       something useful at the path; only reads are implemented. */
}
if(mapp) *mapp = (NCZMAP*)z3map;

Expand Down
169 changes: 128 additions & 41 deletions libnczarr/zsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ static int insert_attr(NCjson* jatts, NCjson* jtypes, const char* aname, NCjson*
static int insert_nczarr_attr(NCjson* jatts, NCjson* jtypes);
static int upload_attrs(NC_FILE_INFO_T* file, NC_OBJ* container, NCjson* jatts);
static int getnczarrkey(NC_OBJ* container, const char* name, const NCjson** jncxxxp);
static int downloadzarrobj(NC_FILE_INFO_T*, struct ZARROBJ* zobj, const char* fullpath, const char* objname);
static int downloadconsolidated(NC_FILE_INFO_T*);
static int get_zarrobj(NC_FILE_INFO_T*, struct ZARROBJ* zobj, const char* fullpath, const char* objname);
static int dictgetalt(const NCjson* jdict, const char* name, const char* alt, const NCjson** jvaluep);

/**************************************************/
Expand Down Expand Up @@ -1142,7 +1143,7 @@ define_grp(NC_FILE_INFO_T* file, NC_GRP_INFO_T* grp)
if((stat = NCZ_grpkey(grp,&fullpath))) goto done;

/* Download .zgroup and .zattrs */
if((stat = downloadzarrobj(file,&zgrp->zgroup,fullpath,ZGROUP))) goto done;
if((stat = get_zarrobj(file,&zgrp->zgroup,fullpath,ZGROUP))) goto done;
jgroup = zgrp->zgroup.obj;
jattrs = zgrp->zgroup.atts;

Expand Down Expand Up @@ -1448,7 +1449,7 @@ define_var1(NC_FILE_INFO_T* file, NC_GRP_INFO_T* grp, const char* varname)
goto done;

/* Download */
if((stat = downloadzarrobj(file,&zvar->zarray,varpath,ZARRAY))) goto done;
if((stat = get_zarrobj(file,&zvar->zarray,varpath,ZARRAY))) goto done;
jvar = zvar->zarray.obj;
jatts = zvar->zarray.atts;
assert(jvar == NULL || NCJsort(jvar) == NCJ_DICT);
Expand Down Expand Up @@ -1818,8 +1819,9 @@ ncz_read_superblock(NC_FILE_INFO_T* file, char** nczarrvp, char** zarrfp)
/* Construct grp key */
if((stat = NCZ_grpkey(root,&fullpath))) goto done;

int valid_consolidated = downloadconsolidated(file); //init zinfo->consolidated;
/* Download the root group .zgroup and associated .zattrs */
if((stat = downloadzarrobj(file, &zroot->zgroup, fullpath, ZGROUP))) goto done;
if((stat = get_zarrobj(file, &zroot->zgroup, fullpath, ZGROUP))) goto done;
jzgroup = zroot->zgroup.obj;

/* Look for superblock; first in .zattrs and then in .zgroup */
Expand All @@ -1834,7 +1836,7 @@ ncz_read_superblock(NC_FILE_INFO_T* file, char** nczarrvp, char** zarrfp)
file->no_write = 1;
}

if(jsuper == NULL) {
if(jsuper == NULL && valid_consolidated) {
/* See if this looks like a NCZarr/Zarr dataset at all
by looking for anything here of the form ".z*" */
if((stat = ncz_validate(file))) goto done;
Expand Down Expand Up @@ -1979,19 +1981,45 @@ searchvars(NCZ_FILE_INFO_T* zfile, NC_GRP_INFO_T* grp, NClist* varnames)
/* Compute the key for the grp */
if((stat = NCZ_grpkey(grp,&grpkey))) goto done;
/* Get the map and search group */
if((stat = nczmap_search(zfile->map,grpkey,matches))) goto done;
for(i=0;i<nclistlength(matches);i++) {
const char* name = nclistget(matches,i);
if(name[0] == NCZM_DOT) continue; /* zarr/nczarr specific */
/* See if name/.zarray exists */
if((stat = nczm_concat(grpkey,name,&varkey))) goto done;
if((stat = nczm_concat(varkey,ZARRAY,&zarray))) goto done;
if((stat = nczmap_exists(zfile->map,zarray)) == NC_NOERR)
nclistpush(varnames,strdup(name));
stat = NC_NOERR;
nullfree(varkey); varkey = NULL;
nullfree(zarray); zarray = NULL;
}
if(zfile->consolidated) {
    /* Consolidated mode: find the variables of this group by scanning the
       keys of the "metadata" dictionary for "<group>/<name>/.zarray"
       instead of querying the map. */
    const char* group = grpkey + (grpkey[0] == '/'); /* skip a leading '/' */
    size_t lgroup = strlen(group);
    const NCjson* jmetadata = NULL;

    /* Only scan when the "metadata" entry is actually present */
    if(NCJdictget(zfile->consolidated,"metadata",&jmetadata) == 0 && jmetadata != NULL) {
        /* Step by two: only the even positions are read as key names */
        for(i=0;i<NCJlength(jmetadata);i+=2){
            const NCjson* jname = NCJith(jmetadata,i);
            const char* fullname = NCJstring(jname);
            size_t lfullname = strlen(fullname);
            const char* start = NULL;
            const char* end = NULL;

            /* The key must be inside this group: "<group>/..." (or anything for the root) */
            if(lfullname < lgroup) continue;
            if(strncmp(fullname,group,lgroup) != 0) continue;
            if(lgroup > 0 && fullname[lgroup] != NCZM_SEP[0]) continue;

            /* Isolate the first path segment below the group */
            start = fullname + lgroup + (lgroup>0);
            end = strchr(start,NCZM_SEP[0]);
            if(end == NULL) continue;

            /* Keep the segment only if the rest of the key is exactly "/.zarray" */
            if(strcmp(end,ZMETAARRAY) == 0)
                nclistpush(varnames,strndup(start,(size_t)(end - start)));
        }
    }
}else{
    if((stat = nczmap_search(zfile->map,grpkey,matches))) goto done;
    for(i=0;i<nclistlength(matches);i++) {
        const char* name = nclistget(matches,i);
        if(name[0] == NCZM_DOT) continue; /* zarr/nczarr specific */
        /* See if name/.zarray exists */
        if((stat = nczm_concat(grpkey,name,&varkey))) goto done;
        if((stat = nczm_concat(varkey,ZARRAY,&zarray))) goto done;
        if((stat = nczmap_exists(zfile->map,zarray)) == NC_NOERR)
            nclistpush(varnames,strdup(name));
        stat = NC_NOERR;
        nullfree(varkey); varkey = NULL;
        nullfree(zarray); zarray = NULL;
    }
}

done:
nullfree(grpkey);
Expand All @@ -2013,21 +2041,46 @@ searchsubgrps(NCZ_FILE_INFO_T* zfile, NC_GRP_INFO_T* grp, NClist* subgrpnames)

/* Compute the key for the grp */
if((stat = NCZ_grpkey(grp,&grpkey))) goto done;
/* Get the map and search group */
if((stat = nczmap_search(zfile->map,grpkey,matches))) goto done;
for(i=0;i<nclistlength(matches);i++) {
const char* name = nclistget(matches,i);
if(name[0] == NCZM_DOT) continue; /* zarr/nczarr specific */
/* See if name/.zgroup exists */
if((stat = nczm_concat(grpkey,name,&subkey))) goto done;
if((stat = nczm_concat(subkey,ZGROUP,&zgroup))) goto done;
if((stat = nczmap_exists(zfile->map,zgroup)) == NC_NOERR)
nclistpush(subgrpnames,strdup(name));
stat = NC_NOERR;
nullfree(subkey); subkey = NULL;
nullfree(zgroup); zgroup = NULL;
}

if(zfile->consolidated) {
    /* Consolidated mode: find the subgroups of this group by scanning the
       keys of the "metadata" dictionary for "<group>/<name>/.zgroup"
       instead of querying the map. */
    const char* group = grpkey + (grpkey[0] == '/'); /* skip a leading '/' */
    size_t lgroup = strlen(group);
    const NCjson* jmetadata = NULL;

    /* Only scan when the "metadata" entry is actually present */
    if(NCJdictget(zfile->consolidated,"metadata",&jmetadata) == 0 && jmetadata != NULL) {
        /* Step by two: only the even positions are read as key names */
        for(i=0;i<NCJlength(jmetadata);i+=2){
            const NCjson* jname = NCJith(jmetadata,i);
            const char* fullname = NCJstring(jname);
            size_t lfullname = strlen(fullname);
            const char* start = NULL;
            const char* end = NULL;

            /* The key must be inside this group: "<group>/..." (or anything for the root) */
            if(lfullname < lgroup) continue;
            if(strncmp(fullname,group,lgroup) != 0) continue;
            if(lgroup > 0 && fullname[lgroup] != NCZM_SEP[0]) continue;

            /* Isolate the first path segment below the group */
            start = fullname + lgroup + (lgroup>0);
            end = strchr(start,NCZM_SEP[0]);
            if(end == NULL) continue;

            /* Keep the segment only if the rest of the key is exactly "/.zgroup" */
            if(strcmp(end,ZMETAGROUP) == 0)
                nclistpush(subgrpnames,strndup(start,(size_t)(end - start)));
        }
    }
}else{/* Get the map and search group */
    if((stat = nczmap_search(zfile->map,grpkey,matches))) goto done;
    for(i=0;i<nclistlength(matches);i++) {
        const char* name = nclistget(matches,i);
        if(name[0] == NCZM_DOT) continue; /* zarr/nczarr specific */
        /* See if name/.zgroup exists */
        if((stat = nczm_concat(grpkey,name,&subkey))) goto done;
        if((stat = nczm_concat(subkey,ZGROUP,&zgroup))) goto done;
        if((stat = nczmap_exists(zfile->map,zgroup)) == NC_NOERR)
            nclistpush(subgrpnames,strdup(name));
        stat = NC_NOERR;
        nullfree(subkey); subkey = NULL;
        nullfree(zgroup); zgroup = NULL;
    }
}
done:
nullfree(grpkey);
nullfree(subkey);
Expand Down Expand Up @@ -2586,22 +2639,56 @@ getnczarrkey(NC_OBJ* container, const char* name, const NCjson** jncxxxp)
}

static int
downloadzarrobj(NC_FILE_INFO_T* file, struct ZARROBJ* zobj, const char* fullpath, const char* objname)
downloadconsolidated(NC_FILE_INFO_T* file){
    /* Fetch the consolidated metadata object (ZMETADATA) through the file's
       map and store the parsed JSON in the file info's `consolidated` field.
       Returns NC_ENCZARR when the consolidated flag is not set or when no
       JSON was produced; otherwise returns the status of the download. */
    NCZ_FILE_INFO_T* zfile = (NCZ_FILE_INFO_T*)file->format_file_info;
    NCjson* jmeta = NULL;
    int stat = NC_NOERR;

    /* Nothing to do unless consolidated mode was requested */
    if (!isconsolidaded(zfile))
        return NC_ENCZARR;

    stat = NCZ_downloadjson(zfile->map,ZMETADATA,&jmeta);
    if(stat)
        return THROW(stat);

    /* Download reported success but produced no JSON */
    if(jmeta == NULL)
        return NC_ENCZARR;

    /* The file info now holds the JSON tree */
    zfile->consolidated = jmeta;
    return THROW(stat);
}


static int
get_zarrobj(NC_FILE_INFO_T* file, struct ZARROBJ* zobj, const char* fullpath, const char* objname)
{
int stat = NC_NOERR;
char* key = NULL;
NCZMAP* map = ((NCZ_FILE_INFO_T*)file->format_file_info)->map;

/* Download .zXXX and .zattrs */
nullfree(zobj->prefix);
zobj->prefix = strdup(fullpath);
NCJreclaim(zobj->obj); zobj->obj = NULL;
/* Reset atts itself after reclaiming it, so that no dangling pointer is left behind */
NCJreclaim(zobj->atts); zobj->atts = NULL;
if((stat = nczm_concat(fullpath,objname,&key))) goto done;
if((stat=NCZ_downloadjson(map,key,&zobj->obj))) goto done;
nullfree(key); key = NULL;
if((stat = nczm_concat(fullpath,ZATTRS,&key))) goto done;
if((stat=NCZ_downloadjson(map,key,&zobj->atts))) goto done;

NCZ_FILE_INFO_T* zinfo = ((NCZ_FILE_INFO_T*)file->format_file_info);
if(zinfo->consolidated){
NCjson * jtmp = NULL;
if(NCJdictget(zinfo->consolidated,"metadata",&jtmp) == 0 && jtmp){
NCjson * tmp = NULL;
if((stat = nczm_concat(fullpath,objname,&key))) goto done;
if((stat=NCJdictget(jtmp,key+(key[0]=='/'),&tmp))) goto done;
if(tmp)NCJclone(tmp,&zobj->obj);
nullfree(key); key = NULL;
if((stat = nczm_concat(fullpath,ZATTRS,&key))) goto done;
if((stat=NCJdictget(jtmp,key+(key[0]=='/'),&tmp))) goto done;
if(tmp)NCJclone(tmp,&zobj->atts);
}
}else{
if((stat = nczm_concat(fullpath,objname,&key))) goto done;
if((stat=NCZ_downloadjson(zinfo->map,key,&zobj->obj))) goto done;
nullfree(key); key = NULL;
if((stat = nczm_concat(fullpath,ZATTRS,&key))) goto done;
if((stat=NCZ_downloadjson(zinfo->map,key,&zobj->atts))) goto done;
}
done:
nullfree(key);
return THROW(stat);
Expand Down
Loading