The Griffin measure module needs two configuration files to define the parameters of execution: one for the environment, the other for the DQ job.
```json
{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
    "batch.interval": "1m",
    "process.interval": "5m",
    "config": {
      "spark.default.parallelism": 5,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "persist": [
    {
      "type": "log",
      "config": {
        "max.log.lines": 100
      }
    }, {
      "type": "hdfs",
      "config": {
        "path": "hdfs:///griffin/streaming/persist",
        "max.lines.per.file": 10000
      }
    }
  ],
  "info.cache": [
    {
      "type": "zk",
      "config": {
        "hosts": "<zookeeper host ip>:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}
```
The above lists the environment parameters.
- spark: This field configures Spark and Spark Streaming parameters.
  - log.level: Level of the Spark log.
  - checkpoint.dir: Checkpoint directory of Spark Streaming, for streaming mode.
  - batch.interval: Interval of dumping streaming data, for streaming mode.
  - process.interval: Interval of processing the dumped streaming data, for streaming mode.
  - config: Configuration of Spark parameters.
- persist: This field configures the list of metrics persist parameters; multiple persist ways are supported. Details of the persist configuration are below.
- info.cache: This field configures the list of information cache parameters; multiple cache ways are supported. It is only for the streaming DQ case. Details of the info cache configuration are below.
The following lists the persist configuration parameters; an http and mongo sketch follows this list.
- type: Metrics persist type, "log", "hdfs", "http" or "mongo".
- config: Configuration parameters of each persist type.
  - log persist
    - max.log.lines: the max number of lines to log.
  - hdfs persist
    - path: hdfs path to persist metrics.
    - max.persist.lines: the max number of lines of the total persisted data.
    - max.lines.per.file: the max number of lines in each persisted file.
  - http persist
    - api: the api to which persisted metrics are submitted.
    - method: http method, "post" by default.
  - mongo persist
    - url: url of the mongo db.
    - database: database name.
    - collection: collection name.
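For reference, http and mongo persist entries in the environment configuration would look roughly like the sketch below. It only uses the parameters listed above; the host, port, path and name values are placeholders to be replaced with your own.

```json
"persist": [
  {
    "type": "http",
    "config": {
      "api": "http://<metric-service-host>:<port>/<api-path>",
      "method": "post"
    }
  }, {
    "type": "mongo",
    "config": {
      "url": "mongodb://<mongo-host>:27017",
      "database": "<database-name>",
      "collection": "<collection-name>"
    }
  }
]
```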
The following lists the info cache configuration parameters.
- type: Information cache type, "zk" for zookeeper cache.
- config: Configuration parameters of each info cache type.
  - zookeeper cache
    - hosts: comma-separated zookeeper hosts, as a single string.
    - namespace: namespace of the cache info, "" by default.
    - lock.path: path of the lock info, "lock" by default.
    - mode: create mode of the zookeeper node, "persist" by default.
    - init.clear: whether to clear the cache info at initialization, true by default.
    - close.clear: whether to clear the cache info when the connection is closed, false by default.
```json
{
  "name": "accu_batch",
  "process.type": "batch",
  "data.sources": [
    {
      "name": "src",
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.path": "<path>/<to>",
            "file.name": "<source-file>.avro"
          }
        }
      ]
    }, {
      "name": "tgt",
      "connectors": [
        {
          "type": "avro",
          "version": "1.7",
          "config": {
            "file.path": "<path>/<to>",
            "file.name": "<target-file>.avro"
          }
        }
      ]
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "name": "accu",
        "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code",
        "details": {
          "source": "source",
          "target": "target",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "metric": {
          "name": "accu"
        },
        "record": {
          "name": "missRecords"
        }
      }
    ]
  }
}
```
The above lists the DQ job configuration parameters.
- name: Name of the DQ job.
- process.type: Process type of the DQ job, "batch" or "streaming".
- data.sources: List of data sources in this DQ job.
  - name: Name of this data source; it should be different from the other data source names.
  - connectors: List of data connectors combined as the same data source. Details of the data connector configuration are below.
- evaluate.rule: Evaluate rule parameters of this DQ job.
  - dsl.type: Default dsl type of all the rules.
  - rules: List of rules, one per rule step. Details of the rule configuration are below.
The following lists the data connector configuration parameters; a hive and text-dir sketch follows this list.
- type: Data connector type, "avro", "hive" or "text-dir" for batch mode, "kafka" for streaming mode.
- version: Version string of the data connector type.
- config: Configuration parameters of each data connector type.
  - avro data connector
    - file.path: avro file path, optional, "" by default.
    - file.name: avro file name.
  - hive data connector
    - database: database name, optional, "default" by default.
    - table.name: table name.
    - where: where conditions string, split by ",", optional.
      e.g. `dt=20170410 AND hour=15, dt=20170411 AND hour=15, dt=20170412 AND hour=15`
  - text dir data connector
    - dir.path: parent directory path.
    - data.dir.depth: integer depth of the data directories, 0 by default.
    - success.file: success file name, optional.
    - done.file: done file name, optional.
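As a rough sketch, hive and text-dir connector entries could be written as below, using only the parameters listed above. The version strings, table name, directory path and the "_SUCCESS"/"_DONE" file names are placeholders, not prescribed values.

```json
"connectors": [
  {
    "type": "hive",
    "version": "<version>",
    "config": {
      "database": "default",
      "table.name": "<table-name>",
      "where": "dt=20170410 AND hour=15"
    }
  }, {
    "type": "text-dir",
    "version": "<version>",
    "config": {
      "dir.path": "<path>/<to>/<data-dir>",
      "data.dir.depth": 0,
      "success.file": "_SUCCESS",
      "done.file": "_DONE"
    }
  }
]
```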
The following lists the rule configuration parameters; a profiling rule sketch follows at the end of this list.
- dsl.type: Rule dsl type, "spark-sql", "df-opr" or "griffin-dsl".
- dq.type: DQ type of this rule, only for the "griffin-dsl" type, supporting "accuracy", "profiling", "uniqueness", "distinctness" and "timeliness".
- name (step information): Result table name of this rule, optional for the "griffin-dsl" type.
- rule: The rule string.
- details: Details of this rule, optional.
  - accuracy dq type detail configuration
    - source: the name of the data source that acts as source in accuracy; if not configured, it defaults to the name of the first data source in "data.sources".
    - target: the name of the data source that acts as target in accuracy; if not configured, it defaults to the name of the second data source in "data.sources".
    - miss: the miss count name in the metric, optional.
    - total: the total count name in the metric, optional.
    - matched: the matched count name in the metric, optional.
  - profiling dq type detail configuration
    - source: the name of the data source to profile; if not configured, it defaults to the name of the first data source in "data.sources". If the griffin-dsl rule contains a from clause, this parameter is ignored.
  - uniqueness dq type detail configuration
    - source: name of the data source to measure uniqueness on.
    - target: name of the data source to compare with; it is always the same as source, or a superset of it.
    - unique: the unique count name in the metric, optional.
    - total: the total count name in the metric, optional.
    - dup: the duplicate count name in the metric, optional.
    - num: the duplicate number name in the metric, optional.
    - duplication.array: optional; if set to a non-empty string, the duplication metric will be computed, and this string is used as the group metric name.
  - distinctness dq type detail configuration
    - source: name of the data source to measure distinctness on.
    - target: name of the data source to compare with; it is always the same as source, or a superset of it.
    - distinct: the distinct count name in the metric, optional.
    - total: the total count name in the metric, optional.
    - dup: the duplicate count name in the metric, optional.
    - accu_dup: the accumulated duplicate count name in the metric, optional; only in streaming mode with "with.accumulate" enabled.
    - num: the duplicate number name in the metric, optional.
    - duplication.array: optional; if set to a non-empty string, the duplication metric will be computed, and this string is used as the group metric name.
    - with.accumulate: optional, true by default; if set to false, in streaming mode the data set will not be compared with old data when checking distinctness.
  - timeliness dq type detail configuration
    - source: name of the data source to measure timeliness on.
    - latency: the latency column name in the metric, optional.
    - threshold: optional; if set to a time string like "1h", items with latency greater than 1 hour will be recorded.
- metric: Configuration of the metric export.
  - name: name of the metric.
  - collect.type: optional; collect the metric as the given type, one of "default", "entries", "array" or "map".
- record: Configuration of the record export.
  - name: name of the record.
  - data.source.cache: optional; if set to a data source name, the cache of that data source will be updated by the records, always used in the streaming accuracy case.
  - origin.DF: available only if "data.source.cache" is set; the origin data frame name of the records.
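To tie the rule parameters together, a profiling rule step for the "src" data source from the DQ job example above might look roughly like the sketch below. The rule expression is only illustrative; the exact profiling expression syntax is defined by the griffin-dsl guide, and "collect.type" here simply shows one of the allowed values.

```json
{
  "dsl.type": "griffin-dsl",
  "dq.type": "profiling",
  "name": "prof",
  "rule": "src.post_code.count()",
  "details": {
    "source": "src"
  },
  "metric": {
    "name": "prof",
    "collect.type": "entries"
  }
}
```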