Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
kawasima committed Sep 27, 2016
2 parents ee3bae4 + da3640d commit ef4ed73
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 29 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.6.0
0.7.0
2 changes: 1 addition & 1 deletion dev/dev.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
[job-streamer.control-bus.system :as system]))

(def dev-config
{:app {:middleware [wrap-stacktrace]}
{:app {:middleware [wrap-stacktrace :stacktrace]}
:datomic {:uri "datomic:mem://jobstreamer"}})

(def config
Expand Down
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
[net.unit8.wscl/websocket-classloader "0.2.1"]
[net.unit8.logback/logback-websocket-appender "0.1.0"]
[io.undertow/undertow-websockets-jsr "1.3.22.Final"]
[com.datomic/datomic-free "0.9.5372" :exclusions [org.slf4j/slf4j-api org.slf4j/slf4j-nop
[com.datomic/datomic-free "0.9.5394" :exclusions [org.slf4j/slf4j-api org.slf4j/slf4j-nop
com.amazonaws/aws-java-sdk]]
[org.jsoup/jsoup "1.9.2"]
[org.clojure/data.json "0.2.6"]
[datomic-schema "1.3.0"]
[liberator "0.14.1"]
[compojure "1.5.1"]
Expand Down
4 changes: 2 additions & 2 deletions src/clj/job_streamer/control_bus/component/agents.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

(defn list-resource [{:keys [agents]}]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get]
:handle-ok (fn [ctx]
(->> (vals @agents)
Expand All @@ -35,7 +35,7 @@

(defn entry-resource [{:keys [agents datomic]} instance-id & [cmd]]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :put]
:put! (fn [ctx]
(when-let [uuid (UUID/fromString instance-id)]
Expand Down
6 changes: 3 additions & 3 deletions src/clj/job_streamer/control_bus/component/apps.clj
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@

(defn list-resource [{:keys [datomic applications]}]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :post]
:malformed? #(validate (parse-body %)
:application/name v/required
Expand Down Expand Up @@ -106,7 +106,7 @@

(defn batch-components-resource [{:keys [datomic]} app-name]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get]
:handle-ok (fn [ctx]
(let [in-app (->> (d/query datomic
Expand All @@ -126,7 +126,7 @@

(defn stats-resource [{:keys [datomic agents]} app-name]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get]
:handle-ok (fn [ctx]
{:agents (count (ag/available-agents agents))
Expand Down
7 changes: 4 additions & 3 deletions src/clj/job_streamer/control_bus/component/calendar.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

(defn list-resource [{:keys [datomic scheduler]}]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :post]
:malformed? #(validate (parse-body %)
:calendar/name v/required)
Expand All @@ -35,6 +35,7 @@
true))

:post! (fn [{cal :edn}]
(println cal)
(let [id (or (:db/id cal) (d/tempid :db.part/user))]
(if-let [old-id (d/query datomic
'{:find [?calendar .]
Expand Down Expand Up @@ -64,7 +65,7 @@

(defn entry-resource [{:keys [datomic scheduler]} name]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :put :delete]
:malformed? (fn [ctx]
(or (validate (parse-body ctx)
Expand All @@ -77,7 +78,7 @@
'{:find [(pull ?e [:*]) .]
:in [$ ?n]
:where [[?e :calendar/name ?n]]} name)

:put! (fn [{cal :edn}]
(d/transact datomic
[{:db/id (:db/id cal)
Expand Down
85 changes: 71 additions & 14 deletions src/clj/job_streamer/control_bus/component/jobs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
(:require [clojure.tools.logging :as log]
[clojure.edn :as edn]
[com.stuartsierra.component :as component]
[bouncer.core :as b]
[bouncer.validators :as v]
[liberator.core :as liberator]
[clojure.string :as str]
[clj-time.format :as f]
(job-streamer.control-bus [notification :as notification]
[validation :refer [validate]]
[util :refer [parse-body edn->datoms to-int]])
Expand All @@ -12,7 +15,7 @@
[scheduler :as scheduler]))
(:import [java.util Date]))

(defn- find-latest-execution
(defn find-latest-execution
"Find latest from given executions."
[executions]
(when executions
Expand Down Expand Up @@ -68,18 +71,71 @@
[?job :job/name ?job-name]]}
app-name job-name))

(defn- parse-query-since [q]
(let [since (.substring q (count "since:"))]
(when (b/valid? {:since since}
:since [[v/datetime (:date f/formatters)]])
{:since (f/parse (:date f/formatters) since)})))

(defn- parse-query-until [q]
(let [until (.substring q (count "until:"))]
(when (b/valid? {:until until}
:until [[v/datetime (:date f/formatters)]])
{:until (f/parse (:date f/formatters) until)})))

(defn- parse-query-exit-status [q]
(let [exit-status (.substring q (count "exit-status:"))]
(when (b/valid? {:exit-status exit-status}
:exit-status v/required)
{:exit-status exit-status})))

(defn parse-query [query]
(when (not-empty query)
(->> (str/split query #"\s")
(map #(cond
(.startsWith % "since:") (parse-query-since %)
(.startsWith % "until:") (parse-query-until %)
(.startsWith % "exit-status:") (parse-query-exit-status %)
:default {:job-name [%]}))
(apply merge-with concat {:job-name nil}))))

(defn find-all [{:keys [datomic]} app-name query & [offset limit]]
(let [base-query '{:find [?job]
:in [$ ?app-name ?query]
(let [qmap (parse-query query)
base-query '{:find [?job]
:in [$ ?app-name [?job-name-condition ...] ?since-condition ?until-condition ?exit-status-condition]
:where [[?app :application/name ?app-name]
[?app :application/jobs ?job]]}
jobs (d/query datomic
(if (not-empty query)
(update-in base-query [:where]
conj '[(fulltext $ :job/name ?query) [[?job ?job-name]]])
base-query)
app-name (or query ""))]
(cond-> base-query
(not-empty (:job-name qmap))
(update-in [:where] conj
'[?job :job/name ?job-name]
'[(.contains ^String ?job-name ?job-name-condition)])
(or (:since qmap) (:until qmap) (:exit-status qmap))
(update-in [:where] conj
'[?job :job/executions ?job-executions]
'[?job-execution :job-execution/create-time ?create-time]
'[(max ?create-time)]
'[?job-executions :job-execution/exit-status ?exit-status]
'[?job-executions :job-execution/end-time ?end-time])

(:since qmap)
(update-in [:where] conj
'[(>= ?end-time ?since-condition)])

(:until qmap)
(update-in [:where] conj
'[(<= ?end-time ?until-condition)])

(:exit-status qmap)
(update-in [:where] conj
'[(.contains ^String ?exit-status ?exit-status-condition)]))
app-name
(:job-name qmap [])
(:since qmap "")
(:until qmap "")
(:exit-status qmap ""))]

{:results (->> jobs
(drop (dec (or offset 0)))
(take (or limit 20))
Expand Down Expand Up @@ -212,7 +268,7 @@

(defn list-resource [{:keys [datomic scheduler] :as jobs} app-name]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :post]
:malformed? (fn [ctx]
(validate (parse-body ctx)
Expand Down Expand Up @@ -268,12 +324,13 @@
:status-notification/exit-status
:status-notification/type] (:db/id sn))))
vec)}))))))
vec))))))
vec))))
:etag (str (int (/ (System/currentTimeMillis) 10000)))))


(defn entry-resource [{:keys [datomic scheduler] :as jobs} app-name job-name]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :put :delete]
:malformed? #(parse-body %)
:exists? (when-let [[app-id job-id] (find-by-name jobs app-name job-name)]
Expand Down Expand Up @@ -325,7 +382,7 @@

(defn job-settings-resource [{:keys [datomic] :as jobs} app-name job-name & [cmd]]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :delete :put]
:malformed? #(parse-body %)
:exists? (when-let [[app-id job-id] (find-by-name jobs app-name job-name)]
Expand Down Expand Up @@ -426,7 +483,7 @@

(defn executions-resource [{:keys [datomic] :as jobs} app-name job-name]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :post]
:malformed? #(parse-body %)
:exists? (when-let [[app-id job-id] (find-by-name jobs app-name job-name)]
Expand Down Expand Up @@ -460,7 +517,7 @@

(defn execution-resource [{:keys [agents scheduler datomic] :as jobs} id & [cmd]]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:get :put]
:malformed? #(parse-body %)
:exists? (fn [ctx]
Expand Down
2 changes: 1 addition & 1 deletion src/clj/job_streamer/control_bus/component/scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@

(defn entry-resource [{:keys [datomic] :as scheduler} job-id & [cmd]]
(liberator/resource
:available-media-types ["application/edn"]
:available-media-types ["application/edn" "application/json"]
:allowed-methods [:post :put :delete]
:malformed? #(parse-body %)
:exists? (fn [ctx]
Expand Down
3 changes: 3 additions & 0 deletions src/clj/job_streamer/control_bus/endpoint/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
[clojure.tools.logging :as log]
[compojure.core :refer [ANY GET routes]]
[bouncer.validators :as v]
[ring.util.response :refer [content-type]]
(job-streamer.control-bus.component
[apps :as apps]
[jobs :as jobs]
Expand Down Expand Up @@ -75,6 +76,8 @@
(apps/batch-components-resource apps app-name))
(ANY "/:app-name/stats" [app-name]
(apps/stats-resource apps app-name))
(GET "/version" [] (-> {:body (clojure.string/replace (str "\"" (slurp "VERSION") "\"") "\n" "")}
(content-type "text/plain")))

;; For debug
;(GET "/logs" [] (pr-str (model/query '{:find [[(pull ?log [*]) ...]] :where [[?log :execution-log/level]]})))
Expand Down
2 changes: 1 addition & 1 deletion src/clj/job_streamer/control_bus/system.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
[api :refer [api-endpoint]])
[job-streamer.control-bus.endpoint.api :refer [api-endpoint]]))


(defn wrap-same-origin-policy [handler alias]
(fn [req]
(if (= (:request-method req) :options)
Expand All @@ -39,6 +38,7 @@
{:app {:middleware [[wrap-not-found :not-found]
[wrap-same-origin-policy :same-origin]
[wrap-defaults :defaults]]

:not-found "Resource Not Found"
:defaults (meta-merge api-defaults {})}})

Expand Down
16 changes: 15 additions & 1 deletion src/clj/job_streamer/control_bus/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
[clojure.edn :as edn]
[clojure.java.io :as io]
[datomic.api :as d]
[ring.util.request :refer [content-type]])
[ring.util.request :refer [content-type]]
[clojure.data.json :as json])
(:import [org.jsoup Jsoup]))

(defn to-int [n default-value]
Expand Down Expand Up @@ -126,13 +127,26 @@
java.lang.String body
(slurp (io/reader body)))))

(defn json-value-reader [key value]
(if (= key :calendar/holidays)
(for [date value] (java.sql.Date/valueOf date))
value))

(defn json->edn
"Convert a format from JSON to edn"
[json]
(json/read-str json
:key-fn keyword
:value-fn json-value-reader))

(defn parse-body [context]
(when (#{:put :post} (get-in context [:request :request-method]))
(try
(if-let [body (body-as-string context)]
(case (or (content-type (:request context)) (get-in context [:request :content-type]))
"application/edn" [false {:edn (edn/read-string body)}]
"application/xml" [false {:edn (xml->edn body)}]
"application/json" [false {:edn (json->edn body)}]
false)
false)
(catch Exception e
Expand Down
29 changes: 28 additions & 1 deletion test/clj/job_streamer/control_bus/component/jobs_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
[meta-merge.core :refer [meta-merge]]
[clojure.test :refer :all]
[clojure.pprint :refer :all]
[clojure.edn :as edn]))
[clojure.edn :as edn]
[clj-time.format :as f]))



Expand Down Expand Up @@ -71,3 +72,29 @@
(is (= 201 (-> (handler request) :status))))
(let [res (jobs/find-all (:jobs system) "default" nil)]
(is (= 1 (:hits res)))))))

(deftest parse-query
(testing "parse-query"
(let [result (jobs/parse-query "a b since:2016-09-01 until:2016-09-02 exit-status:COMPLETED")]
(is (= "a" (first (:job-name result))))
(is (= "2016-09-01" (f/unparse (:date f/formatters) (:since result))))
(is (= "2016-09-02" (f/unparse (:date f/formatters) (:until result))))
(is (= "COMPLETED" (:exit-status result)))))
(testing "nil query returns nil"
(let [result (jobs/parse-query nil)]
(is (nil? result))))
(testing "empty query returns nil"
(let [result (jobs/parse-query "")]
(is (nil? result))))

(testing "single simple query"
(let [result (jobs/parse-query "a")]
(is (= {:job-name '("a")} result))))

(testing "single simple query"
(let [result (jobs/parse-query "a")]
(is (= {:job-name '("a")} result))))

(testing "ignore breaking tokens in a query"
(let [result (jobs/parse-query "a since: until: since:xxx until:yyy")]
(is (= {:job-name '("a")} result)))))

0 comments on commit ef4ed73

Please sign in to comment.