Griffin DSL is a SQL-like language designed for DQ (data quality) measurement; it describes requests in the DQ domain.
Griffin DSL is case insensitive and easy to learn. It supports the following operations:
- logical operation: not, and, or, in, between, like, is null, is nan, =, !=, <>, <=, >=, <, >
- mathematical operation: +, -, *, /, %
- sql statement: as, where, group by, having, order by, limit
Keywords:
- null, nan, true, false
- not, and, or
- in, between, like, is
- select, distinct, from, as, where, group, by, having, order, desc, asc, limit
Operators:
- !, &&, ||, =, !=, <, >, <=, >=, <>
- +, -, *, /, %
- (, )
- ., [, ]
- string: any string surrounded by a pair of " or ', using the escape character \ where needed.
e.g. "test"; 'string 1'; "hello \" world \" "
- number: a double or integer number.
e.g. 123; 33.5
- time: an integer with a unit, written as a string such as 3d; it is translated into an integer number of milliseconds.
e.g. 3d; 5h; 4ms
- boolean: a boolean value, written directly.
e.g. true; false
- selection head: a data source name.
e.g. source; target; `my table name`
- all field selection: *, optionally prefixed with a data source name.
e.g. *; source.*; target.*
- field selection: a field name, optionally prefixed with a data source name.
e.g. source.age; target.name; user_id
- index selection: an integer between square brackets "[]", following a field name.
e.g. source.attributes[3]
- function selection: a function name with brackets "()", optionally following a field name.
e.g. count(*); *.count(); source.user_id.count(); max(source.age)
- alias: declares an alias after a selection.
e.g. source.user_id as id; target.user_name as name
- math factor: a literal, a function, a selection, or a math expression in brackets.
e.g. 123; max(1, 2, 3, 4); source.age; (source.age + 13)
- unary math expression: a unary math operator applied to a factor.
e.g. -(100 - source.score)
- binary math expression: math factors combined with binary math operators.
e.g. source.age + 13; score * 2 + ratio
- in: an in clause, as in SQL.
e.g. source.country in ("USA", "CHN", "RSA")
- between: a between clause, as in SQL.
e.g. source.age between 3 and 30; source.age between (3, 30)
- like: a like clause, as in SQL.
e.g. source.name like "%abc%"
- is null: the is null operator, as in SQL.
e.g. source.desc is not null
- is nan: checks whether the value is not a number; the syntax is like that of is null.
e.g. source.age is not nan
- logical factor: a math expression, one of the logical expressions above, or another logical expression in brackets.
e.g. (source.user_id = target.user_id AND source.age > target.age)
- unary logical expression: a unary logical operator applied to a factor.
e.g. NOT source.has_data; !(source.age = target.age)
- binary logical expression: logical factors combined with binary logical operators, including and, or, and the comparison operators.
e.g. source.age = target.age OR source.ticket = target.tck
- expression: a logical expression or a math expression.
- argument: an expression.
- function: a function name with arguments between brackets.
e.g. max(source.age, target.age); count(*)
- select clause: the result columns, as in a SQL select clause; the keyword "select" can be omitted in Griffin DSL.
e.g. select user_id.count(), age.max() as max; source.user_id.count() as cnt, source.age.min()
- from clause: the table name, as in a SQL from clause. The data source name must be one of the configured data source names or the output table name of a former rule step. This clause can be omitted by configuring the data source name.
e.g. from source; from `target`
- where clause: the filter condition, as in a SQL where clause; optional.
e.g. where source.id = target.id and source.age = target.age
- group-by clause: as in SQL; optional. It can be followed by an optional having clause.
e.g. group by cntry; group by gender having count(*) > 50
- order-by clause: as in SQL; optional.
e.g. order by name; order by first_name desc, age asc
- limit clause: as in SQL; optional.
e.g. limit 5
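The time literal in the list above is described as being translated into an integer number of milliseconds. The following is a minimal Python sketch of that conversion, not Griffin's own parser. The function name `time_literal_to_ms` is hypothetical, and the units "m" and "s" are assumptions; only "d", "h" and "ms" appear in the examples above.

```python
import re

# Milliseconds per unit. "d", "h" and "ms" appear in the examples above;
# "m" and "s" are assumed here for completeness.
UNIT_MS = {"d": 86_400_000, "h": 3_600_000, "m": 60_000, "s": 1_000, "ms": 1}


def time_literal_to_ms(literal: str) -> int:
    """Convert a time literal such as '3d' or '4ms' into milliseconds."""
    match = re.fullmatch(r"(\d+)(ms|d|h|m|s)", literal.strip())
    if match is None:
        raise ValueError(f"not a time literal: {literal!r}")
    value, unit = match.groups()
    return int(value) * UNIT_MS[unit]
```

For instance, `time_literal_to_ms("5h")` returns 18000000.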
An accuracy rule expression in Griffin DSL is a logical expression that describes the mapping relation between data sources.
e.g. source.id = target.id and source.name = target.name and source.age between (target.age, target.age + 5)
A profiling rule expression in Griffin DSL is a SQL-like expression: a select clause first, followed by the optional from, where, group-by, order-by and limit clauses, in that order.
e.g. source.gender, source.id.count() where source.age > 20 group by source.gender; select country, max(age), min(age), count(*) as cnt from source group by country order by cnt desc limit 5
A uniqueness rule expression in Griffin DSL is a comma-separated list of selection expressions, indicating the columns to check for uniqueness.
e.g. name, age; name, (age + 1) as next_age
A distinctness rule expression in Griffin DSL is a comma-separated list of selection expressions, indicating the columns to check for distinctness.
e.g. name, age; name, (age + 1) as next_age
A timeliness rule expression in Griffin DSL is a comma-separated list of selection expressions, indicating the input time and the output time (the calculation time by default, if not set).
e.g. ts; ts, end_ts
Griffin DSL is defined for DQ measurement, to describe problems in the DQ domain.
In Griffin, DSL rules are translated into spark-sql rules, which are then calculated by the spark-sql engine.
The DQ domain has multiple dimensions, and each is translated in a different way.
For accuracy, we need the match count between source and target; the rule describes the mapping relation between the data sources. Griffin translates the DSL rule into multiple SQL rules.
For example, the DSL rule source.id = target.id and source.name = target.name represents the match condition of accuracy. After the translation, the SQL rules are as follows:
- get miss items from source, save as table miss_items:
SELECT source.* FROM source LEFT JOIN target ON coalesce(source.id, '') = coalesce(target.id, '') and coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS NULL)
- get miss count, save as table miss_count:
SELECT COUNT(*) AS miss FROM miss_items
- get total count from source, save as table total_count:
SELECT COUNT(*) AS total FROM source
- get accuracy metric, save as table accuracy:
SELECT miss_count.miss AS miss, total_count.total AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL JOIN total_count
After the translation, the metrics are persisted in table accuracy.
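To make the accuracy steps concrete, here is a minimal sketch that runs them on toy data with Python's built-in sqlite3 module instead of the spark-sql engine. The function name `accuracy_metric` and the two-column (id, name) schema are assumptions for illustration. The last step uses CROSS JOIN in place of FULL JOIN, because older SQLite versions lack FULL JOIN; both yield the same single row here, since each input table has exactly one row.

```python
import sqlite3


def accuracy_metric(source_rows, target_rows):
    """Run the translated accuracy steps on (id, name) rows.

    Returns a (miss, total, matched) tuple.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (id INTEGER, name TEXT)")
    conn.execute("CREATE TABLE target (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO source VALUES (?, ?)", source_rows)
    conn.executemany("INSERT INTO target VALUES (?, ?)", target_rows)

    # Step 1: source rows that have no matching target row.
    conn.execute("""
        CREATE TABLE miss_items AS
        SELECT source.* FROM source LEFT JOIN target
        ON coalesce(source.id, '') = coalesce(target.id, '')
        AND coalesce(source.name, '') = coalesce(target.name, '')
        WHERE (NOT (source.id IS NULL AND source.name IS NULL))
        AND (target.id IS NULL AND target.name IS NULL)
    """)
    # Steps 2 and 3: miss count and total count.
    conn.execute("CREATE TABLE miss_count AS SELECT COUNT(*) AS miss FROM miss_items")
    conn.execute("CREATE TABLE total_count AS SELECT COUNT(*) AS total FROM source")
    # Step 4: accuracy metric (CROSS JOIN substituted for FULL JOIN, see above).
    row = conn.execute("""
        SELECT miss_count.miss AS miss, total_count.total AS total,
               (total_count.total - miss_count.miss) AS matched
        FROM miss_count CROSS JOIN total_count
    """).fetchone()
    conn.close()
    return row
```

With source rows (1, "a"), (2, "b"), (3, "c") and target rows (1, "a"), (2, "x"), only the first source row matches on both id and name, so the sketch reports 2 missed, 3 total and 1 matched.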
For profiling, the request is always an aggregation over the data. The rule is mostly the same as SQL, but supports only the select, from, where, group-by, having, order-by and limit clauses, which can describe most profiling requests. For more complicated requests, you can use a sql rule directly.
For example, the DSL rule source.cntry, source.id.count(), source.age.max() group by source.cntry represents a profiling request. After the translation, the SQL rule is as follows:
- profiling sql rule, save as table profiling:
SELECT source.cntry, count(source.id), max(source.age) FROM source GROUP BY source.cntry
After the translation, the metrics are persisted in table profiling.
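The profiling translation above turns method-style calls such as source.id.count() into SQL function calls such as count(source.id). The sketch below imitates only that one rewrite with a regular expression. It is an illustration, not Griffin's translator: the function name `method_calls_to_sql` is hypothetical, and the pattern assumes calls without arguments and names without backticks.

```python
import re

# A dotted selection (or "*") followed by ".func()", e.g. "source.id.count()".
_METHOD_CALL = re.compile(r"((?:\w+|\*)(?:\.\w+)*)\.(\w+)\(\)")


def method_calls_to_sql(expr: str) -> str:
    """Rewrite 'selection.func()' into 'func(selection)'; leave the rest as is."""
    return _METHOD_CALL.sub(r"\2(\1)", expr)
```

Applied to the rule in the example above, this produces source.cntry, count(source.id), max(source.age) group by source.cntry. Adding the SELECT keyword and the FROM clause is a separate step that this sketch does not cover.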
Uniqueness, also called duplicate, finds the duplicate items in the data and rolls up the item counts grouped by the number of duplications.
For example, the DSL rule name, age represents a duplicate request; in this case, source and target are the same data set. After the translation, the SQL rules are as follows:
- get distinct items from source, save as table src:
SELECT name, age FROM source
- get all items from target, save as table tgt:
SELECT name, age FROM target
- join two tables, save as table joined:
SELECT src.name, src.age FROM tgt RIGHT JOIN src ON coalesce(src.name, '') = coalesce(tgt.name, '') AND coalesce(src.age, '') = coalesce(tgt.age, '')
- get items duplication, save as table grouped:
SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age
- get total metric, save as table total_metric:
SELECT count(*) FROM source
- get unique record, save as table unique_record:
SELECT * FROM grouped WHERE dup = 0
- get unique metric, save as table unique_metric:
SELECT count(*) FROM unique_record
- get duplicate record, save as table dup_record:
SELECT * FROM grouped WHERE dup > 0
- get duplicate metric, save as table dup_metric:
SELECT dup, count(*) AS num FROM dup_record GROUP BY dup
After the translation, the metrics are persisted in table dup_metric.
Distinctness finds the duplicate items in the data. It is the same as uniqueness in batch mode, but differs in streaming mode.
In most cases, you need distinctness rather than uniqueness.
For example, the DSL rule name, age represents a distinctness request; in this case, source and target are the same data set. After the translation, the SQL rules are as follows:
- total count of source, save as table total_count:
SELECT COUNT(*) AS total FROM source
- group by fields, save as table dup_count:
SELECT name, age, (COUNT(*) - 1) AS dup, TRUE AS dist FROM source GROUP BY name, age
- distinct metric, save as table distinct_metric:
SELECT COUNT(*) AS dist_count FROM dup_count WHERE dist
- source join distinct metric, save as table dist_joined:
SELECT source.*, dup_count.dup AS dup, dup_count.dist AS dist FROM source LEFT JOIN dup_count ON source.name = dup_count.name AND source.age = dup_count.age
- add row number, save as table row_numbered:
SELECT *, ROW_NUMBER() OVER (DISTRIBUTE BY name, age SORT BY dist) row_num FROM dist_joined
- duplicate records, save as table dup_records:
SELECT name, age, dup FROM row_numbered WHERE NOT dist OR row_num > 1
- duplicate metric, save as table dup_metric:
SELECT name, age, dup, COUNT(*) AS num FROM dup_records GROUP BY name, age, dup
After the translation, the metrics are persisted in tables distinct_metric and dup_metric.
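As a rough cross-check of what the distinctness steps compute in batch mode, the sketch below derives the same three results in plain Python: the total count, the distinct count, and the duplicate metric. It assumes that dist is always TRUE, as in the SQL above, and that name and age are never null. The function name `distinctness_metrics` is hypothetical.

```python
from collections import Counter


def distinctness_metrics(rows):
    """Compute (total, dist_count, dup_metric) for a list of (name, age) rows.

    dup_metric holds one (name, age, dup, num) tuple per duplicated group.
    """
    counts = Counter(rows)        # occurrences per (name, age) group
    total = len(rows)             # total_count.total
    dist_count = len(counts)      # distinct_metric.dist_count: one per group
    # Rows with row_num > 1 are the extra occurrences of a group, so a group
    # with c occurrences has dup = c - 1 and contributes num = c - 1 rows.
    dup_metric = [
        (name, age, c - 1, c - 1)
        for (name, age), c in counts.items()
        if c > 1
    ]
    return total, dist_count, dup_metric
```

For the rows ("a", 1), ("a", 1), ("b", 2), ("a", 1), the total is 4, the distinct count is 2, and the only duplicated group is ("a", 1) with dup 2.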
Timeliness measures the latency of each item and gets statistics of the latencies.
For example, in the DSL rule ts, out_ts the first column is the input time of an item and the second column is its output time; if the second is not set, __tmst is the default output time column. After the translation, the SQL rules are as follows:
- get input and output time column, save as table origin_time:
SELECT *, ts AS _bts, out_ts AS _ets FROM source
- get latency, save as table lat:
SELECT *, (_ets - _bts) AS latency FROM origin_time
- get timeliness metric, save as table time_metric:
SELECT CAST(AVG(latency) AS BIGINT) AS avg, MAX(latency) AS max, MIN(latency) AS min FROM lat
After the translation, the metrics are persisted in table time_metric.
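The timeliness steps boil down to a per-row subtraction followed by three aggregates. A minimal Python sketch of the same arithmetic follows, assuming a non-empty list of rows with integer timestamps. The function name `timeliness_metric` and its parameters are hypothetical.

```python
def timeliness_metric(rows, bts="ts", ets="out_ts"):
    """Compute avg/max/min latency over rows given as dicts.

    bts and ets name the input-time and output-time columns.
    """
    latencies = [row[ets] - row[bts] for row in rows]
    return {
        # int() truncates the fraction, mirroring CAST(AVG(latency) AS BIGINT).
        "avg": int(sum(latencies) / len(latencies)),
        "max": max(latencies),
        "min": min(latencies),
    }
```

Two rows with latencies 50 and 75 give an average of 62 (62.5 truncated), a maximum of 75 and a minimum of 50.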
You can simply use a Griffin DSL rule to describe your problem in the DQ domain. For more complicated requirements, you can also use the alternative rules supported by Griffin.
Griffin supports spark-sql directly; you can write a rule in SQL like this:
{
  "dsl.type": "spark-sql",
  "name": "source",
  "rule": "SELECT count(id) AS cnt, max(timestamp) AS fresh_time FROM source"
}
Griffin calculates it in the spark-sql engine directly.
Griffin also supports some other operations on Spark data frames, such as converting a data frame of JSON strings into a data frame with the extracted object schema. For example:
{
  "dsl.type": "df-opr",
  "name": "ext_source",
  "rule": "from_json",
  "details": {
    "df.name": "json_source"
  }
}
Griffin performs the operation to extract the JSON strings.
You can also extend the df-opr engine and the df-opr adaptor in Griffin to support more types of data frame operations.
The Griffin engine runs on Spark and may work in two phases: the pre-proc phase and the run phase.
- Pre-proc phase: Griffin processes the data source directly to get an appropriate data format, as preparation for the DQ calculation. In this phase, you can use df-opr and spark-sql rules.
After preparation, to support streaming DQ calculation, a timestamp column is added to each row of data, so the data frame in the run phase contains an extra column named "__tmst".
- Run phase: Griffin calculates the DQ metrics from the prepared data. In this phase, you can use griffin-dsl rules, spark-sql rules, and a subset of df-opr rules.
For a griffin-dsl rule, Griffin translates it into a spark-sql rule with a group-by condition on column "__tmst", which is especially useful for streaming DQ calculation.
For a spark-sql rule, however, Griffin uses it directly, so you need to add the "__tmst" column to your spark-sql rule explicitly, or the calculation will not produce correct metrics.
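As an illustration of adding "__tmst" explicitly, the sketch below builds a run-phase spark-sql rule as a Python dict and serializes it to JSON. It reuses the field values of the spark-sql example earlier in this document. Selecting and grouping by `__tmst` is one plausible way to carry the column through, assumed here by analogy with the group-by that Griffin adds for griffin-dsl rules. The function name `spark_sql_rule_with_tmst` is hypothetical.

```python
import json


def spark_sql_rule_with_tmst():
    """Build a spark-sql rule that keeps the __tmst column (illustrative only)."""
    return {
        "dsl.type": "spark-sql",
        "name": "source",
        # __tmst is selected and grouped explicitly, so metrics stay per timestamp.
        "rule": (
            "SELECT `__tmst`, count(id) AS cnt, max(timestamp) AS fresh_time "
            "FROM source GROUP BY `__tmst`"
        ),
    }


if __name__ == "__main__":
    print(json.dumps(spark_sql_rule_with_tmst(), indent=2))
```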