From 9e64810fc101489a65521460de42eb49205fea72 Mon Sep 17 00:00:00 2001
From: Jeremy Phelps
Date: Thu, 18 Jan 2018 17:26:30 -0800
Subject: [PATCH] Conditionally modify the context object instead of
 conditionally returning early.

---
 pydruid/client.py | 207 +++++++++-------------------------------
 1 file changed, 39 insertions(+), 168 deletions(-)

diff --git a/pydruid/client.py b/pydruid/client.py
index 6d70094c..4d583019 100755
--- a/pydruid/client.py
+++ b/pydruid/client.py
@@ -53,7 +53,7 @@ def _prepare_url_headers_and_body(self, query):
             username_password = \
                 b64encode(bytes('{}:{}'.format(self.username, self.password)))
             headers['Authorization'] = 'Basic {}'.format(username_password)
-
+
         return headers, querystr, url

     def _post(self, query):
@@ -71,10 +71,8 @@ def _post(self, query):

     def topn(self, **kwargs):
         """
-        A TopN query returns a set of the values in a given dimension,
-        sorted by a specified metric. Conceptually, a topN can be
-        thought of as an approximate GroupByQuery over a single
-        dimension with an Ordering spec. TopNs are
+        A TopN query returns a set of the values in a given dimension, sorted by a specified metric. Conceptually, a
+        topN can be thought of as an approximate GroupByQuery over a single dimension with an Ordering spec. TopNs are
         faster and more resource efficient than GroupBy for this use case.

         Required key/value pairs:
@@ -83,8 +81,7 @@ def topn(self, **kwargs):
         :param str granularity: Aggregate data by hour, day, minute, etc.,
         :param intervals: ISO-8601 intervals of data to query
         :type intervals: str or list
-        :param dict aggregations: A map from aggregator name to one of
-            the pydruid.utils.aggregators e.g., doublesum
+        :param dict aggregations: A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
         :param str dimension: Dimension to run the query against
         :param str metric: Metric over which to sort the specified dimension by
         :param int threshold: How many of the top items to return
@@ -94,10 +91,8 @@ def topn(self, **kwargs):

         Optional key/value pairs:

-        :param pydruid.utils.filters.Filter filter: Indicates which rows
-            of data to include in the query
-        :param post_aggregations: A dict with string key = 'post_aggregator_name',
-            and value pydruid.utils.PostAggregator
+        :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
+        :param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
         :param dict context: A dict of query context options

         Example:
@@ -117,16 +112,14 @@ def topn(self, **kwargs):
                    context={"timeout": 1000}
                )
            >>> print top
-            >>> [{'timestamp': '2013-06-14T00:00:00.000Z',
-                  'result': [{'count': 22.0, 'user': "cool_user"}}]}]
+            >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': [{'count': 22.0, 'user': "cool_user"}]}]
         """
         query = self.query_builder.topn(kwargs)
         return self._post(query)

     def timeseries(self, **kwargs):
         """
-        A timeseries query returns the values of the requested metrics (in aggregate)
-        for each timestamp.
+        A timeseries query returns the values of the requested metrics (in aggregate) for each timestamp.

         Required key/value pairs:

@@ -134,18 +127,15 @@ def timeseries(self, **kwargs):
         :param str granularity: Time bucket to aggregate data by hour, day, minute, etc.,
         :param intervals: ISO-8601 intervals for which to run the query on
         :type intervals: str or list
-        :param dict aggregations: A map from aggregator name to one of the
-            ``pydruid.utils.aggregators`` e.g., ``doublesum``
+        :param dict aggregations: A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum

         :return: The query result
         :rtype: Query

         Optional key/value pairs:

-        :param pydruid.utils.filters.Filter filter: Indicates which rows of
-            data to include in the query
-        :param post_aggregations: A dict with string key =
-            'post_aggregator_name', and value pydruid.utils.PostAggregator
+        :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
+        :param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
         :param dict context: A dict of query context options

         Example:
@@ -157,24 +147,19 @@ def timeseries(self, **kwargs):
                    datasource=twitterstream,
                    granularity='hour',
                    intervals='2013-06-14/pt1h',
-                    aggregations=\
-                        {"count": doublesum("count"), "rows": count("rows")},
-                    post_aggregations=\
-                        {'percent': (Field('count') / Field('rows')) * Const(100))},
+                    aggregations={"count": doublesum("count"), "rows": count("rows")},
+                    post_aggregations={'percent': (Field('count') / Field('rows')) * Const(100)},
                    context={"timeout": 1000}
                )
            >>> print counts
-            >>> [{'timestamp': '2013-06-14T00:00:00.000Z',
-                  'result': {'count': 9619.0, 'rows': 8007,
-                  'percent': 120.13238416385663}}]
+            >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'count': 9619.0, 'rows': 8007, 'percent': 120.13238416385663}}]
         """
         query = self.query_builder.timeseries(kwargs)
         return self._post(query)

     def groupby(self, **kwargs):
         """
-        A group-by query groups a results set (the requested aggregate
-        metrics) by the specified dimension(s).
+        A group-by query groups a results set (the requested aggregate metrics) by the specified dimension(s).

         Required key/value pairs:

@@ -182,8 +167,7 @@ def groupby(self, **kwargs):
         :param str granularity: Time bucket to aggregate data by hour, day, minute, etc.,
         :param intervals: ISO-8601 intervals for which to run the query on
         :type intervals: str or list
-        :param dict aggregations: A map from aggregator name to one of the
-            ``pydruid.utils.aggregators`` e.g., ``doublesum``
+        :param dict aggregations: A map from aggregator name to one of the pydruid.utils.aggregators e.g., doublesum
         :param list dimensions: The dimensions to group by

         :return: The query result
@@ -191,15 +175,11 @@ def groupby(self, **kwargs):
         :rtype: Query

         Optional key/value pairs:

-        :param pydruid.utils.filters.Filter filter: Indicates which rows of
-            data to include in the query
-        :param pydruid.utils.having.Having having: Indicates which groups
-            in results set of query to keep
-        :param post_aggregations: A dict with string key = 'post_aggregator_name',
-            and value pydruid.utils.PostAggregator
+        :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
+        :param pydruid.utils.having.Having having: Indicates which groups in results set of query to keep
+        :param post_aggregations: A dict with string key = 'post_aggregator_name', and value pydruid.utils.PostAggregator
         :param dict context: A dict of query context options
-        :param dict limit_spec: A dict of parameters defining how to limit
-            the rows returned, as specified in the Druid api documentation
+        :param dict limit_spec: A dict of parameters defining how to limit the rows returned, as specified in the Druid api documentation

         Example:
@@ -222,25 +202,8 @@ def groupby(self, **kwargs):
                )
            >>> for k in range(2):
            ...     print group[k]
-            >>> {
-                'timestamp': '2013-10-04T00:00:00.000Z',
-                'version': 'v1',
-                'event': {
-                    'count': 1.0,
-                    'user_name': 'user_1',
-                    'reply_to_name': 'user_2',
-                }
-            }
-            >>> {
-                'timestamp': '2013-10-04T00:00:00.000Z',
-                'version': 'v1',
-                'event': {
-                    'count': 1.0,
-                    'user_name': 'user_2',
-                    'reply_to_name':
-                        'user_3',
-                }
-            }
+            >>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_1', 'reply_to_name': 'user_2'}}
+            >>> {'timestamp': '2013-10-04T00:00:00.000Z', 'version': 'v1', 'event': {'count': 1.0, 'user_name': 'user_2', 'reply_to_name': 'user_3'}}
         """
         query = self.query_builder.groupby(kwargs)
         return self._post(query)
