Decide on data organisation (bucketing / partitioning / sorting) #382
There are prototype on-the-fly cross-match algorithms available for Spark (AXS, ASTROIDE) that solve the generic catalogue cross-match problem (although in the case of the former see #223) in terms of simple proximity pairings, with no proper motion propagation between the observational epochs of the respective catalogues. On the other hand, Gaia DPAC publishes "best neighbours" to common large-scale external survey datasets; these fold in more information than simple positional proximity, including epoch-difference propagation and a probabilistic analysis of association via local source density. If we employ the DPAC best neighbours then, at least for those external catalogues treated, we can support a pre-computed cross-match as simply another dataset keyed on Gaia source ID. Caveats would be:
These could be considered minor cons: we simply state them clearly and impress on users that if they want to analyse those data independently of Gaia data they should look elsewhere for a facility on which to do so!
A generic on-the-fly cross-match solution could in principle also be employed to support spatial querying (cone/region search etc.). But is this a crucial functionality for our platform, whose primary usage mode is large-scale (i.e. whole-sky) analysis? Spatially limited querying is better served by existing relational systems.
Data skew: a consideration in a traditional Hadoop cluster is load balancing and data skew amongst the partitions. The AXS paper discusses this. In our situation it may be less of an issue given that we have a virtualised separation between the workers and the back-end storage. Provided we have many more partitions than workers, load balancing may be less of a concern.
Probably the simplest solution is to partition/bucket/sort all datasets by source ID, given that all the large-scale datasets are keyed on that attribute (or, in the case of the best neighbours, can be generated to be). Cross-querying is then a simple matter of plain Spark joins on that key, and the underlying execution engine will plan the joins within partitions with no requirement for expensive data shuffling between the workers. This should of course be tested and verified; the expected behaviour is sketched below.
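A minimal sketch of the check described above, assuming PySpark with Hive support and two tables already bucketed and sorted on source_id with the same number of buckets (the table names here are placeholders, not the actual platform table names):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Both tables are assumed to be bucketed/sorted on source_id into the same
# number of buckets; "gaia_source" and "tmass_best_neighbour" are placeholders.
gaia = spark.table("gaia_source")
neighbours = spark.table("tmass_best_neighbour")

joined = gaia.join(neighbours, on="source_id")

# Inspect the physical plan: a bucketed join on the bucket key should appear
# as a SortMergeJoin with no Exchange (shuffle) node on either input side.
joined.explain()
```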
Final decision is to partition/bucket/sort by source_id into 2048 buckets. Testing has shown that this enables efficient joins between datasets keyed on source_id, including the best-neighbour tables (when pre-joined with their respective "external" catalogues on the external UID). When further Gaia datasets are added at subsequent data release points, they can be added to the platform data holdings in the same way, re-partitioned into separate parquet file sets, so as to enable efficient joint querying. The datasets, which were written to HDFS on the production system, have been copied onto my user space on the CephFS: see the folder /user/nch/PARQUET/REPARTITIONED. Using the Python function defined in the notebook linked below, these should be re-established as tabular datasets in the metastore of any new Spark context.
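A rough sketch of how such re-registration could be done (this is not the actual notebook function; the table name, path and column list are inferred at run time, and the example path under REPARTITIONED is a guess). Spark will not accept bucketing metadata in a CREATE TABLE statement without an explicit column list, hence the schema inference step:

```python
from pyspark.sql import SparkSession

def register_bucketed_table(spark, table_name, path,
                            bucket_col="source_id", n_buckets=2048):
    """Register an existing bucketed parquet file set as a metastore table."""
    # Infer the schema from the parquet files themselves, since Spark requires
    # an explicit column list when bucketing information is declared.
    schema = spark.read.parquet(path).schema
    columns = ", ".join(f"`{f.name}` {f.dataType.simpleString()}"
                        for f in schema.fields)
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name} ({columns})
        USING PARQUET
        CLUSTERED BY ({bucket_col}) SORTED BY ({bucket_col}) INTO {n_buckets} BUCKETS
        LOCATION '{path}'
    """)

# Hypothetical usage; the sub-folder name is illustrative only:
# spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# register_bucketed_table(spark, "gaia_source",
#                         "/user/nch/PARQUET/REPARTITIONED/gaia_source")
```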
The notebook containing the relevant code is available on the DPAC SVN. This also contains a few sanity checks that can serve as examples of how to cross-query.
@NigelHambly I think this is probably done?
Yep - the optimisation of the actual number of partitions (2048, 4096, 8192) will be tackled and tracked elsewhere.
Presently on the platform the data are loaded from CSV into default partitions of 128 MB, with the arrangement of the data undefined with respect to its contents. For many use cases, and looking ahead, we need to support joins between separate sets of parquet-formatted data, e.g. gaia_source and xp_spectra, that are keyed on Gaia source ID. Furthermore we need to support cross-matched external survey usage, so that users can analyse Gaia data in conjunction with external survey photometry (Pan-STARRS, 2MASS, ...). Careful consideration of the parquet storage details (bucketing, partitioning and sorting) is needed to avoid enormous amounts of shuffling between workers and O(N×M) cartesian joins (the outer product of N rows in one table and M rows in another) in these scenarios. This issue is raised to record the considerations and experiments already done, and to define explicitly the decision(s) and how the data are organised on disk. The kind of repartitioning this implies is sketched below.
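An illustrative sketch (not the production ingest script) of repartitioning such a dataset by source_id so that subsequent joins on that key avoid a full shuffle; the input path and table name are placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Placeholder path: a dataset previously ingested from CSV into default
# ~128 MB partitions with no particular organisation of the rows.
df = spark.read.parquet("/path/to/gaia_source_default")

(df.repartition(2048, "source_id")   # co-locate rows sharing a bucket key
   .write
   .bucketBy(2048, "source_id")      # record the bucketing in the metastore
   .sortBy("source_id")
   .mode("overwrite")
   .saveAsTable("gaia_source"))      # bucketBy requires saveAsTable, not a plain path save
```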