
Decide on data organisation (bucketing / partitioning / sorting) #382

Closed
NigelHambly opened this issue Feb 23, 2021 · 7 comments

@NigelHambly
Collaborator

Presently on the platform the data are loaded from CSV into default partitions of 128 MB, with the arrangement of the data undefined with respect to its contents. For many use cases, and looking ahead, we need to support joins between separate sets of parquet-formatted data, e.g. gaia_source and xp_spectra, that are keyed on Gaia source ID. Furthermore, we need to support cross-matched external survey usage, so that users can analyse Gaia data in conjunction with external survey photometry (Pan-STARRS, 2MASS, ...). Careful consideration of parquet storage details (bucketing, partitioning and sorting) is needed to avoid enormous amounts of shuffling between workers and O(N×M) Cartesian joins (the outer product of N rows in one table and M rows in another) in these scenarios. This issue is raised to record the considerations and experiments already done, and to define explicitly the decision(s) and how the data are organised on disk.

@NigelHambly added the high priority and deployment labels Feb 23, 2021
@NigelHambly self-assigned this Feb 23, 2021
@NigelHambly
Collaborator Author

There are some prototype on-the-fly cross-match algorithms available for Spark (AXS, ASTROIDE) that solve the generic catalogue cross-match problem (although in the case of the former see #223) in terms of simple proximity pairings, with no propagation of catalogue proper motions between the observational epochs of the respective catalogues.

On the other hand, Gaia DPAC publishes "best neighbours" for common large-scale external survey datasets that fold in more information than simple positional proximity, including epoch-difference propagation and a probabilistic analysis of association via local source density. If we employ DPAC best neighbours then, at least for those external catalogues treated, we can support pre-computed cross-match as simply another dataset keyed on Gaia source ID. Caveats would be:

  • a given external source can be the "best" neighbour of more than one Gaia source, so a materialisation of the combined best-neighbour table plus external catalogue records would contain duplicate records from the external set, and should not be used in isolation without great care
  • a given external source may not be the best neighbour of any Gaia source, so it would be absent from the materialised external dataset; hence the latter may, in general, be incomplete with respect to the original external catalogue

These could be considered minor cons: we simply state them clearly and impress on users that if they want to analyse those data independently of the Gaia data they should look elsewhere for a facility on which to do so! A sketch of how such a pre-computed cross-match would be used follows below.
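To make the caveats concrete, here is a minimal PySpark sketch of how a pre-computed best-neighbour table could be materialised against its external catalogue and then cross-queried with gaia_source on source_id. The table and column names (gaia_source_ps1_best_neighbours, original_ext_source_id, panstarrs1, obj_id, the photometry columns) are assumptions for illustration, not the platform's actual schema.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed table/column names, for illustration only.
gaia = spark.table("gaia_source")                      # keyed on Gaia source_id
bn   = spark.table("gaia_source_ps1_best_neighbours")  # maps source_id <-> external UID
ps1  = spark.table("panstarrs1")                       # keyed on the external UID (obj_id)

# Re-key the external catalogue on Gaia source_id via the best-neighbour table.
# NB: an external source that is the "best" neighbour of more than one Gaia source
# appears more than once here, and an external source that is nobody's best
# neighbour is absent entirely -- the two caveats listed above.
ps1_by_gaia = bn.join(ps1, bn["original_ext_source_id"] == ps1["obj_id"])

# Cross-querying Gaia plus external photometry is then a plain join on source_id.
combined = gaia.join(ps1_by_gaia, "source_id")
combined.select("source_id", "phot_g_mean_mag", "g_mean_psf_mag").show(5)
```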

@NigelHambly
Collaborator Author

A generic on-the-fly cross-match solution could, in principle, also be employed to support spatial querying (cone/region search, etc.). But is this a crucial piece of functionality for our platform, whose primary usage mode is large-scale (i.e. whole-sky) analysis? Spatially limited querying is better served by existing relational systems.

@NigelHambly
Collaborator Author

Data skew: a consideration in a traditional Hadoop cluster is load balancing and data skew amongst the partitions. The AXS paper discusses this. In our situation it may be less of an issue, given that we have a virtualised separation between the workers and the back-end storage. Provided we have many more partitions than workers, load balancing may be less of a concern.

@NigelHambly
Collaborator Author

Probably the simplest solution is to partition/bucket/sort all datasets by source ID, given that all the large-scale datasets are (or, in the case of best neighbours, can be generated to be) keyed on that attribute. Cross-querying then becomes a simple matter of plain Spark joins on that key, and the underlying execution engine will plan the joins within partitions, with no need for expensive data shuffling between the workers. This should of course be tested and verified; a sketch follows below.
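As a rough illustration of the proposal (a sketch under assumed paths, table names and bucket count, not the production code): writing both datasets bucketed and sorted on source_id with the same number of buckets should let Spark plan the join within buckets, which can be verified by checking for the absence of an Exchange in the physical plan.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Placeholder paths; the real datasets live elsewhere on the platform.
gaia = spark.read.parquet("/path/to/gaia_source")
xp   = spark.read.parquet("/path/to/xp_spectra")

n_buckets = 2048  # the bucket count is a tunable; see the final decision below

# Bucket and sort both sides on the join key. Matching bucket counts mean the
# subsequent join can be planned bucket-by-bucket, with no shuffle between workers.
(gaia.write
     .bucketBy(n_buckets, "source_id").sortBy("source_id")
     .mode("overwrite")
     .saveAsTable("gaia_source"))
(xp.write
   .bucketBy(n_buckets, "source_id").sortBy("source_id")
   .mode("overwrite")
   .saveAsTable("xp_spectra"))

# Verification: the physical plan should show a SortMergeJoin with no Exchange.
joined = spark.table("gaia_source").join(spark.table("xp_spectra"), "source_id")
joined.explain()
```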

@NigelHambly
Collaborator Author

The final decision is to partition/bucket/sort by source_id into 2048 buckets. Testing has shown that this enables efficient joins between datasets keyed on source_id, including the best-neighbour tables (when pre-joined with their respective "external" catalogues on the external UID). When further Gaia datasets are added at subsequent data release points, they can be added to the platform data holdings in the same way, re-partitioned into separate parquet file sets, so as to enable efficient joint querying.

The datasets, which were written to HDFS on the production system, have been copied onto my user space on CephFS: see the folder /user/nch/PARQUET/REPARTITIONED. Using the python function defined in the notebook linked below, these should be re-established as tabular datasets in the metastore of any new Spark context as follows:

  • gaia_source : /user/nch/PARQUET/REPARTITIONED/GEDR3/
  • gaia_source_ps1_best_neighbours : ___/GEDR3_PS1_BEST_NEIGHBOURS/
  • gaia_source_2masspsc_best_neighbours : ___/GEDR_2MASSPSC_BEST_NEIGHBOURS/
  • gaia_source_allwise_best_neighbours : ___/GEDR3_ALLWISE_BEST_NEIGHBOURS/

The notebook that has the relevant code is available on the DPAC SVN. This also contains a few sanity checks that can serve as examples of how to cross-query.
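For reference, a minimal sketch of what such a registration function might look like; the authoritative version is the one in the notebook on the DPAC SVN, and the CREATE TABLE details here (bucket spec, table names, example query columns) are assumptions based on the layout described above.

```python
from pyspark.sql import SparkSession

BASE = "/user/nch/PARQUET/REPARTITIONED"

def register_table(spark: SparkSession, name: str, folder: str, buckets: int = 2048) -> None:
    """Re-establish an existing bucketed parquet dataset as a metastore table.

    Sketch only: the declared bucket spec must match how the files were
    originally written, otherwise Spark will not exploit the bucketing at join time.
    """
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {name}
        USING parquet
        CLUSTERED BY (source_id) SORTED BY (source_id) INTO {buckets} BUCKETS
        LOCATION '{BASE}/{folder}/'
    """)

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
register_table(spark, "gaia_source", "GEDR3")
register_table(spark, "gaia_source_allwise_best_neighbours", "GEDR3_ALLWISE_BEST_NEIGHBOURS")

# Sanity-check cross-query: count Gaia sources with an AllWISE best neighbour.
spark.sql("""
    SELECT COUNT(*) AS n_matched
    FROM gaia_source AS g
    JOIN gaia_source_allwise_best_neighbours AS n
      ON g.source_id = n.source_id
""").show()
```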

@Zarquan
Collaborator

Zarquan commented Jun 23, 2021

@NigelHambly I think this is probably done?

@NigelHambly
Collaborator Author

Yep - the optimisation of the actual number of partitions (2048, 2096, 8196) will be tackled and tracked elsewhere.