@@ -274,17 +237,11 @@ def segment_metadata(self, **kwargs):

         .. code-block:: python
             :linenos:

-            >>> meta = client.segment_metadata(
-                    datasource='twitterstream', intervals = '2013-10-04/pt1h')
+            >>> meta = client.segment_metadata(datasource='twitterstream', intervals='2013-10-04/pt1h')
            >>> print meta[0].keys()
            >>> ['intervals', 'id', 'columns', 'size']
            >>> print meta[0]['columns']['tweet_length']
-            >>> {
-                'errorMessage': None,
-                'cardinality': None,
-                'type': 'FLOAT',
-                'size': 30908008,
-            }
+            >>> {'errorMessage': None, 'cardinality': None, 'type': 'FLOAT', 'size': 30908008}

         """
         query = self.query_builder.segment_metadata(kwargs)
@@ -312,13 +269,7 @@ def time_boundary(self, **kwargs):

            >>> bound = client.time_boundary(datasource='twitterstream')
            >>> print bound
-            >>> [{
-                'timestamp': '2011-09-14T15:00:00.000Z',
-                'result': {
-                    'minTime': '2011-09-14T15:00:00.000Z',
-                    'maxTime': '2014-03-04T23:44:00.000Z',
-                }
-            }]
+            >>> [{'timestamp': '2011-09-14T15:00:00.000Z', 'result': {'minTime': '2011-09-14T15:00:00.000Z', 'maxTime': '2014-03-04T23:44:00.000Z'}}]
         """
         query = self.query_builder.time_boundary(kwargs)
         return self._post(query)
@@ -337,12 +288,9 @@ def select(self, **kwargs):

         Optional key/value pairs:

-        :param pydruid.utils.filters.Filter filter: Indicates which rows of
-            data to include in the query
-        :param list dimensions: The list of dimensions to select. If left
-            empty, all dimensions are returned
-        :param list metrics: The list of metrics to select. If left empty,
-            all metrics are returned
+        :param pydruid.utils.filters.Filter filter: Indicates which rows of data to include in the query
+        :param list dimensions: The list of dimensions to select. If left empty, all dimensions are returned
+        :param list metrics: The list of metrics to select. If left empty, all metrics are returned
         :param dict context: A dict of query context options

         :return: The query result
@@ -360,22 +308,8 @@ def select(self, **kwargs):
                    paging_spec={'pagingIdentifies': {}, 'threshold': 1},
                    context={"timeout": 1000}
                )
