diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..73a8838
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.pyc
+build
+*.egg-info
diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..3701087
--- /dev/null
+++ b/COPYING
@@ -0,0 +1,14 @@
+Copyright 2017, California Institute of Technology.
+ ALL RIGHTS RESERVED.
+ U.S. Government Sponsorship acknowledged.
+
+Any commercial use must be negotiated with the Office of Technology
+Transfer at the California Institute of Technology.
+
+This software may be subject to U.S. export control laws and
+regulations. By accepting this document, the user agrees to comply
+with all applicable U.S. export laws and regulations.
+
+User has the responsibility to obtain export licenses, or other export
+authority as may be required before exporting such information to
+foreign countries or providing access to foreign persons.
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..0cbb939
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner.
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2017 California Institute of Technology. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..0637dc6 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +HySDS Commons +============= + +Common Library functions for HySDS. diff --git a/archived_history.txt b/archived_history.txt new file mode 100644 index 0000000..588ee34 --- /dev/null +++ b/archived_history.txt @@ -0,0 +1,505 @@ +commit e014a0da961351a8c915b6be5e039deb5b752e40 +Merge: bcaaae8 e4f629e +Author: Gerald Manipon +Date: Thu Sep 14 14:07:30 2017 -0700 + + Merge pull request #2 from hysds-org/mstarch-actions + + mstarch: Fix explicit trigger rule + +commit e4f629e474d42b4fd645a1a589f6597b6325f0e6 +Merge: 7cd7254 bcaaae8 +Author: M Starch +Date: Tue Sep 5 18:39:00 2017 +0000 + + mstarch: merge from master + +commit 7cd7254b7dc43a66ebf24da1ebcbacb97b56f5b6 +Author: M Starch +Date: Tue Sep 5 18:37:50 2017 +0000 + + mstarch: adding action-type + +commit bcaaae8b242181367b2cf41c7e40635f61320f2b +Author: M Starch +Date: Thu Jul 20 10:21:30 2017 -0700 + + propagate ops account through to rule creation + +commit f2a00125f50adbbcdebab9aabb0c69e1bee6bbbe +Author: M Starch +Date: Wed Jul 19 21:01:35 2017 +0000 + + mstarch: fixing public sets, and adding _all user + +commit c182d268bfed2782f3bf3d91f94a4ea70c64fac9 +Author: gmanipon +Date: Thu Jul 13 15:28:18 2017 +0000 + + get username of system user if not defined in rules + +commit 7ffca9229c096f185b60a194bfd0e0398f1bcf69 +Author: gmanipon +Date: Thu Jul 13 02:34:26 2017 +0000 + + track username to be propagated to PROV-ES softwareAgent role + +commit 95646fccd4e3f36ebfd7d9f514bac7931d23e4b7 +Author: gmanipon +Date: Wed Jul 12 00:29:15 2017 +0000 + + ensure priority is either None or an int + +commit 08cfacb00c6240119daefd09842cd17afbc6ea90 +Author: M Starch +Date: Mon Jun 19 13:11:35 2017 -0700 + + mstarch: updating hysds_common to update by overwrite not upsert + +commit 16b5d7797973eea2ba3d1f9ed572424e59cf2c7f +Merge: 77e3300 b545cd0 +Author: M Starch +Date: Wed May 24 21:28:04 2017 +0000 + + Merge branch 'master' of https://github.jpl.nasa.gov/hysds-org/hysds_commons + +commit 77e330035b28bf82363c5ef3e861e492f59f234e +Author: M Starch +Date: Wed May 24 21:27:34 2017 +0000 + + mstarch: making sure optional arguments are optional + +commit b545cd01dfa5d826946f4a064f9c8a8f942d8454 +Author: jlinick +Date: Tue May 23 01:52:54 2017 +0000 + + disable dedup for 
passthru, single queries + +commit 751f732498a8ff7fc9758baf5e0bc9dcd05c2a19 +Author: jlinick +Date: Mon May 22 21:24:49 2017 +0000 + + initial implementation of job dedup + +commit 0ec0f0aa5f44f5daa6fa09d0b7618b20c3580abe +Merge: 4608a04 a1b59e9 +Author: M Starch +Date: Wed Apr 19 16:43:56 2017 +0000 + + Merge branch 'master' of https://github.jpl.nasa.gov/hysds-org/hysds_commons into special-inputs + +commit 4608a045fb7fca38053d2f05780683a422d57739 +Author: M Starch +Date: Tue Apr 11 17:29:47 2017 +0000 + + mstarch: refactoring type conversion, and a few pylint fixes + +commit 11db7b72c9ff716136f238e22861d2508002c981 +Author: M Starch +Date: Tue Apr 11 17:14:24 2017 +0000 + + mstarch: adding a region to BBox lambda builtin and fixing pylint errors + +commit a82dd144ac822131dc0bbf0fc1eabd07a285b6df +Author: M Starch +Date: Tue Apr 11 16:27:03 2017 +0000 + + mstarch: handling optional params, and email types + +commit 66f85383cfe89cec6ca0a2f525637c81a7b2db49 +Author: M Starch +Date: Mon Apr 10 22:29:52 2017 +0000 + + mstarch: fixing jobspec version type + +commit 7078c5f098e5f9c3999068a41b6c9cde3127e055 +Author: M Starch +Date: Mon Apr 10 22:26:01 2017 +0000 + + mstarch: more string-like types + +commit c2e7880d3c3e082e381ba3deccd851607fb934bb +Author: M Starch +Date: Mon Apr 10 19:44:52 2017 +0000 + + mstarch: adding more string-like types + +commit fe100cd76ef415394a9663a94494a08d8d2fae33 +Author: M Starch +Date: Mon Apr 10 19:03:43 2017 +0000 + + mstarch: fixing enum type handling + +commit be85ad331af292cf91737226af53d3fc4ea08996 +Author: M Starch +Date: Mon Apr 10 18:35:58 2017 +0000 + + mstarch: allowing specialized inputs + +commit a1b59e9376320ee4fb385bb465d1818c7d68e420 +Author: gmanipon +Date: Tue Mar 28 02:13:23 2017 +0000 + + propagate tag to context + +commit a97c05a4b2d013d94f5f440033aeaadd42c8539c +Author: gmanipon +Date: Mon Mar 27 18:17:41 2017 +0000 + + prevent from submitting to user_rules_* queues + +commit f7aac2ca24b6d6a65473174c6e43483dbfdb98d8 +Author: gmanipon +Date: Mon Mar 27 18:05:50 2017 +0000 + + protect from submitting to hysds orchestrator queues + +commit 8e5756b0dd69585b39d814b40578724e7225fbf7 +Author: M Starch +Date: Tue Mar 14 21:06:20 2017 +0000 + + mstarch: deserializing tags from JSON if needed + +commit 28feb59c4eef8a0521b1128eafea8bcec34e7e24 +Author: M Starch +Date: Tue Mar 14 20:30:42 2017 +0000 + + mstarch: allowing localize to handle lists, strings, and localize objects + +commit 546d956a7e9373ea10e90751c4e3211963b1299d +Author: gmanipon +Date: Fri Mar 10 22:53:35 2017 +0000 + + clean job name issued by job_iterator: job_type from rule is actually the hysds-io spec name + +commit ae6c42e4753be3bc3a77dae95d6373606f242e2e +Author: gmanipon +Date: Fri Mar 10 18:38:20 2017 +0000 + + handle job name for single submissions as well + +commit c8ec9010953b738d884def0980c85235de455b5a +Author: gmanipon +Date: Fri Mar 10 17:47:57 2017 +0000 + + properly name jobs submitted by job_iterator + +commit 4c1ae0c162bad574edf507d840720d7691c6ac44 +Author: gmanipon +Date: Fri Mar 10 17:05:24 2017 +0000 + + change to job_name as job_id implies uniqueness and consistency + +commit 82ae8069213a5f0b1414b26cc32fecd42b581a52 +Author: gmanipon +Date: Fri Mar 10 17:00:36 2017 +0000 + + allow job id specification through submit_mozart_job() + +commit 431fb09b440d2bc3fb8c45835630ab2edacbeaed +Author: gmanipon +Date: Fri Mar 10 16:24:55 2017 +0000 + + allow specifying _job_name in params to name job; fix bug in priority passing + +commit 
1de5377f157193fc6dbac8dafd7d483c0e3487a9 +Author: gmanipon +Date: Fri Mar 10 15:58:45 2017 +0000 + + remove redundant function def + +commit 4e6c6a84159d82bdb4c6cff67f21bab932dc3c5f +Author: gmanipon +Date: Mon Mar 6 15:22:53 2017 +0000 + + allow configuration of hard/soft time limits from job spec + +commit 7be3d5f4dc04f3440d4ff0be4c71151fa6862089 +Author: gmanipon +Date: Thu Mar 2 15:39:45 2017 +0000 + + cleanup single/run_query logic; use any() instead of functools.reduce() to allow short-circuit + +commit 1ccb40046b99cf1f2e24284fd9d3fc3b9252b181 +Author: gmanipon +Date: Thu Mar 2 04:00:48 2017 +0000 + + remove bad log line + +commit a5b458c7ec50d59ca48e7ef4292d4b489b0cc9fd +Author: gmanipon +Date: Thu Mar 2 03:43:51 2017 +0000 + + remove bad logger line + +commit efde4d4512a17bb0694406d4c3842f6453838a7b +Author: gmanipon +Date: Thu Mar 2 03:29:04 2017 +0000 + + fix empty filter and list and enable logging + +commit 62f636d8a0dd801fa7ac359369d8769a0942e4c8 +Author: gmanipon +Date: Tue Feb 28 19:04:26 2017 +0000 + + update resolve_hysds_job() to handle params according to type + + Fix bugs with using hysds logger. + +commit 9f2beb29a6bf586df5bd8bb5ac74ddfb16862be8 +Author: gmanipon +Date: Tue Feb 28 18:29:56 2017 +0000 + + specify explicit params + +commit 8be2b031281892cded6b22cd6b5756e5e570d0a6 +Author: gmanipon +Date: Tue Feb 28 18:23:47 2017 +0000 + + fix bug in printing size of generator + +commit 99294db2ff6bde6a5a5547681462e3739070601f +Author: gmanipon +Date: Tue Feb 28 18:06:24 2017 +0000 + + make job-iterator submit through celery + +commit 71178bbc6cd297961e5276382830b405dae49ef7 +Author: gmanipon +Date: Tue Feb 28 17:07:54 2017 +0000 + + separate job resolving/submission functions by job JSON type + +commit f08f34adede246ccf9f69976f2ba3c2918dd1081 +Author: gmanipon +Date: Tue Feb 28 16:09:03 2017 +0000 + + refactored job resolver functions to job_utils + +commit 2f17401d38c4b538fa595a3b64d0dbebdd4ab9fc +Author: gmanipon +Date: Mon Feb 27 22:52:50 2017 +0000 + + handle complex query + +commit 0b176eee71f87d1d492a59343715b43f98b1830b +Author: gmanipon +Date: Mon Feb 27 22:38:45 2017 +0000 + + normalize query + +commit cdc608ead097082ffbc57633d323034b6f6de9c8 +Author: gmanipon +Date: Mon Feb 27 22:01:46 2017 +0000 + + cleanup + +commit 978ded68b24346254630fcc7c53e025198e0a04f +Author: gmanipon +Date: Mon Feb 27 21:28:20 2017 +0000 + + add job iterator + +commit 43cb34460561565683cdcf9d679df3671bcd7d6d +Author: gmanipon +Date: Mon Feb 27 19:19:53 2017 +0000 + + utilize .netrc to authenticate to rabbitmq admin interface + + Pass through requests error to enable advanced error handling instead of eating it up. 
+ +commit b8988c63fdeda15b7e556991570912739bb2c40e +Author: gmanipon +Date: Mon Feb 27 18:35:05 2017 +0000 + + add queue utils + +commit e514be298100d68232d6087b97ffe796d1db8b5d +Author: gmanipon +Date: Mon Feb 27 18:22:07 2017 +0000 + + add version param + +commit 788b48807ba38fa6fe0a168b7eaf6ddc5580e4e6 +Author: gmanipon +Date: Mon Feb 27 17:25:44 2017 +0000 + + set default ops account as param; dump error if exception is caught + +commit b45a84efc167d2b2216ecea366562a054c525129 +Author: gmanipon +Date: Mon Feb 27 16:59:13 2017 +0000 + + optimize get_action_spec() and related library calls + +commit bc485a9dc003f74257103cb4078c7d0d039c8fcc +Author: M Starch +Date: Wed Feb 22 23:21:46 2017 +0000 + + mstarch: fixing display to put label in brackets + +commit 74304007b943785d6fe9308667895ff78224bf33 +Author: M Starch +Date: Thu Feb 16 03:09:33 2017 +0000 + + mstarch: fixing best url + +commit e6440f2211e36dd9c842ddf93eb7dddbcad1c2f8 +Author: M Starch +Date: Thu Feb 16 02:48:01 2017 +0000 + + mstarch: fixing lambda runs + +commit e540767f58bcdf8a7f0028cecde8e0b2be26f507 +Author: M Starch +Date: Thu Feb 16 00:36:42 2017 +0000 + + mstarch: fixing aggregate iteration + +commit 17125e2c0c60810201128cb5dec1e9fb144e7ba8 +Author: M Starch +Date: Wed Feb 15 22:46:48 2017 +0000 + + mstarch: fixing multi product set + +commit 35f4640befa969b692de495da1d51bfebd0ee211 +Author: M Starch +Date: Wed Feb 15 22:08:40 2017 +0000 + + mstarch: fixes typo + +commit ca6d6b15bdf56486c79c234f2ca8e6151f127b9b +Author: M Starch +Date: Wed Feb 15 22:00:14 2017 +0000 + + mstarch: fixing syntax errors + +commit 196b80546754b0b4920e6e7bf8f24bcf233fadf1 +Author: M Starch +Date: Wed Feb 15 17:23:10 2017 +0000 + + mstarch: allowing the submission of product sets + +commit 584ae914f0204adaaeec79deeb9e03062ce8a85c +Author: gmanipon +Date: Tue Feb 14 00:52:19 2017 +0000 + + add queue override + +commit d07af9c8eb99d1763711c0e70802dbe40a545f3b +Author: gmanipon +Date: Mon Feb 13 19:32:11 2017 +0000 + + clean out unused param from get_params_for_submission() + + Use global logger. 
+ +commit 6b528c54439f5b058ee9c3feb1edfec5128e7619 +Author: gmanipon +Date: Mon Feb 13 17:12:32 2017 +0000 + + add logger + +commit f50cd8aa7931b13ce98ccce268997a54fa6371e2 +Author: M Starch +Date: Wed Feb 8 03:47:26 2017 +0000 + + mstarch: actions now display labels + +commit 7a5922deafe28b4502ccbea1d9adb70e42e1528b +Author: M Starch +Date: Thu Feb 2 00:57:56 2017 +0000 + + mstarch: sorting lists + +commit 74c040a6e904102c5b892b4b523a281b737ef3d5 +Author: M Starch +Date: Mon Jan 30 17:39:12 2017 +0000 + + action_utils.py + +commit ee1dd5ccdc1693369fecf5e623a0b406264806fa +Author: gmanipon +Date: Thu Jan 26 16:20:37 2017 +0000 + + future-proof container resolution tools across platforms + +commit 9e6e0d2607becd295f975de700c9169d0a0202a9 +Author: gmanipon +Date: Thu Jan 26 16:03:24 2017 +0000 + + add linux-specific helper utils for localhost identity resolution + +commit 0032eabd400a8fa088bde7d06203aa79bed5d371 +Author: M Starch +Date: Thu Jan 19 08:03:38 2017 +0000 + + mstarch: fixing type to job_type + +commit 0e0cc712357f26581167b18e590c32ec1dfd7e0c +Merge: d7ab0cd 7a4edee +Author: M Starch +Date: Thu Jan 19 04:01:30 2017 +0000 + + mstarch: merging + +commit d7ab0cdc063c9b05a512761d3cb09666873ed0a4 +Author: M Starch +Date: Thu Jan 19 04:00:04 2017 +0000 + + mstarch: tweaking kwarg and pass-through handling + +commit 7a4edeef1b8d51d4dae587f4510e5f2bf86331ef +Author: M Starch +Date: Thu Jan 19 00:52:55 2017 +0000 + + mstarch: fixing JSON issue + +commit 973808bc3d08fc2d73cd3d6ef4c5bd8227950298 +Author: M Starch +Date: Wed Jan 18 22:07:50 2017 +0000 + + mstarch: fixing action utils to use wiring + +commit a9e34d103db7dfe602f0a9844d52523cd63e02ef +Author: M Starch +Date: Wed Jan 18 21:13:20 2017 +0000 + + mstarch: fixing name to rule_name + +commit 890332a1f3a0f06ff1538f0ef2e6f2aa971b5634 +Author: M Starch +Date: Wed Jan 18 19:38:49 2017 +0000 + + mstarch: adding job-rest submission as library + +commit 9558f571c2b74356a0e9eba513ed5d60110ba93a +Merge: 3c77527 5bd3887 +Author: M Starch +Date: Thu Jan 12 18:06:11 2017 +0000 + + Merge branch 'master' of https://github.jpl.nasa.gov/hysds-org/hysds_commons + +commit 3c77527ea67a3896af3bdd71c92ba11113fdb333 +Author: M Starch +Date: Thu Jan 12 18:06:02 2017 +0000 + + mstarch: passing job spec id to queue listing + +commit 5bd3887af8766392ee7b2d7492793f98757f80d7 +Author: gmanipon +Date: Thu Jan 12 17:09:38 2017 +0000 + + add .gitignore + +commit 5b67c6cdc89a81f8e9685093c9512ad3bf6a5d16 +Author: M Starch +Date: Thu Jan 12 02:05:46 2017 +0000 + + mstarch: remmoving v1 support for actions + +commit eac5cb967b1d44319a191c7df19cd94c39ef924e +Author: M Starch +Date: Wed Jan 11 18:51:20 2017 +0000 + + mstarch: initial commit of hysds-stuff diff --git a/hysds_commons/__init__.py b/hysds_commons/__init__.py new file mode 100644 index 0000000..e663953 --- /dev/null +++ b/hysds_commons/__init__.py @@ -0,0 +1,4 @@ +__version__ = "0.1" +__description__ = "Common HySDS Functions, Utilities, Etc." 
+__url__ = "https://github.jpl.nasa.gov/hysds-org/hysds_commons"
+
diff --git a/hysds_commons/action_utils.py b/hysds_commons/action_utils.py
new file mode 100644
index 0000000..0402023
--- /dev/null
+++ b/hysds_commons/action_utils.py
@@ -0,0 +1,91 @@
+import os, sys, operator, functools, json, traceback
+
+import hysds_commons.job_spec_utils
+import hysds_commons.hysds_io_utils
+from hysds_commons.log_utils import logger
+
+
+def create_action_from_entry(jobspec_es_url, wiring, ops_account="ops"):
+    '''
+    Create an action from a spec pulled from ES
+    @param jobspec_es_url: job spec ES url
+    @param wiring: wiring specification from HySDS
+    @param ops_account: string name of ops account
+    @return: action ready for display
+    '''
+
+    try:
+        #Setup the basic information for these actions
+        action_type = wiring.get("action-type", "both")
+        action = {
+            "allowed_accounts": [ ops_account ],
+            "monitoring_allowed": action_type == "trigger" or action_type == "both",
+            "processing_allowed": action_type == "on-demand" or action_type == "both",
+            "public": False
+        }
+
+        #Break-out user inputs into kwargs
+        # and look for "passthrough", to set the correct parameter
+        action["kwargs"] = [param for param in wiring.get("params", []) if param.get("from", "") == "submitter"]
+        for arg in action["kwargs"]:
+            arg["validator"] = { "required": not arg.get("optional",False) }
+            arg["type"] = arg.get("type", "text")
+            arg["placeholder"] = arg.get("placeholder", arg["name"])
+        action["passthru_query"] = functools.reduce(
+            lambda x, param: operator.or_(x, param["from"] == "passthrough" and param["name"] == "query"),
+            wiring.get("params",[]), False)
+
+        #Setup user permissions
+        if "allowed_accounts" in wiring:
+            accounts = wiring.get("allowed_accounts", [ ops_account ])
+            if ops_account not in accounts:
+                accounts.append(ops_account)
+            action["allowed_accounts"] = accounts
+        #Public only when the special "_all" account is allowed
+        action["public"] = "_all" in action["allowed_accounts"]
+
+        #Setup action requirements
+        if "label" in wiring:
+            action["label"] = "{0} [{1}]".format(wiring.get("label"), wiring.get("job-version", "unknown-version"))
+        else:
+            label = wiring.get("id", "unknown-job:unknown-version")
+            try:
+                sp = label.split(":")
+                label = "{0} [{1}]".format(sp[0], sp[1])
+            except Exception as e:
+                logger.warning("Error: {0}:{1}".format(type(e),e))
+            action["label"] = label
+        action["type"] = wiring.get("id","unknown-job")
+        action["job_type"] = wiring.get("id","unknown-job")
+        #action["queues"] = hysds_commons.mozart_utils.get_queue_list(mozart_url,wiring["job-specification"])
+        #action["specification"] = spec
+        action["wiring"] = wiring
+    except Exception as e:
+        logger.warning("Caught exception {}:\n{}".format(type(e), traceback.format_exc()))
+        return {
+            "allowed_accounts": ['ops'],
+            "monitoring_allowed": False,
+            "processing_allowed": False,
+            "public": False
+        }
+    return action
+
+
+def get_action_spec(iospec_es_url, jobspec_es_url, ops_account="ops"):
+    """
+    Returns action specs
+    @param iospec_es_url: ES url for IO specs
+    @param jobspec_es_url: ES url for job specs
+    @param ops_account: string name of ops account
+    @return: list of actions ready for display
+    """
+
+    action_specs = []
+    wirings = [spec.get('_source', {}) for spec in hysds_commons.hysds_io_utils.get_hysds_ios(iospec_es_url)]
+    #logger.info("wirings: %s" % json.dumps(wirings, indent=2))
+    jobs = hysds_commons.job_spec_utils.get_job_spec_types(jobspec_es_url)
+    #logger.info("jobs: %s" % json.dumps(jobs, indent=2))
+    for wiring in wirings:
+        if "job-specification" not in wiring or wiring["job-specification"] not in jobs:
+            continue
+        action_specs.append(create_action_from_entry(jobspec_es_url, wiring, ops_account))
+    #logger.info("action_specs: %s" % json.dumps(action_specs, indent=2))
+    return action_specs
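For orientation, the sketch below shows roughly how create_action_from_entry() turns a hysds-io wiring record into a UI action. The wiring fields and the ES URL are hypothetical, minimal values, not taken from a real deployment:

    from hysds_commons.action_utils import create_action_from_entry

    # hypothetical hysds-io wiring record
    wiring = {
        "id": "hysds-io-hello_world:v1.0",
        "action-type": "both",
        "allowed_accounts": ["_all"],
        "params": [
            {"name": "message", "from": "submitter", "type": "text"},
            {"name": "query", "from": "passthrough"},
        ],
    }

    action = create_action_from_entry("http://mozart-es:9200", wiring)
    assert action["monitoring_allowed"] and action["processing_allowed"]  # "both"
    assert action["public"]          # "_all" is in allowed_accounts
    assert action["passthru_query"]  # a passthrough "query" param is wired
    print(action["label"])           # "hysds-io-hello_world [v1.0]"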
diff --git a/hysds_commons/container_utils.py b/hysds_commons/container_utils.py
new file mode 100644
index 0000000..4090543
--- /dev/null
+++ b/hysds_commons/container_utils.py
@@ -0,0 +1,54 @@
+import hysds_commons.metadata_rest_utils
+
+
+CONTAINER_INDEX="containers"
+CONTAINER_TYPE="container"
+
+
+def get_container_types(es_url, logger=None):
+    '''
+    Get the container listings from Elastic Search
+    @param es_url - elasticsearch URL
+    @return: list of container ids
+    '''
+    return hysds_commons.metadata_rest_utils.get_types(es_url, CONTAINER_INDEX,
+                                                       logger=logger)
+
+
+def get_container(es_url, ident, logger=None):
+    '''
+    Get a container (JSON body)
+    @param es_url - elasticsearch URL
+    @param ident - identity of container
+    @return: dict representing the container
+    '''
+    return hysds_commons.metadata_rest_utils.get_by_id(es_url, CONTAINER_INDEX,
+                                                       CONTAINER_TYPE, ident,
+                                                       logger=logger)
+
+
+def add_container(es_url, name, url, version, logger=None):
+    '''
+    Ingests a container into the Mozart ElasticSearch index
+    @param es_url - elasticsearch URL
+    @param name - name of object for ingestion into ES
+    @param url - url of object for ingestion into ES
+    @param version - version of object for ingestion into ES
+    '''
+    return hysds_commons.metadata_rest_utils.add_metadata(es_url, CONTAINER_INDEX,
+                                                          CONTAINER_TYPE, {
+                                                              "id":name,
+                                                              "url":url,
+                                                              "version":version},
+                                                          logger=logger)
+
+
+def remove_container(es_url, ident, logger=None):
+    '''
+    Remove a container
+    @param es_url - elasticsearch URL
+    @param ident - id to delete
+    '''
+    return hysds_commons.metadata_rest_utils.remove_metadata(es_url, CONTAINER_INDEX,
+                                                             CONTAINER_TYPE, ident,
+                                                             logger=logger)
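A quick usage sketch for the helpers above; the ES endpoint and container metadata are hypothetical:

    from hysds_commons import container_utils

    es_url = "http://mozart-es:9200"  # hypothetical Mozart ES endpoint

    # register a container image, look it up, then list all known containers
    container_utils.add_container(es_url, "container-hello_world:v1.0",
                                  "s3://code-bucket/container-hello_world-v1.0.tar.gz",
                                  "v1.0")
    info = container_utils.get_container(es_url, "container-hello_world:v1.0")
    print(info["url"], info["version"])
    print(container_utils.get_container_types(es_url))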
diff --git a/hysds_commons/hysds_io_utils.py b/hysds_commons/hysds_io_utils.py
new file mode 100644
index 0000000..86ef5c2
--- /dev/null
+++ b/hysds_commons/hysds_io_utils.py
@@ -0,0 +1,51 @@
+import hysds_commons.metadata_rest_utils
+
+
+HYSDS_IO_INDEX="hysds_ios"
+HYSDS_IO_TYPE="hysds_io"
+
+
+def get_hysds_io_types(es_url, logger=None):
+    '''
+    Get the hysds_io listings from Elastic Search
+    @param es_url - elastic search URL (from owning app i.e. Mozart, Tosca)
+    @return: list of hysds_io ids
+    '''
+    return hysds_commons.metadata_rest_utils.get_types(es_url, HYSDS_IO_INDEX, logger=logger)
+
+
+def get_hysds_ios(es_url, logger=None):
+    '''
+    Get all hysds_io docs from ES
+    @param es_url - elastic search URL (from owning app i.e. Mozart, Tosca)
+    @return: list of all hysds_io docs
+    '''
+    return hysds_commons.metadata_rest_utils.get_all(es_url, HYSDS_IO_INDEX, HYSDS_IO_TYPE, logger=logger)
+
+
+def get_hysds_io(es_url, ident, logger=None):
+    '''
+    Get a hysds_io (JSON body)
+    @param es_url - elastic search URL (from owning app i.e. Mozart, Tosca)
+    @param ident - identity of hysds_io
+    @return: dict representing anonymous object of HySDS IO
+    '''
+    return hysds_commons.metadata_rest_utils.get_by_id(es_url, HYSDS_IO_INDEX, HYSDS_IO_TYPE, ident, logger=logger)
+
+
+def add_hysds_io(es_url, obj, logger=None):
+    '''
+    Ingests a hysds_io into the Mozart ElasticSearch index
+    @param es_url - elastic search URL (from owning app i.e. Mozart, Tosca)
+    @param obj - object for ingestion into ES
+    '''
+    return hysds_commons.metadata_rest_utils.add_metadata(es_url, HYSDS_IO_INDEX, HYSDS_IO_TYPE, obj, logger=logger)
+
+
+def remove_hysds_io(es_url, ident, logger=None):
+    '''
+    Remove a hysds_io
+    @param es_url - elastic search URL (from owning app i.e. Mozart, Tosca)
+    @param ident - id to delete
+    '''
+    return hysds_commons.metadata_rest_utils.remove_metadata(es_url, HYSDS_IO_INDEX, HYSDS_IO_TYPE, ident, logger=logger)
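Context for the modules that follow: a hysds-io document ("wiring") maps rule and user inputs onto job parameters and names the job spec it drives. A hypothetical, minimal document of the shape get_hysds_io() returns, with fields inferred from how the code below consumes them:

    hysdsio = {
        "id": "hysds-io-hello_world:v1.0",
        "job-specification": "job-hello_world:v1.0",   # job spec this wiring drives
        "submission_type": "individual",               # or "iteration"
        "params": [
            {"name": "message", "from": "submitter", "type": "text"},
            {"name": "uri", "from": "dataset_jpath:_source.urls.0"},
        ],
    }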
diff --git a/hysds_commons/job_iterator.py b/hysds_commons/job_iterator.py
new file mode 100644
index 0000000..a5bfe59
--- /dev/null
+++ b/hysds_commons/job_iterator.py
@@ -0,0 +1,130 @@
+#!/bin/env python
+import copy, json, traceback
+import logging
+
+from hysds_commons.request_utils import post_scrolled_json_responses
+from hysds_commons.hysds_io_utils import get_hysds_io
+from hysds_commons.job_utils import submit_mozart_job
+
+from hysds.celery import app
+
+
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger("job-iterator")
+
+
+def get_component_config(component):
+    '''
+    From a component get the common configuration values
+    @param component - component ("mozart", "figaro", or "tosca")
+    @return: tuple of (es_url, query_idx, facetview_url)
+    '''
+    if component == "mozart" or component == "figaro":
+        es_url = app.conf["JOBS_ES_URL"]
+        query_idx = app.conf["STATUS_ALIAS"]
+        facetview_url = app.conf["MOZART_URL"]
+    elif component == "tosca":
+        es_url = app.conf["GRQ_ES_URL"]
+        query_idx = app.conf["DATASET_ALIAS"]
+        facetview_url = app.conf["GRQ_URL"]
+    else:
+        raise ValueError("Unknown component: {0}".format(component))
+    return (es_url, query_idx, facetview_url)
+
+
+def normalize_query(rule):
+    """Normalize final query."""
+
+    if rule.get('passthru_query', False) is True:
+        query = rule['query']
+        filts = []
+        if 'filtered' in query:
+            final_query = copy.deepcopy(query)
+            filts.append(final_query['filtered']['filter'])
+            final_query['filtered']['filter'] = {
+                'and': filts
+            }
+        else:
+            final_query = {
+                'filtered': {
+                    'query': query
+                }
+            }
+        final_query = { "query": final_query }
+        logger.info("final_query: %s" % json.dumps(final_query, indent=2))
+        rule['query'] = final_query
+        rule['query_string'] = json.dumps(final_query)
+
+
+def iterate(component, rule):
+    '''
+    Iterator used to iterate across a query result and submit jobs for every hit
+    @param component - "mozart" or "tosca" where this submission came from
+    @param rule - rule containing information for running jobs
+    '''
+    #Accumulator variables
+    ids = []
+    error_count = 0
+    total_count = 0
+    errors = []
+
+    #Read config from "origin"
+    es_url, es_index, ignore1 = get_component_config(component)
+
+    #Read in JSON formatted args and setup passthrough
+    normalize_query(rule)
+    if 'query' in rule.get('query', {}): queryobj = rule["query"]
+    else: queryobj = { "query": rule["query"] }
+
+    #Get wiring
+    hysdsio = get_hysds_io(es_url, rule["job_type"], logger=logger)
+
+    #Is this a single submission
+    passthru = rule.get('passthru_query', False)
+    single = hysdsio.get("submission_type", "individual" if passthru is True else "iteration") == "individual"
+    logger.info("single submission type: %s" % single)
+
+    #Do we need the results
+    run_query = False if single else True
+    if not run_query: # check if we need the results anyway
+        run_query = any((i["from"].startswith('dataset_jpath') for i in hysdsio["params"]))
+    logger.info("run_query: %s" % run_query)
+
+    #Run the query to get the products; for efficiency, run query only if we need the results
+    results = [{"_id":"Transient Faux-Results"}]
+    if run_query:
+        #Scroll product results
+        start_url = "{0}/{1}/_search".format(es_url, es_index)
+        scroll_url = "{0}/_search".format(es_url)
+        results = post_scrolled_json_responses(start_url, scroll_url, data=json.dumps(queryobj),
+                                               logger=logger, generator=True)
+
+    #What to iterate for submission
+    submission_iterable = [{"_id":"Global Single Submission"}] if single else results
+    #Iterator loop
+    for item in submission_iterable:
+        total_count += 1
+        try:
+            #For single submissions, submit all results as one
+            product = results if single else item
+            logger.info("Submitting mozart job for product: %s" % product)
+
+            # set clean descriptive job name
+            job_type = rule['job_type']
+            if job_type.startswith('hysds-io-'):
+                job_type = job_type.replace('hysds-io-', '', 1)
+            if isinstance(product, dict):
+                job_name="%s-%s" % (job_type, product.get('_id', 'unknown'))
+            else:
+                job_name="%s-single_submission" % job_type
+
+            # disable dedup for passthru single submissions
+            enable_dedup = False if not run_query and single else True
+
+            ids.append(submit_mozart_job(product, rule, hysdsio,
+                                         job_name=job_name,
+                                         enable_dedup=enable_dedup))
+        except Exception as e:
+            error_count = error_count + 1
+            if str(e) not in errors:
+                errors.append(str(e))
+            logger.warning("Failed to submit jobs: {0}:{1}".format(type(e),str(e)))
+            logger.warning(traceback.format_exc())
+    if error_count > 0:
+        logger.error("Failed to submit: {0} of {1} jobs. {2}".format(error_count, total_count, " ".join(errors)))
+        raise Exception("Job Submitter Job failed to submit all actions")
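A worked example of the passthrough query wrapping in normalize_query() above (values hypothetical; assumes a configured HySDS environment so the module imports cleanly):

    from hysds_commons.job_iterator import normalize_query

    rule = {
        "passthru_query": True,
        "query": {"term": {"dataset.raw": "example-product"}},  # hypothetical
    }
    normalize_query(rule)
    # rule["query"] is now:
    #   {"query": {"filtered": {"query": {"term": {"dataset.raw": "example-product"}}}}}
    # and rule["query_string"] holds the same structure serialized as JSON.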
diff --git a/hysds_commons/job_rest_utils.py b/hysds_commons/job_rest_utils.py
new file mode 100644
index 0000000..60e910a
--- /dev/null
+++ b/hysds_commons/job_rest_utils.py
@@ -0,0 +1,28 @@
+import json, traceback
+
+from hysds_commons.job_utils import resolve_mozart_job
+from hysds_commons.mozart_utils import submit_job
+from hysds_commons.log_utils import logger
+
+
+def single_process_and_submission(mozart_url, product, rule, hysdsio=None,
+                                  es_hysdsio_url=None, queue=None):
+    '''
+    Run a single job from inside the iterator
+    @param mozart_url - mozart url to submit to
+    @param product - product result body
+    @param rule - rule specification body
+    @param hysdsio - (optional) hysds-io body
+    @param es_hysdsio_url - (optional) url to request hysdsio data from
+    @param queue - (optional) job queue override
+    '''
+
+    # resolve job
+    job = resolve_mozart_job(product, rule, hysdsio, es_hysdsio_url, queue)
+    logger.info("resolved job: {}".format(json.dumps(job, indent=2)))
+
+    # submit job
+    res = submit_job(mozart_url, job)
+    logger.info("submitted job to {}".format(mozart_url))
+
+    return res
diff --git a/hysds_commons/job_spec_utils.py b/hysds_commons/job_spec_utils.py
new file mode 100644
index 0000000..2076b20
--- /dev/null
+++ b/hysds_commons/job_spec_utils.py
@@ -0,0 +1,51 @@
+import hysds_commons.metadata_rest_utils
+
+
+JOB_SPEC_INDEX="job_specs"
+JOB_SPEC_TYPE="job_spec"
+
+
+def get_job_spec_types(es_url, logger=None):
+    '''
+    Get the job_spec listings from Elastic Search
+    @param es_url - elasticsearch URL
+    @return: list of job_spec ids
+    '''
+    return hysds_commons.metadata_rest_utils.get_types(es_url, JOB_SPEC_INDEX, logger=logger)
+
+
+def get_job_specs(es_url, logger=None):
+    '''
+    Get all job spec docs from ES
+    @param es_url - elastic search URL
+    @return: list of all job spec docs
+    '''
+    return hysds_commons.metadata_rest_utils.get_all(es_url, JOB_SPEC_INDEX, JOB_SPEC_TYPE, logger=logger)
+
+
+def get_job_spec(es_url, ident, logger=None):
+    '''
+    Get a job_spec (JSON body)
+    @param es_url - elasticsearch URL
+    @param ident - identity of job_spec
+    @return: dict representing the job spec
+    '''
+    return hysds_commons.metadata_rest_utils.get_by_id(es_url, JOB_SPEC_INDEX, JOB_SPEC_TYPE, ident, logger=logger)
+
+
+def add_job_spec(es_url, obj, logger=None):
+    '''
+    Ingests a job_spec into ES
+    @param es_url - elasticsearch URL
+    @param obj - object for ingestion into ES
+    '''
+    return hysds_commons.metadata_rest_utils.add_metadata(es_url, JOB_SPEC_INDEX, JOB_SPEC_TYPE, obj, logger=logger)
+
+
+def remove_job_spec(es_url, ident, logger=None):
+    '''
+    Remove a job_spec
+    @param es_url - elasticsearch URL
+    @param ident - id to delete
+    '''
+    return hysds_commons.metadata_rest_utils.remove_metadata(es_url, JOB_SPEC_INDEX, JOB_SPEC_TYPE, ident, logger=logger)
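How the REST submission helper above is typically driven (all endpoint URLs and rule fields here are hypothetical): resolve a Mozart job from a rule plus one product hit, then POST it to the Mozart API.

    from hysds_commons.job_rest_utils import single_process_and_submission

    rule = {
        "rule_name": "hello_world-on-ingest",     # hypothetical rule
        "job_type": "hysds-io-hello_world:v1.0",  # hysds-io id
        "queue": "factotum-job_worker-small",
        "priority": 5,
        "kwargs": '{"message": "hello"}',         # serialized submitter inputs
    }
    product = {"_id": "example-product-20170101"}

    single_process_and_submission("https://mozart.example.com/mozart", product, rule,
                                  es_hysdsio_url="http://mozart-es:9200")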
diff --git a/hysds_commons/job_utils.py b/hysds_commons/job_utils.py
new file mode 100644
index 0000000..fdf4864
--- /dev/null
+++ b/hysds_commons/job_utils.py
@@ -0,0 +1,458 @@
+import os, pwd, json, traceback, types, collections
+
+from hysds_commons.job_spec_utils import get_job_spec
+from hysds_commons.hysds_io_utils import get_hysds_io
+from hysds_commons.container_utils import get_container
+from hysds_commons.log_utils import logger
+
+from hysds.celery import app
+from hysds.orchestrator import do_submit_job
+
+
+def get_username():
+    """Get username."""
+
+    try: return pwd.getpwuid(os.getuid())[0]
+    except: return None
+
+
+def get_params_for_products_set(wiring, kwargs, passthrough=None, products=None):
+    '''
+    Get parameters for a set of products
+    @param wiring: wiring specification
+    @param kwargs: keyword arguments from submitter
+    @param passthrough: rule containing passthrough arguments
+    @param products: a list of products
+    '''
+
+    params = {}
+    if products is None: return params
+    for prod in products:
+        get_params_for_submission(wiring, kwargs, passthrough, prod, params, aggregate=True)
+    return params
+
+
+def get_params_for_submission(wiring, kwargs, passthrough=None, product=None,
+                              params=None, aggregate=False):
+    '''
+    Get params for submission for HySDS/Tosca style workflow
+    @param wiring - wiring specification
+    @param kwargs - arguments from user form
+    @param passthrough - rule
+    @param product - product
+    @param params - existing params
+    '''
+    logger.info("wiring: %s" % json.dumps(wiring, indent=2))
+    logger.info("kwargs: %s" % json.dumps(kwargs, indent=2))
+    logger.info("passthrough: %s" % json.dumps(passthrough, indent=2))
+    logger.info("product: %s" % json.dumps(product, indent=2))
+    if params is None:
+        params = {}
+    for wire in wiring["params"]:
+        # Aggregated results from dataset_jpath:... are put into a list
+        if aggregate and wire["from"].startswith("dataset_jpath:"):
+            val = get_inputs(wire, kwargs, passthrough, product)
+            val = run_type_conversion(wire, val)
+            val = run_lambda(wire, val)
+            if wire["name"] not in params:
+                params[wire["name"]] = []
+            params[wire["name"]].append(val)
+        # Non-aggregated and non-dataset_jpath fields are set once
+        elif wire["name"] not in params:
+            val = get_inputs(wire, kwargs, passthrough, product)
+            val = run_type_conversion(wire, val)
+            val = run_lambda(wire, val)
+            params[wire["name"]] = val
+    return params
+
+
+def run_type_conversion(wire, val):
+    '''
+    Run a conversion from the input string to a known type
+    @param wire: hysds-wiring record
+    @param val: value to convert
+    @returns: type converted value
+    '''
+    #Unspecified types are text
+    param_type = wire.get("type", "text")
+    #Unfilled optional parameters get the empty string as a converted type
+    if wire.get("optional", False) and val.strip() == "":
+        return val.strip()
+    elif param_type == "number":
+        return float(val)
+    elif param_type == "boolean":
+        return val.lower() == "true"
+    elif param_type in ["region"]:
+        return json.loads(val)
+    else:
+        return val
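+
+# Example conversions (hypothetical wiring entries):
+#   run_type_conversion({"type": "number"}, "3.5")      -> 3.5
+#   run_type_conversion({"type": "boolean"}, "True")    -> True
+#   run_type_conversion({"type": "region"}, '[1, 2]')   -> [1, 2]
+#   run_type_conversion({"optional": True}, "  ")       -> ""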
+
+
+def run_lambda(wire, val):
+    '''
+    Runs the lambda key as a lambda function with 1 arg, the previous value
+    @param wire - wiring spec to check for lambda
+    @param val - value to pass to the lambda function
+    '''
+
+    if "lambda" in wire:
+        try:
+            if not wire["lambda"].startswith("lambda:") and not wire["lambda"].startswith("lambda "):
+                raise RuntimeError("[ERROR] Failed to run lambda function, must be lambda expression taking 0 or 1 inputs")
+            import functools
+            import lambda_builtins
+            namespace = { "functools":functools }
+            for name in dir(lambda_builtins):
+                if name.startswith("__"):
+                    continue
+                namespace[name] = lambda_builtins.__dict__[name]
+            fn = eval(wire["lambda"], namespace, {})
+            val = fn(val)
+        except Exception as e:
+            raise RuntimeError("[ERROR] Failed to run lambda function to fill {0}. {1}:{2}".format(wire["name"], type(e), e))
+    return val
+
+
+def get_inputs(param, kwargs, rule=None, product=None):
+    '''
+    Update parameter to add in a value for the param
+    @param param - parameter to update
+    @param kwargs - inputs from user form
+    @param rule - (optional) rule hit to use to fill pass throughs
+    @param product - (optional) product hit for augmenting
+    '''
+
+    #Break out if value is known
+    if "value" in param:
+        ret = param["value"]
+        return ret
+    source = param.get("from", "unknown")
+    #Get a value
+    ret = param.get("default_value", None)
+    if source == "submitter":
+        ret = kwargs.get(param.get("name", "unknown"), None)
+    elif source == "passthrough" and rule is not None:
+        ret = rule.get(param["name"], None)
+    elif source.startswith("dataset_jpath:") and product is not None:
+        #If we are processing a list of products, create a list for outputs
+        ret = process_xpath(source.split(":")[1], product)
+    #Check value is found
+    if ret is None and product is not None and rule is not None:
+        raise RuntimeError("Failed to find '{0}' input from '{1}'".format(param.get("name", "unknown"), source))
+    return ret
+
+
+def process_xpath(xpath, trigger):
+    '''
+    Process the xpath to extract data from a trigger
+    @param xpath - xpath location in trigger
+    @param trigger - trigger metadata to extract XPath
+    '''
+
+    ret = trigger
+    parts = xpath.replace("xpath.", "").split(".")
+    for part in parts:
+        if ret is None or part == "":
+            return ret
+        #Try to convert to integer, if possible, for list indices
+        try:
+            part = int(part)
+            if len(ret) <= part:
+                ret = None
+            else:
+                ret = ret[part]
+            continue
+        except:
+            pass
+        ret = ret.get(part,None)
+    return ret
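+
+# Example (hypothetical trigger document):
+#   process_xpath("_source.urls.0", {"_source": {"urls": ["http://a", "http://b"]}})
+#   -> "http://a"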
+
+
+def match_inputs(param, context):
+    '''
+    Update parameter to add in a value for the param
+    @param param - parameter to update
+    @param context - context of job specification
+    '''
+
+    #Break out if value is known
+    if "value" in param:
+        return
+    source = param.get("source", "unknown")
+    #Get a value
+    param["value"] = param.get("default_value", None)
+    if source == "submitter":
+        param["value"] = context.get("inputs", {}).get(param.get("name", "unknown"),None)
+    elif source == "context":
+        param["value"] = context.get(param.get("name"), None)
+    elif source.startswith("xpath."):
+        param["value"] = process_xpath(source, context.get("trigger",{}))
+    #Check value is found
+    if param["value"] is None:
+        raise RuntimeError("Failed to find '{0}' input from '{1}'".format(param.get("name", "unknown"), source))
+
+
+def fill_localize(value, localize_urls):
+    '''
+    Fill the localize product list. Must handle a value that is one of the following:
+    1. A URL as a string, which should be wrapped like {"url": string}
+    2. An object defining "url". Does not need wrapping
+    3. Another list containing URLs/objects. Should be flattened
+    Note: this function recurses to handle sub-lists
+    @param value: a value conforming to points 1-3 above
+    @param localize_urls: in-out list to append localize objects to
+    Note: this parameter is modified
+    '''
+    # Check for strings that need to be wrapped in
+    # localize format
+    if isinstance(value, basestring):
+        localize_urls.append({"url":value})
+    # Check for objects that define the "url" field, and if so
+    # they can be passed right in
+    elif getattr(value, "get", None) is not None and value.get("url", None) is not None:
+        localize_urls.append(value)
+    # Dict-like objects without a "url" field must throw an error
+    elif getattr(value, "get", None) is not None:
+        raise ValueError("Invalid object of type {0} trying to be localized. {1}".format(type(value), value))
+    # Handle lists and other iterables by recursing
+    else:
+        for val in value:
+            fill_localize(val, localize_urls)
+
+
+def route_outputs(param, context, positional, localize_urls):
+    '''
+    Route the input parameters to their destinations
+    @param param - parameter specification
+    @param context - context object to fill with context-destined items
+    @param positional - list to append positional arguments to
+    @param localize_urls - list to append localize urls to
+    '''
+
+    destination = param.get("destination", "unknown")
+    if destination == "positional":
+        context[param["name"]] = param.get("value", "")
+        positional.append(context[param["name"]])
+    elif destination == "localize":
+        val = param.get("value", "")
+        fill_localize(val, localize_urls)
+    elif destination == "context":
+        context[param["name"]] = param.get("value", "")
+
+
+def get_command_line(command, positional):
+    '''
+    Gets the command line invocation for this submission by
+    concatenating the command and positional arguments
+    @param command - command line from job spec
+    @param positional - positional arguments
+    @return: command string with args, or None
+    '''
+
+    parts = []
+    if command is not None:
+        parts.append(command)
+    for posit in positional:
+        #Serialize non-string values as JSON
+        if not isinstance(posit, basestring):
+            posit = "{0}".format(json.dumps(posit))
+        #Escape any single quotes
+        posit = posit.replace("'","'\"'\"'")
+        #Add in encapsulating single quotes
+        parts.append("'{0}'".format(posit))
+    ret = " ".join(parts)
+    return ret
+
+
+def resolve_mozart_job(product, rule, hysdsio=None, es_hysdsio_url=None, queue=None):
+    '''
+    Resolve Mozart job JSON.
+    @param product - product result body
+    @param rule - rule specification body
+    @param hysdsio - (optional) hysds-io body
+    @param es_hysdsio_url - (optional) url to request hysdsio data from
+    @param queue - (optional) job queue override
+    '''
+
+    logger.info("rule: %s" % json.dumps(rule, indent=2))
+    logger.info("hysdsio: %s" % json.dumps(hysdsio, indent=2))
+    logger.info("es_hysdsio_url: %s" % es_hysdsio_url)
+    logger.info("queue: %s" % queue)
+
+    # override queue
+    queue = rule['queue'] if queue is None else queue
+
+    # ensure priority is int
+    if rule['priority'] is not None: rule["priority"] = int(rule['priority'])
+
+    # this is the common data for all jobs, and will be copied for each individual submission
+    if hysdsio is None and es_hysdsio_url is None:
+        message = "[ERROR] Must supply a hysds-io object or a ES-URL to request one"
+        logger.error(message)
+        raise RuntimeError(message)
+    elif hysdsio is None:
+        hysdsio = get_hysds_io(es_hysdsio_url, rule["job_type"], logger=logger)
+
+    # initialize job JSON
+    job = {
+        "queue": queue,
+        "priority": rule["priority"],
+        "type": hysdsio["job-specification"],
+        "tags": json.dumps([rule["rule_name"], hysdsio["id"]]),
+        "username": rule.get("username", get_username()),
+    }
+    rule["name"] = rule["rule_name"]
+    #logger.info("job before get_params_for_submission: %s" % json.dumps(job, indent=2))
+
+    # resolve parameters for job JSON
+    if not isinstance(product, dict):
+        logger.info("Products: %s" % product)
+        params = get_params_for_products_set(hysdsio, json.loads(rule["kwargs"]), rule, product)
+    else:
+        logger.info("Product: %s" % product)
+        params = get_params_for_submission(hysdsio, json.loads(rule["kwargs"]), rule, product)
+    #logger.info("job after get_params_for_submission: %s" % json.dumps(job, indent=2))
+    #logger.info("params from get_params_for_submission: %s" % json.dumps(params, indent=2))
+
logger.info("params: %s" % json.dumps(params, indent=2)) + + # set params + job["params"] = json.dumps(params) + #logger.info("job after adding params: %s" % json.dumps(job, indent=2)) + + return job + + +def resolve_hysds_job(job_type=None, queue=None, priority=None, tags=None, + params=None, job_name=None, payload_hash=None, + enable_dedup=True, username=None): + ''' + Resolve HySDS job JSON. + @param job_type - type of the job spec to go find + @param queue - queue to submit to + @param priority - priority of job + @param tags - tags for this job + @param params - stringified dictionary representing job params + @param job_name - base job name override + @param payload_hash - user-generated payload hash + @param enable_dedup - flag to enable/disable job dedup + @param username - username + ''' + + # check args + if job_type is None: + raise RuntimeError("'type' must be supplied in request") + if queue is None: + raise RuntimeError("'queue' must be supplied in request") + if params is None: params = {} + elif isinstance(params, types.StringTypes): params = json.loads(params) + elif isinstance(params, dict): pass + else: raise RuntimeError("Invalid arg type 'params': {} {}".format(type(params), params)) + + # pull mozart job and container specs + specification = get_job_spec(app.conf['JOBS_ES_URL'], job_type) + container_id = specification.get("container", None) + container_spec = get_container(app.conf['JOBS_ES_URL'], container_id) + logger.info("Running from: {0} in container: {1}".format(job_type, container_id)) + + # resolve inputs/outputs + positional = [] + context = {} + localize_urls = specification.get("localize_urls", []) + logger.info("params: {}".format(specification.get("params", []))) + for param in specification.get("params", []): + #TODO: change to "check_inputs" + #match_inputs(param,context) + logger.info("param: {}".format(param)) + if not param["name"] in params: + raise RuntimeError("'params' must specify '{0}' parameter".format(param["name"])) + param["value"] = params[param["name"]] + route_outputs(param, context, positional, localize_urls) + + # build command line + cmd = get_command_line(specification.get("command", None), positional) + + # add docker value overlays + overlays = specification.get("imported_worker_files", {}) + + # get hard/soft time limits + time_limit = specification.get('time_limit', None) + soft_time_limit = specification.get('soft_time_limit', None) + + # initialize hysds job JSON + job = { + "job_type": "job:{0}".format(job_type), + "job_queue": queue, + "container_image_url": container_spec.get("url", None), + "container_image_name": container_spec.get("id", None), + "container_mappings": overlays, + "time_limit": time_limit, + "soft_time_limit": soft_time_limit, + "enable_dedup": enable_dedup, + "payload": { + "_command" : cmd, + "localize_urls": localize_urls, + } + } + + #add optional parameters + if job_name is not None: job["job_name"] = job_name + if "disk_usage" in specification: + job["payload"]["_disk_usage"] = specification.get("disk_usage") + if priority is not None: job["priority"] = priority + if payload_hash is not None: job["payload_hash"] = payload_hash + if username is not None: job["username"] = username + #deserialize tags, if needed + if isinstance(tags, types.StringTypes): tags = json.loads(tags) + if tags is not None: job["tags"] = tags + if tags is not None and len(tags) > 0: job["tag"] = tags[0] + job["payload"]["job_specification"] = specification + job["payload"]["container_specification"] = container_spec + + 
#Merge in context without overwriting existing keys
+    for k,v in context.iteritems():
+        if k not in job["payload"]:
+            job["payload"][k] = v
+
+    # add tag to payload for propagation
+    if "tag" in job and "tag" not in job["payload"]:
+        job["payload"]["tag"] = job["tag"]
+
+    return job
+
+
+def submit_hysds_job(job):
+    '''
+    Submits a HySDS job via celery
+    @param job - HySDS job JSON payload, as produced by resolve_hysds_job()
+    '''
+
+    logger.info("Submitting job:\n{0}".format(json.dumps(job,indent=2)))
+    res = do_submit_job(job, 'jobs_processed')
+    return res.id
+
+
+def submit_mozart_job(product, rule, hysdsio=None, es_hysdsio_url=None,
+                      queue=None, job_name=None, payload_hash=None,
+                      enable_dedup=True):
+    '''
+    Resolve a Mozart job to a HySDS job and submit via celery
+    @param product - product result body
+    @param rule - rule specification body
+    @param hysdsio - (optional) hysds-io body
+    @param es_hysdsio_url - (optional) url to request hysdsio data from
+    @param queue - (optional) job queue override
+    @param job_name - (optional) base job name override
+    @param payload_hash - (optional) user-generated payload hash
+    @param enable_dedup - (optional) flag to enable/disable job dedup
+    '''
+
+    # resolve mozart job
+    moz_job = resolve_mozart_job(product, rule, hysdsio, es_hysdsio_url, queue)
+    logger.info("resolved mozart job: {}".format(json.dumps(moz_job, indent=2)))
+
+    # resolve hysds job
+    job = resolve_hysds_job(moz_job['type'], moz_job['queue'], moz_job['priority'],
+                            moz_job['tags'], moz_job['params'], job_name,
+                            payload_hash, enable_dedup, moz_job['username'])
+    logger.info("resolved HySDS job: {}".format(json.dumps(job, indent=2)))
+
+    # submit hysds job
+    return submit_hysds_job(job)
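End to end, the trigger path composes these helpers: a rule plus a product resolves to a Mozart job, which resolves to a HySDS job, which is queued through celery. A minimal sketch, with all rule fields, IDs, and URLs hypothetical:

    from hysds_commons.job_utils import submit_mozart_job

    rule = {
        "rule_name": "hello_world-on-ingest",     # hypothetical
        "job_type": "hysds-io-hello_world:v1.0",  # hysds-io id
        "queue": "factotum-job_worker-small",
        "priority": 5,
        "kwargs": '{"message": "hello"}',         # serialized submitter inputs
    }
    product = {"_id": "example-product-20170101"}

    task_id = submit_mozart_job(product, rule,
                                es_hysdsio_url="http://mozart-es:9200",
                                job_name="job-hello_world-example")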
diff --git a/hysds_commons/linux_utils.py b/hysds_commons/linux_utils.py
new file mode 100644
index 0000000..6286c80
--- /dev/null
+++ b/hysds_commons/linux_utils.py
@@ -0,0 +1,31 @@
+import re
+from subprocess import check_output
+
+
+# compiled regexes
+DEF_GATEWAY_RE = re.compile(r'^default\s+via\s+(.+)\s+dev')
+DOCKER_RE = re.compile(r'docker')
+
+
+def running_in_container():
+    """Return True if caller is running in a container instance. False otherwise."""
+
+    with open("/proc/1/cgroup") as f: cgroup = f.read()
+    return bool(DOCKER_RE.search(cgroup))
+
+
+def get_gateway_ip():
+    """Return IP address of default gateway."""
+
+    out = check_output(["ip", "route", "show", "default", "0.0.0.0/0"])
+    match = DEF_GATEWAY_RE.search(out)
+    if not match:
+        raise RuntimeError("Failed to extract default gateway from ip route: %s" % out)
+    return match.group(1)
+
+
+def get_container_host_ip():
+    """Return the IP address of the container host if caller is running in a
+    container. Otherwise, return the default localhost IP address."""
+
+    return "127.0.0.1" if not running_in_container() else get_gateway_ip()
diff --git a/hysds_commons/log_utils.py b/hysds_commons/log_utils.py
new file mode 100644
index 0000000..4734d86
--- /dev/null
+++ b/hysds_commons/log_utils.py
@@ -0,0 +1,9 @@
+import logging
+
+
+log_format = "[%(asctime)s: %(levelname)s %(filename)s:%(funcName)s] %(message)s"
+logging.basicConfig(format=log_format, level=logging.INFO)
+logger = logging.getLogger('hysds_commons')
+
+
+def get_logger(): return logger
diff --git a/hysds_commons/metadata_rest_utils.py b/hysds_commons/metadata_rest_utils.py
new file mode 100644
index 0000000..d07a59d
--- /dev/null
+++ b/hysds_commons/metadata_rest_utils.py
@@ -0,0 +1,79 @@
+import json
+import request_utils
+
+
+def get_types(es_url, es_index, logger=None):
+    '''
+    Get the listing of spec IDs from an ES index
+    @param es_url: elastic search url
+    @param es_index: index to list from
+    @return: list of ids from given index
+    '''
+
+    query = {"query": {"match_all": {}}}
+    url = "{0}/{1}/_search".format(es_url, es_index)
+    scroll_url = "{0}/_search".format(es_url)
+    data = json.dumps(query)
+    results = request_utils.post_scrolled_json_responses(url, scroll_url, data=data, logger=logger)
+    return sorted([result["_id"] for result in results])
+
+
+def get_all(es_url, es_index, es_type, query=None, logger=None):
+    '''
+    Get all spec documents in an ES index
+    @param es_url: elastic search url
+    @param es_index: ES index to query
+    @param es_type: ES document type (currently unused)
+    @param query: (optional) ES query; defaults to match_all
+    @return: list of all specification docs
+    '''
+
+    if query is None: query = {"query": {"match_all": {}}}
+    url = "{0}/{1}/_search".format(es_url, es_index)
+    scroll_url = "{0}/_search".format(es_url)
+    return request_utils.post_scrolled_json_responses(url, scroll_url, data=json.dumps(query), logger=logger)
+
+
+def get_by_id(es_url, es_index, es_type, ident, logger=None):
+    '''
+    Get a spec document by ID
+    @param es_url: elastic search url
+    @param es_index: ES index containing the document
+    @param es_type: ES document type
+    @param ident: document ID
+    @return: dict representing anonymous object of specifications
+    '''
+
+    if ident is None:
+        raise Exception("id must be supplied")
+    final_url = '{0}/{1}/{2}/{3}'.format(es_url, es_index, es_type, ident)
+    dataset_metadata = request_utils.get_requests_json_response(final_url, logger=logger)
+    # navigate around the dataset metadata to get the true specification
+    return dataset_metadata["_source"]
+
+
+def add_metadata(es_url, es_index, es_type, obj, logger=None):
+    '''
+    Ingest a metadata document into the Mozart ElasticSearch index
+    @param es_url: elastic search url
+    @param es_index: ElasticSearch index to place object into
+    @param es_type: ElasticSearch type to place object into
+    @param obj: object for ingestion into ES
+    '''
+
+    #data = {"doc_as_upsert": True, "doc": obj}
+    final_url = "{0}/{1}/{2}/{3}".format(es_url, es_index, es_type, obj["id"])
+    request_utils.requests_json_response("POST", final_url, json.dumps(obj), logger=logger)
+
+
+def remove_metadata(es_url, es_index, es_type, ident, logger=None):
+    '''
+    Remove a metadata document by ID
+    @param es_url: elastic search url
+    @param es_index: ElasticSearch index containing the document
+    @param es_type: ElasticSearch type of the document
+    @param ident: id of the document to delete
+    '''
+
+    final_url = "{0}/{1}/{2}/{3}".format(es_url, es_index, es_type, ident)
+    request_utils.requests_json_response("DELETE", final_url, logger=logger)
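As a usage sketch for the metadata helpers above (the ES endpoint, index, type, and document are placeholders, not a real deployment):

# Hypothetical CRUD round trip; ES_URL, index, and type are placeholders.
from hysds_commons import metadata_rest_utils

ES_URL = "http://localhost:9200"
doc = {"id": "container-demo", "url": "s3://bucket/demo.tar.gz"}

metadata_rest_utils.add_metadata(ES_URL, "containers", "container", doc)
print(metadata_rest_utils.get_by_id(ES_URL, "containers", "container", "container-demo"))
print(metadata_rest_utils.get_types(ES_URL, "containers"))
metadata_rest_utils.remove_metadata(ES_URL, "containers", "container", "container-demo")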
diff --git a/hysds_commons/mozart_utils.py b/hysds_commons/mozart_utils.py
new file mode 100644
index 0000000..0d6b908
--- /dev/null
+++ b/hysds_commons/mozart_utils.py
@@ -0,0 +1,71 @@
+import os
+from hysds_commons.request_utils import requests_json_response
+
+
+DEFAULT_MOZART_VERSION = "v0.1"
+
+
+def mozart_call(mozart_url, method, data, version=DEFAULT_MOZART_VERSION, logger=None):
+    '''
+    Call a mozart API method with data
+    @param mozart_url - url to mozart
+    @param method - method to call
+    @param data - data to supply to call
+    @param version - mozart API version
+    @param logger - logger to log to
+    '''
+
+    url = os.path.join(mozart_url, "api", version, method)
+    # job submission is the only POST endpoint; everything else uses GET
+    getpost = "POST" if method == "job/submit" else "GET"
+    res = requests_json_response(getpost, url, data=data, verify=False, logger=logger)
+    return res["result"]
+
+
+def get_job_spec(mozart_url, ident, version=DEFAULT_MOZART_VERSION, logger=None):
+    '''
+    Queries Mozart for the job type
+    @param mozart_url - url to mozart
+    @param ident - id of the job type
+    @param version - mozart API version
+    @param logger - logger to log to
+    '''
+    return mozart_call(mozart_url, "job_spec/type", {"id": ident}, version, logger)
+
+
+def get_job_spec_list(mozart_url, version=DEFAULT_MOZART_VERSION, logger=None):
+    '''
+    Queries Mozart for the job types
+    @param mozart_url - url to mozart
+    @param version - mozart API version
+    @param logger - logger to log to
+    '''
+    return mozart_call(mozart_url, "job_spec/list", {}, version, logger)
+
+
+def get_queue_list(mozart_url, ident=None, version=DEFAULT_MOZART_VERSION, logger=None):
+    '''
+    Queries Mozart for the active queues
+    @param mozart_url - url to mozart
+    @param ident - (optional) job type id used to filter the queue list
+    @param version - mozart API version
+    @param logger - logger to log to
+    '''
+
+    data = {}
+    if ident is not None:
+        data = {"id": ident}
+    return mozart_call(mozart_url, "queue/list", data, version, logger)
+
+
+def submit_job(mozart_url, data, version=DEFAULT_MOZART_VERSION, logger=None):
+    '''
+    Submit a job with given data
+    @param mozart_url - url to mozart
+    @param data - data to submit as job
+    @param version - mozart API version
+    @param logger - logger to log to
+    '''
+    return mozart_call(mozart_url, "job/submit", data, version, logger)
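A sketch of exercising the Mozart helpers; the URL is a placeholder and the submit payload shape is a guess based on the fields submit_mozart_job resolves above, not a documented contract:

# Placeholder URL; the job type and payload fields are assumptions for illustration.
from hysds_commons import mozart_utils

MOZART_URL = "https://mozart.example.com/mozart"
print(mozart_utils.get_queue_list(MOZART_URL))
spec = mozart_utils.get_job_spec(MOZART_URL, "hello_world:master")  # hypothetical type
res = mozart_utils.submit_job(MOZART_URL, {"type": "hello_world:master",
                                           "queue": "factotum-job_worker-small",
                                           "params": "{}"})
print(res)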
diff --git a/hysds_commons/net_utils.py b/hysds_commons/net_utils.py
new file mode 100644
index 0000000..8aeb15b
--- /dev/null
+++ b/hysds_commons/net_utils.py
@@ -0,0 +1,13 @@
+import sys
+
+import hysds_commons.linux_utils
+
+
+def get_container_host_ip():
+    """Return the IP address of the container host if caller is running in a
+    container. Otherwise, return the default localhost IP address."""
+
+    if sys.platform.startswith('linux'):
+        return hysds_commons.linux_utils.get_container_host_ip()
+    else:
+        raise NotImplementedError("Platform %s not supported." % sys.platform)
diff --git a/hysds_commons/queue_utils.py b/hysds_commons/queue_utils.py
new file mode 100644
index 0000000..8aa9035
--- /dev/null
+++ b/hysds_commons/queue_utils.py
@@ -0,0 +1,32 @@
+import os
+from requests import HTTPError
+
+from hysds_commons.request_utils import get_requests_json_response
+from hysds_commons.log_utils import logger
+
+from hysds.celery import app
+
+
+HYSDS_QUEUES = (
+    app.conf['JOBS_PROCESSED_QUEUE'],
+    app.conf['USER_RULES_JOB_QUEUE'],
+    app.conf['DATASET_PROCESSED_QUEUE'],
+    app.conf['USER_RULES_DATASET_QUEUE'],
+    app.conf['USER_RULES_TRIGGER_QUEUE'],
+)
+
+
+def get_all_queues(rabbitmq_admin_url):
+    '''
+    List the queues available for job-running.
+    Note: excludes celery-internal queues and the HySDS system queues above.
+    @param rabbitmq_admin_url: RabbitMQ admin URL
+    @return: list of queues
+    '''
+
+    try:
+        data = get_requests_json_response(os.path.join(rabbitmq_admin_url, "api/queues"))
+    except HTTPError, e:
+        if e.response.status_code == 401:
+            logger.error("Failed to authenticate to {}. Ensure credentials are set in .netrc.".format(rabbitmq_admin_url))
+        raise
+    return [obj["name"] for obj in data
+            if not obj["name"].startswith("celery") and obj["name"] not in HYSDS_QUEUES]
diff --git a/hysds_commons/request_utils.py b/hysds_commons/request_utils.py
new file mode 100644
index 0000000..6f64867
--- /dev/null
+++ b/hysds_commons/request_utils.py
@@ -0,0 +1,86 @@
+import requests, json
+
+
+def requests_json_response(method, url, data="{}", ignore_errors=False, auth=None, verify=True, logger=None):
+    '''
+    Send a request with the supplied data and method
+    @param method - "GET", "POST", or "DELETE" method
+    @param url - url to request
+    @param data - data to send along with request
+    @param ignore_errors - if True, log and swallow errors instead of raising
+    @param auth - (optional) requests auth object
+    @param verify - verify SSL certificates
+    @param logger - (optional) logger to log to
+    @return: dictionary representing JSON object
+    '''
+
+    try:
+        if method == "GET":
+            r = requests.get(url, data=data, auth=auth, verify=verify)
+        elif method == "POST":
+            r = requests.post(url, data=data, auth=auth, verify=verify)
+        elif method == "DELETE":
+            r = requests.delete(url, auth=auth, verify=verify)
+        else:
+            raise Exception("requests_json_response doesn't support request-method: {0}".format(method))
+        r.raise_for_status()
+        return r.json()
+    except Exception as e:
+        message = "Failed to '{0}' to {1}. Exception: {2}:{3}.\nData: {4}".format(method, url, type(e), str(e), data)
+        # if the response is resolvable as JSON, include it in the message
+        try:
+            message = message + "\nResponse: {0}".format(json.dumps(r.json(), indent=2))
+        except Exception:
+            pass
+        # log if a logger was passed in
+        if logger is not None: logger.warning(message)
+        if not ignore_errors: raise
+
+
+def get_requests_json_response(url, **kwargs):
+    '''
+    Calls requests_json_response with "GET" argument
+    @param url - url param
+    @param kwargs - passthrough kwargs
+    '''
+    return requests_json_response("GET", url, **kwargs)
+
+
+def post_requests_json_response(url, **kwargs):
+    '''
+    Calls requests_json_response with "POST" argument
+    @param url - url param
+    @param kwargs - passthrough kwargs
+    '''
+    return requests_json_response("POST", url, **kwargs)
+
+
+def post_scrolled_json_responses(url, es_url, generator=False, **kwargs):
+    '''
+    Calls post_requests_json_response in a scrolling manner (ES compatible)
+    @param url - url to setup scan
+    @param es_url - es url to scan through
+    @param generator - True if we should return a generator and not a list
+    @param kwargs - pass through kwargs
+    @return: list (or generator) of results from scrolled ES results
+    '''
+
+    def getResultsGenerator():
+        if not url.rstrip("/").endswith("_search"):
+            raise Exception("Scrolling only works on search URLs. {0} incompatible.".format(url))
+        setup_url = url + "?search_type=scan&scroll=10m&size=100"
+        result = post_requests_json_response(setup_url, **kwargs)
+        # harvest scan-setup
+        count = result['hits']['total']
+        scroll_id = result['_scroll_id']
+        scroll_url = es_url + "/scroll?scroll=10m"
+        while True:
+            # data is no longer a query, but a scroll_id
+            kwargs["data"] = scroll_id
+            result = post_requests_json_response(scroll_url, **kwargs)
+            scroll_id = result['_scroll_id']
+            if len(result['hits']['hits']) == 0:
+                break
+            for hit in result['hits']['hits']:
+                yield hit
+
+    gen = getResultsGenerator()
+    if generator:
+        return gen
+    return [res for res in gen]
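For reference, scrolling through an index with the helper above might look like this sketch (the local ES URL and index name are placeholders):

# Page through every document in an index via the scan/scroll helper.
import json
from hysds_commons.request_utils import post_scrolled_json_responses

ES_URL = "http://localhost:9200"   # placeholder
hits = post_scrolled_json_responses("{0}/jobs/_search".format(ES_URL),
                                    "{0}/_search".format(ES_URL),
                                    data=json.dumps({"query": {"match_all": {}}}))
print(len(hits))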
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..257c7cd
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,15 @@
+from setuptools import setup, find_packages
+import hysds_commons
+
+setup(
+    name='hysds_commons',
+    version=hysds_commons.__version__,
+    long_description=hysds_commons.__description__,
+    url=hysds_commons.__url__,
+    packages=find_packages(),
+    include_package_data=True,
+    zip_safe=False,
+    install_requires=[
+        'requests>=2.7.0'
+    ]
+)
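Assuming the package has been installed from this checkout (for example with pip install -e .), a quick smoke test of the utility modules might be:

# Smoke test; output depends on the host (Linux only, per net_utils above).
from hysds_commons.net_utils import get_container_host_ip
from hysds_commons.log_utils import get_logger

get_logger().info("container host IP: %s", get_container_host_ip())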