From a3a08c91df2d218e3e2b9a22d851ca297a6692a7 Mon Sep 17 00:00:00 2001 From: Greg Hale Date: Sun, 8 Jul 2018 10:23:44 -0400 Subject: [PATCH 1/7] Add preliminary phash ETL --- backend/facebook-ad-image-hashes/ChangeLog.md | 5 + backend/facebook-ad-image-hashes/LICENSE | 30 +++ backend/facebook-ad-image-hashes/Setup.hs | 2 + backend/facebook-ad-image-hashes/default.nix | 23 ++ .../facebook-ad-image-hashes.cabal | 51 ++++ backend/facebook-ad-image-hashes/shell.nix | 19 ++ .../src/CliOptions.hs | 124 +++++++++ .../facebook-ad-image-hashes/src/Queries.hs | 243 ++++++++++++++++++ .../facebook-ad-image-hashes/src/RunCli.hs | 25 ++ 9 files changed, 522 insertions(+) create mode 100644 backend/facebook-ad-image-hashes/ChangeLog.md create mode 100644 backend/facebook-ad-image-hashes/LICENSE create mode 100644 backend/facebook-ad-image-hashes/Setup.hs create mode 100644 backend/facebook-ad-image-hashes/default.nix create mode 100644 backend/facebook-ad-image-hashes/facebook-ad-image-hashes.cabal create mode 100644 backend/facebook-ad-image-hashes/shell.nix create mode 100644 backend/facebook-ad-image-hashes/src/CliOptions.hs create mode 100644 backend/facebook-ad-image-hashes/src/Queries.hs create mode 100644 backend/facebook-ad-image-hashes/src/RunCli.hs diff --git a/backend/facebook-ad-image-hashes/ChangeLog.md b/backend/facebook-ad-image-hashes/ChangeLog.md new file mode 100644 index 00000000..5b6aae8c --- /dev/null +++ b/backend/facebook-ad-image-hashes/ChangeLog.md @@ -0,0 +1,5 @@ +# Revision history for facebook-ad-image-hashes + +## 0.1.0.0 -- YYYY-mm-dd + +* First version. Released on an unsuspecting world. diff --git a/backend/facebook-ad-image-hashes/LICENSE b/backend/facebook-ad-image-hashes/LICENSE new file mode 100644 index 00000000..5af69111 --- /dev/null +++ b/backend/facebook-ad-image-hashes/LICENSE @@ -0,0 +1,30 @@ +Copyright (c) 2018, Greg Hale + +All rights reserved. 
+ +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of Greg Hale nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/backend/facebook-ad-image-hashes/Setup.hs b/backend/facebook-ad-image-hashes/Setup.hs new file mode 100644 index 00000000..9a994af6 --- /dev/null +++ b/backend/facebook-ad-image-hashes/Setup.hs @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff --git a/backend/facebook-ad-image-hashes/default.nix b/backend/facebook-ad-image-hashes/default.nix new file mode 100644 index 00000000..70819420 --- /dev/null +++ b/backend/facebook-ad-image-hashes/default.nix @@ -0,0 +1,23 @@ +{ mkDerivation, base, bytestring, errors, http-client, http-client-tls, HUnit, lrucaching +, optparse-applicative, c-phash, hs-phash, pkgconfig, postgresql-simple, resourcet, stdenv +, stm, streaming, streaming-concurrency +, streaming-postgresql-simple, text, zeromq +}: +mkDerivation { + pname = "facebook-ad-image-hashes"; + version = "0.1.0.0"; + src = ./.; + buildTools = [ pkgconfig ]; + isLibrary = true; + isExecutable = true; + libraryHaskellDepends = [ + base bytestring errors HUnit lrucaching optparse-applicative hs-phash http-client + http-client-tls postgresql-simple resourcet stm streaming streaming-concurrency + streaming-postgresql-simple text + ]; + executableHaskellDepends = [ base ]; + license = stdenv.lib.licenses.bsd3; + libraryPkgconfigDepends = [ c-phash zeromq ]; + testPkgconfigDepends = [ c-phash zeromq ]; + executablePkgconfigDepends = [ c-phash zeromq ]; +} diff --git a/backend/facebook-ad-image-hashes/facebook-ad-image-hashes.cabal b/backend/facebook-ad-image-hashes/facebook-ad-image-hashes.cabal new file mode 100644 index 00000000..91f3255a --- /dev/null +++ b/backend/facebook-ad-image-hashes/facebook-ad-image-hashes.cabal @@ -0,0 +1,51 @@ +-- Initial facebook-ad-image-hashes.cabal generated by cabal init. 
For +-- further documentation, see http://haskell.org/cabal/users-guide/ + +name: facebook-ad-image-hashes +version: 0.1.0.0 +-- synopsis: +-- description: +license: BSD3 +license-file: LICENSE +author: Greg Hale +maintainer: imalsogreg@gmail.com +-- copyright: +category: Web +build-type: Simple +extra-source-files: ChangeLog.md +cabal-version: >=1.10 + +library + exposed-modules: CliOptions + Queries + RunCli + build-depends: base >=4.9 && <4.11 + , bytestring + , directory + , errors + , filepath + , http-client + , http-client-tls + , HUnit + , lrucaching + , optparse-applicative + , phash + , postgresql-simple + , random + , resourcet + , stm + , streaming + , streaming-concurrency + , streaming-postgresql-simple + , mtl + , text + pkgconfig-depends: pHash, libzmq + hs-source-dirs: src + default-language: Haskell2010 + +executable hashes-cli + build-depends: base + , facebook-ad-image-hashes + hs-source-dirs: exec + main-is: Main.hs + default-language: Haskell2010 diff --git a/backend/facebook-ad-image-hashes/shell.nix b/backend/facebook-ad-image-hashes/shell.nix new file mode 100644 index 00000000..be9706a3 --- /dev/null +++ b/backend/facebook-ad-image-hashes/shell.nix @@ -0,0 +1,19 @@ +let myoverlay = self: super: { + haskellPackages = super.haskellPackages.override { + overrides = hself: hsuper: + let + dj = self.haskell.lib.doJailbreak; + dc = self.haskell.lib.dontCheck; + in + { + phash = null; + # hs-phash = dj (dc (hself.callPackage ../../../phash/default.nix {})); + # hs-phash = ((../../../phash/default.nix)); + hs-phash = hself.callPackage ../../../phash/default.nix { c-phash = pkgs.phash; }; + resourcet = dj (dc hsuper.resourcet_1_1_11); + postgresql-simple = dj (dc hsuper.postgresql-simple); + }; + }; + }; + pkgs = import ../../../nixpkgs { overlays = [ myoverlay ]; }; +in (pkgs.haskellPackages.callPackage ./. 
{ c-phash = pkgs.phash; }).env \ No newline at end of file diff --git a/backend/facebook-ad-image-hashes/src/CliOptions.hs b/backend/facebook-ad-image-hashes/src/CliOptions.hs new file mode 100644 index 00000000..9344ef86 --- /dev/null +++ b/backend/facebook-ad-image-hashes/src/CliOptions.hs @@ -0,0 +1,124 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module CliOptions where + +import Control.Monad +import Control.Exception +import Data.Maybe +import Data.Monoid +import Data.Text (Text) +import qualified Data.Text as T +import qualified Database.PostgreSQL.Simple as PG +import Options.Applicative +import System.Environment +import System.IO + +getCommand :: IO Command +getCommand = execParser $ info (phashCommand <**> helper) + ( fullDesc + <> progDesc "Run commands for perceptual hashes against the ads database" + ) + +data Command = + DbTest PG.ConnectInfo + | DbResetHashes PG.ConnectInfo + | DbPopulateHashes PG.ConnectInfo + deriving (Show) + +phashCommand :: Parser Command +phashCommand = hsubparser $ + command "test-db" + (info (DbTest <$> dbConn) + (progDesc "Test connection to the ads database")) + <> command "reset-phashes" + (info (DbResetHashes <$> dbConn) + (progDesc "Clear the phash column in the ads database")) + <> command "populate-phashes" + (info (DbPopulateHashes <$> dbConn) + (progDesc "Compute phashes for images in the ads database")) + +-- data DbConnCfg = DbConnCfg +-- { dbUser :: Text +-- , dbPass :: Text +-- , dbHost :: Text +-- , dbPort :: Int +-- , dbName :: Text +-- } deriving (Show) + +dbConn :: Parser PG.ConnectInfo +dbConn = + PG.ConnectInfo + <$> strOption (long "dbhost" + <> short 'h' + <> help "Database Host" + <> value "localhost") + <*> option auto (long "dbport" + <> short 'p' + <> help "Database Port" + <> value 5432) + <*> strOption (long "dbuser" + <> short 'U' + <> help "Database User" + <> value "facebook_ads") + <*> strOption (long "dbpass" + <> short 'p' + <> help "Database Password" + <> 
value "password") + <*> strOption (long "dbname" + <> short 'd' + <> help "Database Name" + <> value "facebook_ads") + +-- Extra Utilities for allowing CLI parser to sample env vars +-- and dotenv files +type Env = [(Text, Text)] + +class FromText a where + fromText :: Text -> Either String a + +instance FromText Text where + fromText = Right + +environ :: (HasValue f, FromText a) => Text -> Env -> Mod f a +environ k env = maybe idm value . join $ parse <$> lookup k env + where + parse = either (const Nothing) Just . fromText + +-- Read in all env vars and any vars from +-- an environment variable file +importEnv :: Maybe FilePath + -- ^ Path to an environment variable file; + -- @Nothing@ will default to `.env` in the current working directory + -> IO Env +importEnv envVarFile = do + env <- getEnvironment + dotEnv <- case envVarFile of + Nothing -> + (readFile ".env") `catch` (\(e :: SomeException) -> return "") + Just fp -> + readFile fp + let fileEnv = getFileEnv dotEnv + env' = map (\(k,v) -> (T.pack k, T.pack v)) env + return (env' ++ fileEnv) + where + + getFileEnv :: String -> Env + getFileEnv c = catMaybes . map splitPair . 
lines $ c + + stripLeadingSpace = dropWhile (\c -> elem c [' ', '\t']) + + -- Turn a line like + -- "HOST=0.0.0.0:8080 #The host to listen on" + -- into @Just ("HOST", "0.0.0.0:8080:")@ + -- or + -- "#This is a comment" + -- into @Nothing@ + splitPair :: String -> Maybe (Text, Text) + splitPair l = case takeWhile (/= '#') l of + "" -> Nothing + l' -> let (key, val) = break (== '=') l' + in if length key > 0 + && length (stripLeadingSpace val) > 0 + then Just (T.pack key, T.pack (stripLeadingSpace val)) + else Nothing diff --git a/backend/facebook-ad-image-hashes/src/Queries.hs b/backend/facebook-ad-image-hashes/src/Queries.hs new file mode 100644 index 00000000..32d4d2dd --- /dev/null +++ b/backend/facebook-ad-image-hashes/src/Queries.hs @@ -0,0 +1,243 @@ +{- +Queries here drive the phash generation ETL job +-} + +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE TypeApplications #-} + +module Queries ( + testDb + , resetPhashes + , populatePhashes + )where + +-------------------------------------------------------------------------------- +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception +import Data.LruCache as LRU +import Control.Monad.Trans.Resource +import Control.Monad.Except (runExceptT) +import Control.Concurrent.STM.TBQueue +import Control.Monad (unless) +import qualified Data.ByteString.Char8 as BS +import qualified Data.ByteString.Lazy as BSL +import Data.Int (Int64) +import qualified Data.Traversable as T +import Data.PHash +import qualified Data.Text as T +import qualified Data.Text.Encoding as E +import Data.Typeable +import qualified Database.PostgreSQL.Simple as PG +import qualified Database.PostgreSQL.Simple.Types as PG +import Database.PostgreSQL.Simple.SqlQQ (sql) +import Database.PostgreSQL.Simple.FromField as PG +import qualified Database.PostgreSQL.Simple.Streaming 
as PGStream +import Database.PostgreSQL.Simple.ToField as PG +import Database.PostgreSQL.Simple (Only(..), connect) +import qualified Network.HTTP.Client as HTTP +import qualified Network.HTTP.Client.TLS as HTTP +import Streaming +import qualified Streaming as S +import qualified Streaming.Prelude as S +import Streaming.Concurrent +import System.Directory +import System.Random +import System.FilePath + +import CliOptions + +------------------------------------------------------------------------------ +-- | Check several assumptions about the database +-- - does it have the right table? +-- - with the right columns for images and phashes? +-- - with the right types? +testDb :: PG.ConnectInfo -> IO () +testDb cfg = do + conn <- connect cfg + r <- PG.query_ @(Only Int) conn "select 1" + unless (r == [Only 1]) + (error $ "Strange result for query 'select 1': " ++ show r) + schm <- PG.query_ @(T.Text, T.Text) conn + [sql| SELECT column_name, data_type + FROM information_schema.columns + WHERE table_name = 'ads' + |] + unless ( elem ("phash", "ARRAY") schm && elem ("images", "ARRAY") schm) + (error $ "'ads table does not have 'phash' and 'images'\ + \columns with correct type ARRAY\n\n" ++ show schm) + + +------------------------------------------------------------------------------ +-- | Reset phashes column +resetPhashes :: PG.ConnectInfo -> IO () +resetPhashes cfg = do + conn <- connect cfg + r <- PG.execute_ conn "UPDATE ads SET phash = '{}'" + print $ "Updated " ++ show (r :: Int64) ++ " records in ads database" + + +------------------------------------------------------------------------------ +-- | Wrapper for id in the `ads` table +newtype AdId = AdId { getAdId :: T.Text } + deriving (Eq, Ord, Show, PG.FromField, PG.ToField) + + +------------------------------------------------------------------------------ +-- | Stream the image links from ads and write back the corresponding +-- perceptual hashes. 
+-- Several steps are performance sensitive, or may involve data +-- too large to reside in memory +-- +-- Our strategy will be to stream image URLs into a work queue, +-- work-steal the URLs off the queue for (a) download and (b) +-- hashing, write the hashes into an lru-cache (keyed by URL), +-- and batch the results for `UPDATE` queries to the ads table. +-- +-- TODO: Pull magic numbers into populate config CLI command +populatePhashes :: PG.ConnectInfo -> IO () +populatePhashes cfg = do + + manager <- HTTP.newTlsManager + conn <- connect cfg + + hashCache <- newTVarIO (LRU.empty 1000000) + + -- Inbox/Outbox/ParallelWorker provided by `withBufferedTransform` + runResourceT $ withBufferedTransform 5 + + -- Parallel workers share per-ad phashing + (adPhashes manager hashCache) + + -- Serial stream of incomig ads + (writeStreamBasket $ + PGStream.stream_ conn "SELECT id, images FROM ads WHERE phash = '{}';") + + -- Serial chunked stream of db updates + -- Chunking prevents us from doing a DB query + -- per row written + (\ob -> withStreamBasket ob + (S.mapM_ (doInsert conn) . S.mapped S.toList . S.chunksOf 5)) + + +------------------------------------------------------------------------------ +-- | The per-ad worker serially computes phashes for all that ad's images +-- Take a LRU cache in a TVar (for sharing between threads), and +-- return a callback appropriate for use by @withBufferedTransform@. +-- The callback receives URLs from the OutBasket parameter, +-- downloads the images, computes their hashes, batches the results, +-- and writes them into the InBasket. +-- +-- The LRU cache is updated by each record, and will be consulted before +-- downloading any URLs or computing their phashes. 
There is no TTL on +-- the cache, so if we want to invalidate it, we need to rerun the query +adPhashes :: HTTP.Manager + -> TVar (LRU.LruCache T.Text (Either T.Text PHash)) + -> OutBasket (AdId, PG.PGArray T.Text) + -> InBasket (AdId, PG.PGArray (Either T.Text PHash)) + -> ResourceT IO () +adPhashes manager hashCache outBasket inBasket = + withStreamBasket outBasket $ \outStream -> + let r = S.mapM (\(k,PG.PGArray urls) -> do + hashes <- mapM (liftIO . resolvePhash manager hashCache) urls + return (k, PG.PGArray hashes) + ) outStream + in writeStreamBasket r inBasket + + + +------------------------------------------------------------------------------ +-- | Insert a set of (Ad, phashes) pairs into the `ads` database +doInsert :: PG.Connection -> [(AdId, PG.PGArray (Either T.Text PHash))] -> ResourceT IO () +doInsert dbConn phashes = do + + let formatEntry hashOrError = case hashOrError of + Left err -> err + Right (PHash h) -> T.pack (show h) + inserts = fmap (\(k,v) -> (k, fmap formatEntry v)) $ phashes + + n <- liftIO $ PG.executeMany dbConn + [sql| UPDATE ads + SET phash = upd.phash + FROM (VALUES (?,?)) as upd(id,phash) + WHERE ads.id = upd.id + |] inserts + liftIO $ putStrLn $ "Writing " ++ show n ++ " records" + + + +------------------------------------------------------------------------------ +-- | For a URL, get the phash by either of two means: +-- 1. Find it in the supplied cache +-- 2. 
Download the URL, compute the phash, and update the cache +resolvePhash :: HTTP.Manager + -> TVar (LRU.LruCache T.Text (Either T.Text PHash)) + -> T.Text + -> IO (Either T.Text PHash) +resolvePhash manager hashCacheTVar url = do + + -- Check (and LRU-update) the cache in an STM transaction + cacheSearch <- atomically $ do + hashCache <- readTVar hashCacheTVar + case LRU.lookup url hashCache of + Nothing -> return Nothing + Just ( cachedVal, newCache ) -> do + writeTVar hashCacheTVar newCache + return (Just cachedVal) + + case cacheSearch of + Just hash -> return (hash) + Nothing -> do + + -- Get a filepath + -- TODO: Do this better. Temp filenames must be guaranteed unique and unixy + tmpPath <- + (\dir n -> concat [dir, [pathSeparator], "fbp-image-", show n]) + <$> getTemporaryDirectory + <*> randomRIO @Int (1,100000) + + -- TODO: error handling + httpReq <- HTTP.parseRequest (T.unpack url) + liftIO $ BSL.writeFile tmpPath . HTTP.responseBody =<< HTTP.httpLbs httpReq manager + + hash <- maybe (Left "pHash failure") Right <$> imageHash tmpPath + removePathForcibly tmpPath + + atomically $ modifyTVar hashCacheTVar (LRU.insert url hash) + + return $ hash + + +------------------------------------------------------------------------------ +-- | **Old** Yesterday's implemetation here was single-threaded in the worker +-- Take a LRU cache in a TVar (for sharing between threads), and +-- return a callback appropriate for use by @withBuffer@. The callback +-- receives URLs from the @Stream (Of [(AdId, PGArray T.Text)]) m a@ parameter, +-- downloads the images, computes their hashes, batches the results, +-- and writes them into the database. +-- +-- The LRU cache is updated by each record, and will be consulted before +-- downloading any URLs or computing their phashes. 
There is no TTL on +-- the cache, so if we want to invalidate it, we need to rerun the query +handleRecords :: PG.Connection + -> HTTP.Manager + -> TVar (LRU.LruCache T.Text (Either T.Text PHash)) + -> Stream (Of (AdId, PG.PGArray T.Text)) (ResourceT IO) () + -> ResourceT IO () +handleRecords dbConn manager hashCache urlStream = do + + liftIO (putStrLn "handleRecords") + + -- For each ads row in the stream, sequence all row's images through phash + let r = S.mapM (\(k, PG.PGArray urls) -> do + hashes <- mapM (liftIO . resolvePhash manager hashCache) urls + return (k, PG.PGArray hashes) + ) urlStream + + -- Collect groups of n rows into chunks for bulk DB insert + -- For now, n == 5 + S.mapM_ (doInsert dbConn) $ S.mapped S.toList $ S.chunksOf 5 r diff --git a/backend/facebook-ad-image-hashes/src/RunCli.hs b/backend/facebook-ad-image-hashes/src/RunCli.hs new file mode 100644 index 00000000..441ae202 --- /dev/null +++ b/backend/facebook-ad-image-hashes/src/RunCli.hs @@ -0,0 +1,25 @@ +{- This is the Intended Main entrypoint for the cli, + The 'real' main (which calls this one) is in `exec/Main.hs`. 
+ The library has a main-like function so that it can be shared + with a test suite +-} + +module RunCli ( + runCommand + , main + ) where + +import CliOptions +import Queries + +------------------------------------------------------------------------------- +-- | Passthrough from CLI inputs to queries +runCommand :: Command -> IO () +runCommand (DbTest cfg) = testDb cfg +runCommand (DbResetHashes cfg) = resetPhashes cfg +runCommand (DbPopulateHashes cfg) = populatePhashes cfg + +------------------------------------------------------------------------------- +-- | Convenience entrypoint for main +main :: IO () +main = getCommand >>= runCommand From 3151666787fe681615d404c710199be79511fe21 Mon Sep 17 00:00:00 2001 From: Greg Hale Date: Tue, 10 Jul 2018 22:24:58 -0400 Subject: [PATCH 2/7] Use explicit transaction in phash ETL --- .../facebook-ad-image-hashes/src/Queries.hs | 103 ++++++------------ 1 file changed, 32 insertions(+), 71 deletions(-) diff --git a/backend/facebook-ad-image-hashes/src/Queries.hs b/backend/facebook-ad-image-hashes/src/Queries.hs index 32d4d2dd..9c606193 100644 --- a/backend/facebook-ad-image-hashes/src/Queries.hs +++ b/backend/facebook-ad-image-hashes/src/Queries.hs @@ -16,40 +16,32 @@ module Queries ( )where -------------------------------------------------------------------------------- -import Control.Concurrent -import Control.Concurrent.STM -import Control.Exception +import Control.Concurrent.STM (TVar, atomically, + modifyTVar, newTVarIO, + readTVar, writeTVar) import Data.LruCache as LRU -import Control.Monad.Trans.Resource -import Control.Monad.Except (runExceptT) -import Control.Concurrent.STM.TBQueue +import Control.Monad.Trans.Resource (ResourceT, runResourceT) import Control.Monad (unless) -import qualified Data.ByteString.Char8 as BS import qualified Data.ByteString.Lazy as BSL import Data.Int (Int64) -import qualified Data.Traversable as T -import Data.PHash +import Data.PHash (PHash(..), imageHash) import qualified Data.Text 
as T -import qualified Data.Text.Encoding as E -import Data.Typeable import qualified Database.PostgreSQL.Simple as PG import qualified Database.PostgreSQL.Simple.Types as PG import Database.PostgreSQL.Simple.SqlQQ (sql) import Database.PostgreSQL.Simple.FromField as PG import qualified Database.PostgreSQL.Simple.Streaming as PGStream import Database.PostgreSQL.Simple.ToField as PG -import Database.PostgreSQL.Simple (Only(..), connect) import qualified Network.HTTP.Client as HTTP import qualified Network.HTTP.Client.TLS as HTTP -import Streaming -import qualified Streaming as S +import Streaming (Stream, Of, chunksOf, + liftIO) import qualified Streaming.Prelude as S -import Streaming.Concurrent -import System.Directory -import System.Random -import System.FilePath - -import CliOptions +import qualified Streaming.Concurrent as S +import System.Directory (getTemporaryDirectory, + removePathForcibly) +import System.Random (randomRIO) +import System.FilePath (pathSeparator) ------------------------------------------------------------------------------ -- | Check several assumptions about the database @@ -58,9 +50,9 @@ import CliOptions -- - with the right types? 
testDb :: PG.ConnectInfo -> IO () testDb cfg = do - conn <- connect cfg - r <- PG.query_ @(Only Int) conn "select 1" - unless (r == [Only 1]) + conn <- PG.connect cfg + r <- PG.query_ @(PG.Only Int) conn "select 1" + unless (r == [PG.Only 1]) (error $ "Strange result for query 'select 1': " ++ show r) schm <- PG.query_ @(T.Text, T.Text) conn [sql| SELECT column_name, data_type @@ -76,7 +68,7 @@ testDb cfg = do -- | Reset phashes column resetPhashes :: PG.ConnectInfo -> IO () resetPhashes cfg = do - conn <- connect cfg + conn <- PG.connect cfg r <- PG.execute_ conn "UPDATE ads SET phash = '{}'" print $ "Updated " ++ show (r :: Int64) ++ " records in ads database" @@ -103,25 +95,25 @@ populatePhashes :: PG.ConnectInfo -> IO () populatePhashes cfg = do manager <- HTTP.newTlsManager - conn <- connect cfg + conn <- PG.connect cfg hashCache <- newTVarIO (LRU.empty 1000000) -- Inbox/Outbox/ParallelWorker provided by `withBufferedTransform` - runResourceT $ withBufferedTransform 5 + PG.withTransaction conn $ runResourceT $ S.withBufferedTransform 5 -- Parallel workers share per-ad phashing (adPhashes manager hashCache) -- Serial stream of incomig ads - (writeStreamBasket $ + (S.writeStreamBasket $ PGStream.stream_ conn "SELECT id, images FROM ads WHERE phash = '{}';") -- Serial chunked stream of db updates -- Chunking prevents us from doing a DB query -- per row written - (\ob -> withStreamBasket ob - (S.mapM_ (doInsert conn) . S.mapped S.toList . S.chunksOf 5)) + (\ob -> S.withStreamBasket ob + (S.mapM_ (doInsert conn) . S.mapped S.toList . 
chunksOf 5)) ------------------------------------------------------------------------------ @@ -137,22 +129,23 @@ populatePhashes cfg = do -- the cache, so if we want to invalidate it, we need to rerun the query adPhashes :: HTTP.Manager -> TVar (LRU.LruCache T.Text (Either T.Text PHash)) - -> OutBasket (AdId, PG.PGArray T.Text) - -> InBasket (AdId, PG.PGArray (Either T.Text PHash)) + -> S.OutBasket (AdId, PG.PGArray T.Text) + -> S.InBasket (AdId, PG.PGArray (Either T.Text PHash)) -> ResourceT IO () adPhashes manager hashCache outBasket inBasket = - withStreamBasket outBasket $ \outStream -> - let r = S.mapM (\(k,PG.PGArray urls) -> do - hashes <- mapM (liftIO . resolvePhash manager hashCache) urls - return (k, PG.PGArray hashes) - ) outStream - in writeStreamBasket r inBasket - + S.withStreamBasket outBasket $ \outStream -> + S.writeStreamBasket (S.mapM streamRow outStream) inBasket + where + streamRow (k, PG.PGArray urls) = do + hashes <- mapM (liftIO . resolvePhash manager hashCache) urls + return (k, PG.PGArray hashes) ------------------------------------------------------------------------------ -- | Insert a set of (Ad, phashes) pairs into the `ads` database -doInsert :: PG.Connection -> [(AdId, PG.PGArray (Either T.Text PHash))] -> ResourceT IO () +doInsert :: PG.Connection + -> [(AdId, PG.PGArray (Either T.Text PHash))] + -> ResourceT IO () doInsert dbConn phashes = do let formatEntry hashOrError = case hashOrError of @@ -169,7 +162,6 @@ doInsert dbConn phashes = do liftIO $ putStrLn $ "Writing " ++ show n ++ " records" - ------------------------------------------------------------------------------ -- | For a URL, get the phash by either of two means: -- 1. 
Find it in the supplied cache @@ -210,34 +202,3 @@ resolvePhash manager hashCacheTVar url = do atomically $ modifyTVar hashCacheTVar (LRU.insert url hash) return $ hash - - ------------------------------------------------------------------------------- --- | **Old** Yesterday's implemetation here was single-threaded in the worker --- Take a LRU cache in a TVar (for sharing between threads), and --- return a callback appropriate for use by @withBuffer@. The callback --- receives URLs from the @Stream (Of [(AdId, PGArray T.Text)]) m a@ parameter, --- downloads the images, computes their hashes, batches the results, --- and writes them into the database. --- --- The LRU cache is updated by each record, and will be consulted before --- downloading any URLs or computing their phashes. There is no TTL on --- the cache, so if we want to invalidate it, we need to rerun the query -handleRecords :: PG.Connection - -> HTTP.Manager - -> TVar (LRU.LruCache T.Text (Either T.Text PHash)) - -> Stream (Of (AdId, PG.PGArray T.Text)) (ResourceT IO) () - -> ResourceT IO () -handleRecords dbConn manager hashCache urlStream = do - - liftIO (putStrLn "handleRecords") - - -- For each ads row in the stream, sequence all row's images through phash - let r = S.mapM (\(k, PG.PGArray urls) -> do - hashes <- mapM (liftIO . 
resolvePhash manager hashCache) urls - return (k, PG.PGArray hashes) - ) urlStream - - -- Collect groups of n rows into chunks for bulk DB insert - -- For now, n == 5 - S.mapM_ (doInsert dbConn) $ S.mapped S.toList $ S.chunksOf 5 r From ca9bfd649490516f2c76e3177d79cad29c4d91d6 Mon Sep 17 00:00:00 2001 From: Greg Hale Date: Sat, 14 Jul 2018 17:45:32 -0400 Subject: [PATCH 3/7] Add search implementation, cli, and report generation --- backend/facebook-ad-image-hashes/default.nix | 4 +- .../facebook-ad-image-hashes.cabal | 5 + .../src/CliOptions.hs | 87 ++++-- .../facebook-ad-image-hashes/src/Queries.hs | 89 +++++- .../facebook-ad-image-hashes/src/Report.hs | 188 +++++++++++++ .../facebook-ad-image-hashes/src/RunCli.hs | 19 +- .../facebook-ad-image-hashes/src/Search.hs | 266 ++++++++++++++++++ 7 files changed, 620 insertions(+), 38 deletions(-) create mode 100644 backend/facebook-ad-image-hashes/src/Report.hs create mode 100644 backend/facebook-ad-image-hashes/src/Search.hs diff --git a/backend/facebook-ad-image-hashes/default.nix b/backend/facebook-ad-image-hashes/default.nix index 70819420..d1549be7 100644 --- a/backend/facebook-ad-image-hashes/default.nix +++ b/backend/facebook-ad-image-hashes/default.nix @@ -1,4 +1,4 @@ -{ mkDerivation, base, bytestring, errors, http-client, http-client-tls, HUnit, lrucaching +{ mkDerivation, aeson, base, blaze-html, bytestring, errors, http-client, http-client-tls, HUnit, kdt, lrucaching , optparse-applicative, c-phash, hs-phash, pkgconfig, postgresql-simple, resourcet, stdenv , stm, streaming, streaming-concurrency , streaming-postgresql-simple, text, zeromq @@ -11,7 +11,7 @@ mkDerivation { isLibrary = true; isExecutable = true; libraryHaskellDepends = [ - base bytestring errors HUnit lrucaching optparse-applicative hs-phash http-client + aeson base blaze-html bytestring errors HUnit kdt lrucaching optparse-applicative hs-phash http-client http-client-tls postgresql-simple resourcet stm streaming streaming-concurrency 
streaming-postgresql-simple text ]; diff --git a/backend/facebook-ad-image-hashes/facebook-ad-image-hashes.cabal b/backend/facebook-ad-image-hashes/facebook-ad-image-hashes.cabal index 91f3255a..5af5531c 100644 --- a/backend/facebook-ad-image-hashes/facebook-ad-image-hashes.cabal +++ b/backend/facebook-ad-image-hashes/facebook-ad-image-hashes.cabal @@ -18,8 +18,12 @@ cabal-version: >=1.10 library exposed-modules: CliOptions Queries + Report RunCli + Search build-depends: base >=4.9 && <4.11 + , aeson + , blaze-html , bytestring , directory , errors @@ -27,6 +31,7 @@ library , http-client , http-client-tls , HUnit + , kdt , lrucaching , optparse-applicative , phash diff --git a/backend/facebook-ad-image-hashes/src/CliOptions.hs b/backend/facebook-ad-image-hashes/src/CliOptions.hs index 9344ef86..07f78091 100644 --- a/backend/facebook-ad-image-hashes/src/CliOptions.hs +++ b/backend/facebook-ad-image-hashes/src/CliOptions.hs @@ -6,6 +6,7 @@ module CliOptions where import Control.Monad import Control.Exception import Data.Maybe +import Data.List.NonEmpty (NonEmpty(..)) import Data.Monoid import Data.Text (Text) import qualified Data.Text as T @@ -14,37 +15,74 @@ import Options.Applicative import System.Environment import System.IO +import Search + getCommand :: IO Command -getCommand = execParser $ info (phashCommand <**> helper) +getCommand = execParser $ info (pCommand <**> helper) ( fullDesc <> progDesc "Run commands for perceptual hashes against the ads database" ) -data Command = - DbTest PG.ConnectInfo - | DbResetHashes PG.ConnectInfo - | DbPopulateHashes PG.ConnectInfo +data Command + = CmdDbTest PG.ConnectInfo + | CmdResetHashes PG.ConnectInfo + | CmdPopulateHashes PG.ConnectInfo + | CmdSearch SearchOptions deriving (Show) -phashCommand :: Parser Command -phashCommand = hsubparser $ - command "test-db" - (info (DbTest <$> dbConn) - (progDesc "Test connection to the ads database")) - <> command "reset-phashes" - (info (DbResetHashes <$> dbConn) - (progDesc 
"Clear the phash column in the ads database")) - <> command "populate-phashes" - (info (DbPopulateHashes <$> dbConn) - (progDesc "Compute phashes for images in the ads database")) - --- data DbConnCfg = DbConnCfg --- { dbUser :: Text --- , dbPass :: Text --- , dbHost :: Text --- , dbPort :: Int --- , dbName :: Text --- } deriving (Show) + +pCommand :: Parser Command +pCommand = + (hsubparser $ + command "test-db" + (info (CmdDbTest <$> dbConn) + (progDesc "Test connection to the ads database")) + <> command "reset-phashes" + (info (CmdResetHashes <$> dbConn) + (progDesc "Clear the phash column in the ads database")) + <> command "populate-phashes" + (info (CmdPopulateHashes <$> dbConn) + (progDesc "Compute phashes for images in the ads database")) + <> command "search" + (info (CmdSearch <$> searchParser) + (progDesc "Search for similar images")) + ) + + +searchParser :: Parser SearchOptions +searchParser = SearchOptions + <$> some ( + (fmap Left (strOption (long "filepath" <> help "Filepath to query")) + <|> + fmap (Right . 
URL) (strOption (long "url" <> help "URL to query")) + ) + ) + <*> searchTypeParser + <*> (fmap Just (option auto (long "cache-file" + <> help "Cache filepath") + ) + <|> pure Nothing) + <*> (( flag' True (long "overwrite-cache") *> + ((\db thr -> OverwriteCache db thr) <$> + dbConn <*> + fmap IdentityGroupingThreshold (option auto (long "threshold")) + ) + ) <|> pure UseCache) + <*> (fmap Just (strOption (long "out" <> help "Generate report in html (with .htm or .html suffix) or json")) <|> pure Nothing) + + +searchTypeParser :: Parser SearchType +searchTypeParser = + fmap SearchKNearest + (option auto (long "k-nearest" <> help "Get the k nearest results")) + <|> (SearchFirstInRanges + <$> option auto (long "range-bounds" <> + help "List of boundaries for concentric ring search") + <*> option auto (long "n-examples" <> + help "Number of examples per ring range") + ) + <|> pure SearchNearest + dbConn :: Parser PG.ConnectInfo dbConn = @@ -70,6 +108,7 @@ dbConn = <> help "Database Name" <> value "facebook_ads") + -- Extra Utilities for allowing CLI parser to sample env vars -- and dotenv files type Env = [(Text, Text)] diff --git a/backend/facebook-ad-image-hashes/src/Queries.hs b/backend/facebook-ad-image-hashes/src/Queries.hs index 9c606193..8ba615c4 100644 --- a/backend/facebook-ad-image-hashes/src/Queries.hs +++ b/backend/facebook-ad-image-hashes/src/Queries.hs @@ -13,6 +13,9 @@ module Queries ( testDb , resetPhashes , populatePhashes + , fetchSortedPhashes + , downloadURLFile + , testConnectInfo -- TODO temporary )where -------------------------------------------------------------------------------- @@ -24,6 +27,7 @@ import Control.Monad.Trans.Resource (ResourceT, runResourceT) import Control.Monad (unless) import qualified Data.ByteString.Lazy as BSL import Data.Int (Int64) +import Data.Maybe (catMaybes) import Data.PHash (PHash(..), imageHash) import qualified Data.Text as T import qualified Database.PostgreSQL.Simple as PG @@ -39,9 +43,11 @@ import Streaming 
(Stream, Of, chunksOf, import qualified Streaming.Prelude as S import qualified Streaming.Concurrent as S import System.Directory (getTemporaryDirectory, + createDirectoryIfMissing, removePathForcibly) import System.Random (randomRIO) import System.FilePath (pathSeparator) +import Text.Read (readMaybe) ------------------------------------------------------------------------------ -- | Check several assumptions about the database @@ -185,16 +191,17 @@ resolvePhash manager hashCacheTVar url = do Just hash -> return (hash) Nothing -> do - -- Get a filepath - -- TODO: Do this better. Temp filenames must be guaranteed unique and unixy - tmpPath <- - (\dir n -> concat [dir, [pathSeparator], "fbp-image-", show n]) - <$> getTemporaryDirectory - <*> randomRIO @Int (1,100000) + -- -- Get a filepath + -- -- TODO: Do this better. Temp filenames must be guaranteed unique and unixy + -- tmpPath <- + -- (\dir n -> concat [dir, [pathSeparator], "fbp-image-", show n]) + -- <$> getTemporaryDirectory + -- <*> randomRIO @Int (1,100000) - -- TODO: error handling - httpReq <- HTTP.parseRequest (T.unpack url) - liftIO $ BSL.writeFile tmpPath . HTTP.responseBody =<< HTTP.httpLbs httpReq manager + -- -- TODO: error handling + -- httpReq <- HTTP.parseRequest (T.unpack url) + -- liftIO $ BSL.writeFile tmpPath . HTTP.responseBody =<< HTTP.httpLbs httpReq manager + tmpPath <- downloadURLFile manager url hash <- maybe (Left "pHash failure") Right <$> imageHash tmpPath removePathForcibly tmpPath @@ -202,3 +209,67 @@ resolvePhash manager hashCacheTVar url = do atomically $ modifyTVar hashCacheTVar (LRU.insert url hash) return $ hash + +downloadURLFile :: HTTP.Manager -> T.Text -> IO FilePath +downloadURLFile manager url = do + + dir <- getTemporaryDirectory + createDirectoryIfMissing True $ concat [dir, [pathSeparator], "fbp-images"] + + -- Get a filepath + -- TODO: Do this better. 
Temp filenames must be guaranteed unique and unixy + tmpPath <- + (\n -> concat [dir, [pathSeparator], "fbp-images", [pathSeparator], show n]) + <$> randomRIO @Int (1,100000) + + -- TODO: error handling + httpReq <- HTTP.parseRequest (T.unpack url) + liftIO $ BSL.writeFile tmpPath . HTTP.responseBody =<< HTTP.httpLbs httpReq manager + + return tmpPath + +-- phashURL :: HTTP.Manager -> T.Text -> IO (Either T.Text PHash) +-- phashURL manager url = do +-- imgFile <- downloadURLFile manager url +-- maybe (Left "pHash failure") Right <$> imageHash imgFile + + +-- For each ad, flatten the images and hashes +-- returning the list sorted by phash, with 0's removed +-- +-- For example, if we have ads: +-- +-- id: 111 +-- images: { example.com/1.png, example.com/2.png } +-- phash: { 123, 234 } +-- +-- id: 222 +-- images: { example.com/1.png, example.com/3.png } +-- phash: { 123, , 012 } +-- +-- Then the query returns: +-- +-- phash url +-- 012 example.com/3.png +-- 123 example.com/1.png +-- 123 example.com/1.png +-- 234 example.com/2.png +fetchSortedPhashes :: PG.ConnectInfo -> IO [(PHash, T.Text)] +fetchSortedPhashes cfg = do + + conn <- PG.connect cfg + rs <- PG.query_ conn + [sql| SELECT ars.phash, ars.url + FROM (SELECT unnest(phash) as phash, + unnest(images) as url + FROM ads) ars + WHERE ars.phash != '0' + ORDER BY ars.phash + |] + let readRow (hash, url) = case readMaybe hash of + Nothing -> Nothing + Just w64 -> Just (PHash w64, url) + return . 
catMaybes $ fmap readRow rs + +testConnectInfo :: PG.ConnectInfo +testConnectInfo = PG.ConnectInfo "localhost" 5432 "fbpac" "password" "fbpac" diff --git a/backend/facebook-ad-image-hashes/src/Report.hs b/backend/facebook-ad-image-hashes/src/Report.hs new file mode 100644 index 00000000..4bbdd1b7 --- /dev/null +++ b/backend/facebook-ad-image-hashes/src/Report.hs @@ -0,0 +1,188 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Report where + + +import Data.List (groupBy, scanl', sort, + sortBy) +import Data.Semigroup ((<>)) +import qualified Data.Text as T +import Data.Foldable (for_) +import Data.Traversable (for) +import Text.Blaze.Html.Renderer.Pretty +import Text.Blaze.Html5 hiding (head, map) +import qualified Text.Blaze.Html5 as HTML +import qualified Text.Blaze.Html5 hiding (head, map) +import Text.Blaze.Html5.Attributes hiding (for, title) +import Prelude hiding (div) +import Data.PHash (PHash(..), hammingDistance, + imageHash) + +import Search + +------------------------------------------------------------------------------ +-- | Sketch up an html page for the results of various types of searches +htmlReport :: [Either T.Text SearchResults] -> String +htmlReport res = renderHtml $ html $ do + HTML.head $ do + myHead + body $ do + for_ res $ \r -> div ! class_ "query" $ case r of + Left err -> div $ p (toMarkup err) + Right (NearestResult (inp, qHash) (PHash rHash, rURLs)) -> do + div ! class_ "q-input" $ inputPath inp + div ! class_ "result-header" $ do + toMarkup rHash + toMarkup ("Distance: " ++ show (hammingDistance qHash (PHash rHash))) + for_ rURLs (\(URL u) -> img ! src (toValue u)) + Right (KNearestResult (inp, qHash) rs) -> do + div ! class_ "entries" $ do + div $ do + h3 "Query" + div ! class_ "results" $ hashEntry [inp] (qHash) Nothing + div $ do + h3 "Results" + div ! 
class_ "results" $ for_ rs $ \(rHash, rUrls) -> + hashEntry (Right <$> rUrls) + rHash + (Just $ hammingDistance qHash rHash) + Right (NearestInRanges (inp, qHash) ranges rings) -> do + div $ do + h3 "Query" + div ! class_ "results" $ hashEntry [inp] (qHash) Nothing + div $ do + h3 "Results" + let rangePairs = zip (zip (0:ranges) ranges) rings + for_ rangePairs $ \(dRange, ringResults) -> + div ! class_ "radius-range" $ do + distanceRange dRange + div ! class_ "results" $ for_ ringResults $ \(rHash, rUrls) -> + hashEntry (Right <$> rUrls) + rHash + (Just $ hammingDistance qHash rHash) + + +------------------------------------------------------------------------------ +-- | Render a filepath or url to Html in our report +-- Filepaths are just printed as text. Urls produce thumbnail-sized images +inputPath :: Either FilePath URL -> Html +inputPath (Left fp) = toMarkup $ "Local file: " <> fp +inputPath (Right (URL url)) = img ! src (toValue url) + + +------------------------------------------------------------------------------ +-- | Render a distance range to html +distanceRange :: (Int, Int) -> Html +distanceRange (r0, r1) = + div ! class_ "radius-label" $ + toMarkup ("( " ++ show r0 ++ " - " ++ show r1 ++ " )") + + +------------------------------------------------------------------------------ +-- | Render a query or result image card, optionally with hamming distance +hashEntry :: [Either FilePath URL] -> PHash -> Maybe Int -> Html +hashEntry urls (PHash hash) mDistance = + div ! class_ "result-entry" $ do + div ! class_ "result-header" $ do + div . toMarkup $ hash + maybe mempty (div . toMarkup . ("Distance: " <>) . show) mDistance + div ! class_ "result-pics" $ do + for_ (take 1 urls) $ \inp -> + let (URL url) = assumeUrl "Render hash entry image" inp + in a ! href (toValue url) $ + (img ! src (toValue url) ! class_ "result-image") + div ! class_ "result-links" $ for_ (zip [1..] 
urls) $ \(i, inp) -> + let (URL url) = assumeUrl "Render hash entry link" inp + in a ! href (toValue url) $ toMarkup (i :: Int) + where + + assumeUrl :: String -> Either FilePath URL -> URL + assumeUrl errorHelper (Left fp) = + error $ errorHelper ++ + "\nViolated expectation that an input was a URL: " ++ fp + assumeUrl _ (Right url) = url + + +------------------------------------------------------------------------------ +-- | Define some inline css +myHead :: Html +myHead = HTML.style . toMarkup $ unlines [ + "body {" + , " background-color: hsl(0,0%,90%);" + , "}" + , "" + , ".query {" + , " display: flex;" + , " flex-direction: column;" + , " margin: 20px;" + , " padding: 20px;" + , " background-color: white;" + , "}" + , "" + , ".result-image, .query-image {" + , " height: 200px;" + , " margin: 0px;" + , "}" + , "" + , ".result-entry {" + , " display: flex;" + , " flex-direction: column;" + , " background-color: white;" + , " margin-right: 5px;" + , " margin-bottom: 5px;" + , "}" + , "" + , ".result-header, .query-header {" + , " display: flex;" + , " background-color: white;" + , "}" + , "" + , ".result-header > div, .query-header > div {" + , " margin: 3px;" + , " font-size: 16pt;" + , "}" + , "" + , ".result-pics {" + , " display: flex;" + , " x-overflow: scroll;" + , " background-color: white;" + , "}" + , "" + , ".result-links > a {" + , " margin: 3 px;" + , " text-decoration-line: none;" + , "}" + , "" + , ".results {" + , " display: flex;" + , " flex-direction: row;" + , " flex-wrap: wrap;" + , " margin: 20px;" + , " background-color: white;" + , "}" + , ".radius-range {" + , " display: flex;" + , " flex-direction: row;" + , " flex-wrap: wrap;" + , " align-items: center;" + , " margin: 0px;" + , "}" + ] + +-- Wireframe to code against +-- +-- Query +-- | 12345678 +-- | |-----| +-- | | | +-- | |-----| +-- +-- Results +-- | 12345679 Distance: 20 +-- | |-----| +-- | | | +-- | |-----| diff --git a/backend/facebook-ad-image-hashes/src/RunCli.hs 
b/backend/facebook-ad-image-hashes/src/RunCli.hs index 441ae202..a7c025aa 100644 --- a/backend/facebook-ad-image-hashes/src/RunCli.hs +++ b/backend/facebook-ad-image-hashes/src/RunCli.hs @@ -9,15 +9,28 @@ module RunCli ( , main ) where +import Data.List (isSuffixOf) +import System.IO (writeFile) + import CliOptions import Queries +import Search +import Report ------------------------------------------------------------------------------- -- | Passthrough from CLI inputs to queries runCommand :: Command -> IO () -runCommand (DbTest cfg) = testDb cfg -runCommand (DbResetHashes cfg) = resetPhashes cfg -runCommand (DbPopulateHashes cfg) = populatePhashes cfg +-- runCommand = print +runCommand (CmdDbTest cfg) = testDb cfg +runCommand (CmdResetHashes cfg) = resetPhashes cfg +runCommand (CmdPopulateHashes cfg) = populatePhashes cfg +runCommand (CmdSearch opts) = + runSearch opts >>= report (outputFile opts) + + where report Nothing = print + report (Just fp) | ".html" `isSuffixOf` fp = writeFile fp . htmlReport + | ".htm" `isSuffixOf` fp = writeFile fp . 
htmlReport + | otherwise = print ------------------------------------------------------------------------------- -- | Convenience entrypoint for main diff --git a/backend/facebook-ad-image-hashes/src/Search.hs b/backend/facebook-ad-image-hashes/src/Search.hs new file mode 100644 index 00000000..77e1a8d2 --- /dev/null +++ b/backend/facebook-ad-image-hashes/src/Search.hs @@ -0,0 +1,266 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE ScopedTypeVariables #-} + +module Search where + +import Data.Bifunctor (first) +import Data.Bits +import Data.Ord (comparing) +import Data.List (scanl', sort, sortBy, groupBy) +import Data.Semigroup ((<>)) +import qualified Data.Text as T +import qualified Data.Aeson as A +import qualified Data.Aeson.Parser as A +import Data.Maybe (listToMaybe) +import qualified Data.ByteString.Lazy as BSL +import Data.PHash (PHash(..), hammingDistance, imageHash) +import Data.Traversable (for) +import Data.Word +import qualified Database.PostgreSQL.Simple as PG +import qualified Data.KdMap.Dynamic as KD +import qualified Network.HTTP.Client as HTTP +import qualified Network.HTTP.Client.TLS as HTTP +import System.Directory (createDirectoryIfMissing, + getTemporaryDirectory, + removePathForcibly) +import System.FilePath (pathSeparator) + +import Queries + +------------------------------------------------------------------------------ +-- | Search Trees are KD-trees. 
We alias `Int` to `AxisType` in case we want +-- to explore other axis types later +newtype SearchTree = SearchTree + { getSearchTree :: KD.KdMap AxisType PHash [URL] } + deriving (Show) + +type AxisType = Int + + +------------------------------------------------------------------------------ +-- | Convert a PHash into a list of axis values +-- PHash is a 64-bit word, so we will break it into 8 word8's +-- This is probably not a meaningful grouping in the phash +-- space, but at least it nevertheless gives us a +-- multidimensional space for organizing our phashes +-- +-- TODO: check if such a grouping _actually_ speeds +-- up our searches +-- TODO: This is super sketchy. I don't know how the dimensions of a +-- phash relate to the bits of the hash. So I just assume that each +-- Word8 is a single dimension. It's likely that making this the +-- basis function doesn't line up with the hamming distances +-- between images, so this will break the invariants of kd-trees +-- and result in lookups that fail to find the real best targets +hashToKdSpace :: PHash -> [AxisType] +hashToKdSpace (PHash word64) = map bitRange [0..7] + where + -- To take the nth element of a Word64, + -- 1) Create a Word8-sized mask (255 :: Word64) + -- 2) Shift it to the left by n Word8s (8 * n bits) + -- 3) Mask the input with it + -- 4) Shift the result to the right by n Word8s (8 * n bits) + bitRange n = + let iShift = 8 * n + mask = 255 `shiftL` iShift + masked = word64 .&.
mask + in fromIntegral $ masked `shiftR` iShift + + +------------------------------------------------------------------------------ +-- | Import a list of (hash,[url]) pairs into a search tree +-- Decoding assumes that the list is sorted by hash +loadCache :: FilePath -> IO SearchTree +loadCache cacheFile = do + f <- BSL.readFile cacheFile + + -- The empty kdtree defines its point-decomposition function (how to turn a + -- point into a list of dimensions), and custom distance function + let kdTree0 = KD.emptyWithDist + hashToKdSpace + (\x y -> (hammingDistance x y)^2) + + -- Decode the json as an association list of Word64 (raw phash) and [URL] + case A.decode @[(Word64, [URL])] f of + Nothing -> error $ "Failed to decode cache file: " ++ cacheFile + + -- On successful decode, wrap the Word64s in PHash type, import them into + -- a kd-tree, and wrap the result as type @SearchTree@ + Just ps -> return . SearchTree $ KD.batchInsert kdTree0 (fmap (first PHash) ps) + + +------------------------------------------------------------------------------ +-- | Fetch images and phashes out of the `ads` table, fold them into +-- search tree and cache that for use across search requests +-- +-- The cache differs from the images/hash query slightly +-- The database query's type is `[(PHash, URL)]`, and the cache's type +-- is `[(PHash, [URL])]`. We choose a phash similarity threshold, and for +-- consecutive query rows that are less different than the threshold, we +-- collect the URLs under the first entry's phash. +-- When the threshold is 0, the effect is that we deduplicate identical +-- images that appear under different rows from the `ads` table +-- TODO: should we deduplicate further by holding a Set of URLs rather +-- than a list?
I think so +generateCache :: PG.ConnectInfo + -> IdentityGroupingThreshold + -> FilePath + -> IO () +generateCache cfg (IdentityGroupingThreshold thr) fp = do + + rs <- fetchSortedPhashes cfg + + -- Helper function defines how to collapse query rows with identical-enough + -- phash into a list of URLs under a single phash + let squashIdentityGroup xs = case xs of + [] -> error + "Impossible case: groupBy gave a self-similar set that's empty" + ((hash1, url1) : rest) -> (hash1, url1: (snd <$> rest)) + + -- group by phash similarity of consecutive rows (the sql query returns + -- rows sorted by phash), and squash the group members together under a + -- single phash + let rs' = fmap (first (\(PHash w64) -> w64)) + $ fmap squashIdentityGroup + $ groupBy (\a b -> hammingDistance (fst a) (fst b) < thr) rs + + BSL.writeFile fp (A.encode rs') + + +------------------------------------------------------------------------------ +-- | Compute default location for cache files, preparing parent directories +-- if necessary +defaultCacheFile :: IO FilePath +defaultCacheFile = do + td <- getTemporaryDirectory + createDirectoryIfMissing True $ concat [td, [pathSeparator], "fbp-cache"] + return $ + concat [td, [pathSeparator], "fbp-cache", [pathSeparator], "hashes"] + + +------------------------------------------------------------------------------ +-- | Toggle for using the existing cache, or generating a new one from +-- the given database and using the given identity grouping threshold +-- (explained in the haddock for @generateCache@) +data SearchCacheAction + = UseCache + | OverwriteCache PG.ConnectInfo IdentityGroupingThreshold + -- ^ Building a new cache requires a database connection + -- and an identity grouping threshold + deriving (Show) + + +------------------------------------------------------------------------------ +-- | Toggle for the type of output to produce +data SearchOutputFormat + = OutputJSON + -- ^ JSON blob + | OutputHTML + -- ^ HTML report + deriving (Eq,
Show) + + +------------------------------------------------------------------------------ +-- | Type wrapper over raw text to signal that text is a URL +newtype URL = URL { getUrl :: T.Text } + deriving (Eq, A.FromJSON, A.ToJSON, Show) + + +------------------------------------------------------------------------------ +-- | Type wrapper over an Int to track that it is meant for use as +-- phash grouping threshold (used during search tree generation) +newtype IdentityGroupingThreshold = IdentityGroupingThreshold Int + deriving (Eq, Show) + + +------------------------------------------------------------------------------ +data SearchOptions = SearchOptions + { searchQuery :: [Either FilePath URL] + -- ^ FilePath or URL of images to search for + , searchType :: SearchType + , chacheFile :: Maybe FilePath + -- ^ An override for the location to store our search tree + , overwriteCache :: SearchCacheAction + , outputFile :: Maybe FilePath + } deriving (Show) + + +------------------------------------------------------------------------------ +data SearchType + = SearchNearest + -- ^ Return the nearest result + | SearchKNearest Int + -- ^ Return the nearest k results + | SearchFirstInRanges [Int] Int + -- ^ Return sample results from the list of distance boundaries, + -- with at most n results per region + deriving (Eq, Show) + + +------------------------------------------------------------------------------ +-- | Wrapper for responses to different search requests +-- Each one contains data from the query, to help with interpreting +-- the results in contexts where the query isn't available +-- TODO: Holding query data here is ugly. Fix this.
+data SearchResults = + NearestResult (Either FilePath URL, PHash) (PHash, [URL]) + | KNearestResult (Either FilePath URL, PHash) [(PHash, [URL])] + | NearestInRanges (Either FilePath URL, PHash) [Int] [[(PHash, [URL])]] + deriving (Show) + + +------------------------------------------------------------------------------ +-- | Interpret a @SearchOptions@ as a set of instructions for fetching or +-- regenerating the kd-tree data, then running one of the possible +-- lookups from @SearchType@ (nearest, k-nearest, nearest-in-ranges) +runSearch :: SearchOptions -> IO [Either T.Text SearchResults] +runSearch (SearchOptions queries sType fp overwrite _) = do + + manager <- HTTP.newTlsManager + + -- compute default cache file location, or + -- return the user's if they provided one + cacheFilePath <- maybe defaultCacheFile return fp + + -- If requested, regenerate cache from ads database + case overwrite of + UseCache -> return () + OverwriteCache conn thresh -> generateCache conn thresh cacheFilePath + + SearchTree kdt <- loadCache cacheFilePath + + -- Iterate over all images requested + for queries $ \q -> do + + -- Compute requested image's phash + imgFile :: FilePath <- either return (\(URL url) -> downloadURLFile manager url) q + hashResult <- imageHash imgFile + + return $ case hashResult of + Nothing -> Left $ "pHash error for " <> T.pack imgFile + Just hash -> Right $ case sType of + + -- @SearchNearest@ is handled by a simple call to the kd-tree library + SearchNearest -> NearestResult (q, hash) (KD.nearest kdt hash) + + -- @SearchKNearest@ is handled by simple calls to kd-tree library + SearchKNearest k -> KNearestResult (q, hash) (KD.kNearest kdt k hash) + + -- @SearchFirstInRanges@ doesn't use the kd-tree library - instead it emits + -- all contained points into a list sorted by similarity to the query, and + -- consumes elements from the list according to the requested distance + -- ranges + SearchFirstInRanges bounds k -> + let ranges = sort bounds + hashes
= sortBy (comparing (hammingDistance hash . fst)) (KD.assocs kdt) + rings = scanl' (\(ps, lastRing) rad -> + let (thisCandidates, nextCandidates) = + break (\p -> hammingDistance (fst p) hash >= rad) ps + thisRing = take k thisCandidates + in (nextCandidates, thisRing) + ) (hashes, mempty) ranges + results = drop 1 $ fmap snd rings + in NearestInRanges (q, hash) ranges results From d104c92f89f0a2660eb718b668d40121b38abada Mon Sep 17 00:00:00 2001 From: Greg Hale Date: Mon, 16 Jul 2018 11:41:00 -0400 Subject: [PATCH 4/7] Add forgotten file to git repo --- backend/facebook-ad-image-hashes/exec/Main.hs | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 backend/facebook-ad-image-hashes/exec/Main.hs diff --git a/backend/facebook-ad-image-hashes/exec/Main.hs b/backend/facebook-ad-image-hashes/exec/Main.hs new file mode 100644 index 00000000..0c39f6ec --- /dev/null +++ b/backend/facebook-ad-image-hashes/exec/Main.hs @@ -0,0 +1,6 @@ +module Main where + +import qualified RunCli + +main :: IO () +main = RunCli.main From 73d147802cbe178c6a30f4f2ec8d2337ac4e99f3 Mon Sep 17 00:00:00 2001 From: Greg Hale Date: Wed, 18 Jul 2018 19:16:58 -0400 Subject: [PATCH 5/7] Soft failure when image can not be downloaded, extra checks during search --- .../facebook-ad-image-hashes/src/Queries.hs | 47 ++++++++++++------- .../facebook-ad-image-hashes/src/Search.hs | 28 ++++++++--- 2 files changed, 51 insertions(+), 24 deletions(-) diff --git a/backend/facebook-ad-image-hashes/src/Queries.hs b/backend/facebook-ad-image-hashes/src/Queries.hs index 8ba615c4..ed0bb154 100644 --- a/backend/facebook-ad-image-hashes/src/Queries.hs +++ b/backend/facebook-ad-image-hashes/src/Queries.hs @@ -15,6 +15,7 @@ module Queries ( , populatePhashes , fetchSortedPhashes , downloadURLFile + , countImageHashMisalignment , testConnectInfo -- TODO temporary )where @@ -22,7 +23,9 @@ module Queries ( import Control.Concurrent.STM (TVar, atomically, modifyTVar, newTVarIO, readTVar, writeTVar) +import 
Control.Exception (SomeException, try) import Data.LruCache as LRU +import Data.Semigroup ((<>)) import Control.Monad.Trans.Resource (ResourceT, runResourceT) import Control.Monad (unless) import qualified Data.ByteString.Lazy as BSL @@ -191,26 +194,19 @@ resolvePhash manager hashCacheTVar url = do Just hash -> return (hash) Nothing -> do - -- -- Get a filepath - -- -- TODO: Do this better. Temp filenames must be guaranteed unique and unixy - -- tmpPath <- - -- (\dir n -> concat [dir, [pathSeparator], "fbp-image-", show n]) - -- <$> getTemporaryDirectory - -- <*> randomRIO @Int (1,100000) + dlPath <- downloadURLFile manager url + case dlPath of + Left errMsg -> return $ Left errMsg + Right tmpPath -> do - -- -- TODO: error handling - -- httpReq <- HTTP.parseRequest (T.unpack url) - -- liftIO $ BSL.writeFile tmpPath . HTTP.responseBody =<< HTTP.httpLbs httpReq manager - tmpPath <- downloadURLFile manager url + hash <- maybe (Left "pHash failure") Right <$> imageHash tmpPath + removePathForcibly tmpPath - hash <- maybe (Left "pHash failure") Right <$> imageHash tmpPath - removePathForcibly tmpPath + atomically $ modifyTVar hashCacheTVar (LRU.insert url hash) - atomically $ modifyTVar hashCacheTVar (LRU.insert url hash) + return $ hash - return $ hash - -downloadURLFile :: HTTP.Manager -> T.Text -> IO FilePath +downloadURLFile :: HTTP.Manager -> T.Text -> IO (Either T.Text FilePath) downloadURLFile manager url = do dir <- getTemporaryDirectory @@ -224,9 +220,12 @@ downloadURLFile manager url = do -- TODO: error handling httpReq <- HTTP.parseRequest (T.unpack url) - liftIO $ BSL.writeFile tmpPath . HTTP.responseBody =<< HTTP.httpLbs httpReq manager + dlResult <- try $ HTTP.httpLbs httpReq manager >>= + BSL.writeFile tmpPath . 
HTTP.responseBody - return tmpPath + return $ case dlResult of + Left (e :: SomeException) -> Left $ "Download Failure on: " <> url + Right _ -> Right tmpPath -- phashURL :: HTTP.Manager -> T.Text -> IO (Either T.Text PHash) -- phashURL manager url = do @@ -271,5 +270,17 @@ fetchSortedPhashes cfg = do Just w64 -> Just (PHash w64, url) return . catMaybes $ fmap readRow rs +countImageHashMisalignment :: PG.ConnectInfo -> IO Int64 +countImageHashMisalignment cfg = do + conn <- PG.connect cfg + [r] <- PG.query_ conn + [sql| SELECT COUNT(*) + FROM (SELECT array_length(images,1) as i, + array_length(phash,1) as p + FROM ads) lengths + WHERE lengths.i != lengths.p + |] + return $ PG.fromOnly r + testConnectInfo :: PG.ConnectInfo testConnectInfo = PG.ConnectInfo "localhost" 5432 "fbpac" "password" "fbpac" diff --git a/backend/facebook-ad-image-hashes/src/Search.hs b/backend/facebook-ad-image-hashes/src/Search.hs index 77e1a8d2..689ed812 100644 --- a/backend/facebook-ad-image-hashes/src/Search.hs +++ b/backend/facebook-ad-image-hashes/src/Search.hs @@ -6,6 +6,9 @@ module Search where +import Control.Error (ExceptT(..), MaybeT(..), + runExceptT, noteT) +import Control.Monad (when) import Data.Bifunctor (first) import Data.Bits import Data.Ord (comparing) @@ -111,7 +114,13 @@ generateCache :: PG.ConnectInfo -> IO () generateCache cfg (IdentityGroupingThreshold thr) fp = do + nMisaligned <- countImageHashMisalignment cfg + when (nMisaligned > 0) $ + error "Database error: Some hash array did not match image array in size" + rs <- fetchSortedPhashes cfg + when (null rs) $ + error "No valid phashes found in database" -- Helper function defines how to collapse query rows with identical-enough -- phash into a list of URLs under a single phash @@ -233,15 +242,22 @@ runSearch (SearchOptions queries sType fp overwrite _) = do SearchTree kdt <- loadCache cacheFilePath -- Iterate over all images requested - for queries $ \q -> do + for queries $ \(q :: Either FilePath URL) -> 
runExceptT $ do -- Compute requested image's phash - imgFile :: FilePath <- either return (\(URL url) -> downloadURLFile manager url) q - hashResult <- imageHash imgFile + imgFile :: FilePath <- either + return + (\(URL url) -> ExceptT $ downloadURLFile manager url) + q + + -- imgFile <- either return (\(URL url) -> downloadURLFile manager url) q + hash <- noteT "phash error" $ MaybeT $ imageHash imgFile + + return $ case sType of - return $ case hashResult of - Nothing -> Left $ "pHash error for " <> T.pack imgFile - Just hash -> Right $ case sType of + -- return $ case hashResult of + -- Nothing -> Left $ "pHash error for " <> T.pack imgFile + -- Just hash -> case sType of -- @SearchNearest@ is handled by a simple call to the kd-tree library SearchNearest -> NearestResult (q, hash) (KD.nearest kdt hash) From 965ed5aacf12a62a0827613ee11c4b81ac3b7bc8 Mon Sep 17 00:00:00 2001 From: Greg Hale Date: Wed, 18 Jul 2018 19:51:59 -0400 Subject: [PATCH 6/7] Update phash and cache generation and checks Null is the new sentinal value for computing phashes during ETL During search, only non-null-phash rows are considered All considered rows must have images length == phash length Number of considered rows must be greater than 0 --- .../facebook-ad-image-hashes/src/Queries.hs | 21 +++++++++++-------- .../facebook-ad-image-hashes/src/Search.hs | 8 ++++--- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/backend/facebook-ad-image-hashes/src/Queries.hs b/backend/facebook-ad-image-hashes/src/Queries.hs index ed0bb154..98e9d44c 100644 --- a/backend/facebook-ad-image-hashes/src/Queries.hs +++ b/backend/facebook-ad-image-hashes/src/Queries.hs @@ -78,7 +78,7 @@ testDb cfg = do resetPhashes :: PG.ConnectInfo -> IO () resetPhashes cfg = do conn <- PG.connect cfg - r <- PG.execute_ conn "UPDATE ads SET phash = '{}'" + r <- PG.execute_ conn "UPDATE ads SET phash = null" print $ "Updated " ++ show (r :: Int64) ++ " records in ads database" @@ -116,7 +116,7 @@ populatePhashes 
cfg = do -- Serial stream of incomig ads (S.writeStreamBasket $ - PGStream.stream_ conn "SELECT id, images FROM ads WHERE phash = '{}';") + PGStream.stream_ conn "SELECT id, images FROM ads WHERE phash is null;") -- Serial chunked stream of db updates -- Chunking prevents us from doing a DB query @@ -263,6 +263,7 @@ fetchSortedPhashes cfg = do unnest(images) as url FROM ads) ars WHERE ars.phash != '0' + AND ars.phash IS NOT NULL ORDER BY ars.phash |] let readRow (hash, url) = case readMaybe hash of @@ -270,17 +271,19 @@ fetchSortedPhashes cfg = do Just w64 -> Just (PHash w64, url) return . catMaybes $ fmap readRow rs -countImageHashMisalignment :: PG.ConnectInfo -> IO Int64 +countImageHashMisalignment :: PG.ConnectInfo -> IO [T.Text] countImageHashMisalignment cfg = do conn <- PG.connect cfg - [r] <- PG.query_ conn - [sql| SELECT COUNT(*) - FROM (SELECT array_length(images,1) as i, - array_length(phash,1) as p - FROM ads) lengths + r <- PG.query_ conn + [sql| SELECT id + FROM (SELECT id, + coalesce(array_length(images,1), 0) as i, + coalesce(array_length(phash,1) , 0) as p + FROM ads + WHERE phash IS NOT NULL) lengths WHERE lengths.i != lengths.p |] - return $ PG.fromOnly r + return $ PG.fromOnly <$> r testConnectInfo :: PG.ConnectInfo testConnectInfo = PG.ConnectInfo "localhost" 5432 "fbpac" "password" "fbpac" diff --git a/backend/facebook-ad-image-hashes/src/Search.hs b/backend/facebook-ad-image-hashes/src/Search.hs index 689ed812..e4fcce3a 100644 --- a/backend/facebook-ad-image-hashes/src/Search.hs +++ b/backend/facebook-ad-image-hashes/src/Search.hs @@ -114,9 +114,11 @@ generateCache :: PG.ConnectInfo -> IO () generateCache cfg (IdentityGroupingThreshold thr) fp = do - nMisaligned <- countImageHashMisalignment cfg - when (nMisaligned > 0) $ - error "Database error: Some hash array did not match image array in size" + misaligned <- countImageHashMisalignment cfg + when (not $ null misaligned) $ + error $ + "Database error: Some hash array did not match image 
array in size: " + ++ show misaligned rs <- fetchSortedPhashes cfg when (null rs) $ From d982cc10ffc36fac805666825497898a4720d5cf Mon Sep 17 00:00:00 2001 From: Greg Hale Date: Fri, 20 Jul 2018 18:23:31 -0400 Subject: [PATCH 7/7] Add explicit type annotations in populate-hashes haskell and sql code Without the annotations, some possible combination of psql, haskell postgres libraries, and libpq was resulting in a type error during the sql query (`phash` treated as `text` rather than `text[]`) --- backend/facebook-ad-image-hashes/src/Queries.hs | 3 ++- backend/server/.env | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/backend/facebook-ad-image-hashes/src/Queries.hs b/backend/facebook-ad-image-hashes/src/Queries.hs index 98e9d44c..2676ae0c 100644 --- a/backend/facebook-ad-image-hashes/src/Queries.hs +++ b/backend/facebook-ad-image-hashes/src/Queries.hs @@ -160,11 +160,12 @@ doInsert dbConn phashes = do let formatEntry hashOrError = case hashOrError of Left err -> err Right (PHash h) -> T.pack (show h) + inserts :: [(AdId, PG.PGArray T.Text)] inserts = fmap (\(k,v) -> (k, fmap formatEntry v)) $ phashes n <- liftIO $ PG.executeMany dbConn [sql| UPDATE ads - SET phash = upd.phash + SET phash = ARRAY[upd.phash] FROM (VALUES (?,?)) as upd(id,phash) WHERE ads.id = upd.id |] inserts diff --git a/backend/server/.env b/backend/server/.env index 1f15aa8b..996ca05d 100644 --- a/backend/server/.env +++ b/backend/server/.env @@ -1,5 +1,5 @@ -DATABASE_URL=postgres://localhost/facebook_ads -TEST_DATABASE_URL=postgres://localhost/facebook_ads_test +DATABASE_URL=postgresql://fbpac:password@localhost:5432/fbpac +TEST_DATABASE_URL=postgres://fbpac:password@localhost:5432/fbpac HOST=0.0.0.0:8080 RUST_LOG=info -ADMIN_PASSWORD=somepasswordok +ADMIN_PASSWORD=password