-            >>> print(raw_data)
-            >>> [{
-                'timestamp': '2013-06-14T00:00:00.000Z',
-                'result': {
-                    'pagingIdentifiers': {
-                        'twitterstream_...08:00:00.000Z_v1': 1,
-                    'events': [{
-                        'segmentId': 'twitterstr...000Z_v1',
-                        'offset': 0,
-                        'event': {
-                            'timestamp': '2013-06-14T00:00:00.000Z',
-                            'dim': 'value',
-                        }
-                    }]
-                }
-            }]
+            >>> print raw_data
+            >>> [{'timestamp': '2013-06-14T00:00:00.000Z', 'result': {'pagingIdentifiers': {'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1': 1}, 'events': [{'segmentId': 'twitterstream_2013-06-14T00:00:00.000Z_2013-06-15T00:00:00.000Z_2013-06-15T08:00:00.000Z_v1', 'offset': 0, 'event': {'timestamp': '2013-06-14T00:00:00.000Z', 'dim': 'value'}}]}}]
         """
         query = self.query_builder.select(kwargs)
         return self._post(query)
@@ -388,8 +322,7 @@ def export_tsv(self, dest_path):
         Use Query.export_tsv() method instead.
         """
         if self.query_builder.last_query is None:
-            raise AttributeError(
-                "There was no query executed by this client yet. Can't export!")
+            raise AttributeError("There was no query executed by this client yet. Can't export!")
         else:
             return self.query_builder.last_query.export_tsv(dest_path)

@@ -401,17 +334,15 @@ def export_pandas(self):
         Use Query.export_pandas() method instead
         """
         if self.query_builder.last_query is None:
-            raise AttributeError(
-                "There was no query executed by this client yet. Can't export!")
+            raise AttributeError("There was no query executed by this client yet. Can't export!")
         else:
             return self.query_builder.last_query.export_pandas()


 class PyDruid(BaseDruidClient):
     """
-    PyDruid contains the functions for creating and executing Druid queries.
-    Returns Query objects that can be used for exporting query results
-    into TSV files or pandas.DataFrame objects for subsequent analysis.
+    PyDruid contains the functions for creating and executing Druid queries. Returns Query objects that can be used
+    for exporting query results into TSV files or pandas.DataFrame objects for subsequent analysis.

     :param str url: URL of Broker node in the Druid cluster
     :param str endpoint: Endpoint that Broker listens for queries on
@@ -460,18 +391,8 @@ class PyDruid(BaseDruidClient):
        }

        >>> print top.result
-        >>> [{
-            'timestamp': '2013-10-04T00:00:00.000Z',
-            'result': [
-                {
-                    'count': 7.0,
-                    'user_name': 'user_1',
-                },
-                {
-                    'count': 6.0,
-                    'user_name': 'user_2',
-                },
-            ]}]
+        >>> [{'timestamp': '2013-10-04T00:00:00.000Z',
+              'result': [{'count': 7.0, 'user_name': 'user_1'}, {'count': 6.0, 'user_name': 'user_2'}]}]

        >>> df = top.export_pandas()
        >>> print df
@@ -484,10 +405,9 @@ def __init__(self, url, endpoint):

     def ssl_context(self):
         ctx = ssl.create_default_context()
-        if not self.ignore_certificate_errors:
-            return ctx
-        ctx.check_hostname = False
-        ctx.verify_mode = ssl.CERT_NONE
+        if self.ignore_certificate_errors:
+            ctx.check_hostname = False
+            ctx.verify_mode = ssl.CERT_NONE
         return ctx

     def _post(self, query):
@@ -512,52 +432,3 @@ def _post(self, query):
         else:
             query.parse(data)
         return query
-
-    def scan(self, **kwargs):
-        """
-        A scan query returns raw Druid rows
-
-        Required key/value pairs:
-
-        :param str datasource: Data source to query
-        :param str granularity: Time bucket to aggregate data by hour, day, minute, etc.
-        :param int limit: The maximum number of rows to return
-        :param intervals: ISO-8601 intervals for which to run the query on
-        :type intervals: str or list
-
-        Optional key/value pairs:
-
-        :param pydruid.utils.filters.Filter filter: Indicates which rows of
-            data to include in the query
-        :param list dimensions: The list of dimensions to select. If left
-            empty, all dimensions are returned
-        :param list metrics: The list of metrics to select. If left empty,
-            all metrics are returned
-        :param dict context: A dict of query context options
-
-        :return: The query result
-        :rtype: Query
-
-        Example:
-
-        .. code-block:: python
-            :linenos:
-
-            >>> raw_data = client.scan(
-                    datasource=twitterstream,
-                    granularity='all',
-                    intervals='2013-06-14/pt1h',
-                    limit=1,
-                    context={"timeout": 1000}
-                )
-            >>> print raw_data
-            >>> [{
-                u'segmentId': u'zzzz',
-                u'columns': [u'__time', 'status', 'region'],
-                'events': [{
-                    u'status': u'ok', 'region': u'SF', u'__time': 1509494400000,
-                }]
-            }]
-        """
-        query = self.query_builder.scan(kwargs)
-        return self._post(query)