Skip to content

Commit

Permalink
[WIP] FEATURE Implement batching API (#17)
Browse files Browse the repository at this point in the history
* API Implement batching update API
* BUG Stop hitting `_refresh` api with empty batches
* Minor changes to batch index naming
* Add note about order of operations
* Fix spelling issue
  • Loading branch information
tractorcow authored and stevie-mayhew committed Mar 1, 2019
1 parent d8ba994 commit 4adaf18
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 13 deletions.
170 changes: 157 additions & 13 deletions src/ElasticaService.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
namespace Heyday\Elastica;

use Elastica\Client;
use Elastica\Document;
use Elastica\Exception\NotFoundException;
use Elastica\Index;
use Elastica\Query;
use Elastica\Response;
use Exception;
use InvalidArgumentException;
use LogicException;
use Psr\Log\LoggerInterface;
use SilverStripe\Control\Director;
use SilverStripe\Core\ClassInfo;
Expand Down Expand Up @@ -52,14 +55,30 @@ class ElasticaService

public $searchableExtensionClassName;

/**
* Unprocessed batch operations.
* Many-depth array:
* - First level is batch depth (e.g. nested batching)
* - Second level is associative array of types
* - Third level is a pair of keys self::UPDATES ('updates', add/update) and self::DELETES ('deletes', remove)
* - Fourth level is the list of documents to index / delete
*
* @var Document[][][][]
*/
protected $batches = [];

const UPDATES = 'updates';

const DELETES = 'deletes';

/**
* ElasticaService constructor.
*
* @param Client $client
* @param string $indexName
* @param Client $client
* @param string $indexName
* @param LoggerInterface|null $logger Increases the memory limit while indexing.
* @param string $indexingMemory A memory limit string, such as "64M".
* @param string $searchableExtensionClassName
* @param string $indexingMemory A memory limit string, such as "64M".
* @param string $searchableExtensionClassName
*/
public function __construct(
Client $client,
Expand Down Expand Up @@ -102,8 +121,8 @@ protected function getIndexConfig()
/**
* Performs a search query and returns either a ResultList (SS template compatible) or an Elastica\ResultSet
* @param \Elastica\Query|string|array $query
* @param array $options Options defined in \Elastica\Search
* @param bool $returnResultList
* @param array $options Options defined in \Elastica\Search
* @param bool $returnResultList
* @return ResultList | \Elastica\ResultSet
*/
public function search($query, $options = null, $returnResultList = true)
Expand Down Expand Up @@ -151,7 +170,7 @@ public function deleteIndex()
* Either creates or updates a record in the index.
*
* @param Searchable|DataObject $record
* @return Response|null
* @return Response|null|bool Return response, or true if batched
* @throws Exception
*/
public function index($record)
Expand All @@ -172,10 +191,17 @@ public function index($record)

try {
$document = $record->getElasticaDocument();
$type = $record->getElasticaType();
$typeName = $record->getElasticaType();
$index = $this->getIndex();

$response = $index->getType($type)->addDocument($document);
// If batching
if ($this->isBatching()) {
$this->batchDocument($typeName, self::UPDATES, $document);
return true;
}

$type = $index->getType($typeName);
$response = $type->addDocument($document);
$index->refresh();

return $response;
Expand All @@ -185,9 +211,119 @@ public function index($record)
}
}

/**
 * Report whether at least one batch() call is currently in progress.
 *
 * batch() pushes an entry onto $this->batches before running its callback and
 * pops it again afterwards, so a non-empty stack means a batch is open.
 *
 * @return bool True when the batch stack holds one or more entries
 */
protected function isBatching()
{
    $hasOpenBatch = (bool) $this->batches;

    return $hasOpenBatch;
}

/**
 * Defer all add / remove operations issued inside a user-provided callback and
 * send them together once the callback has finished.
 *
 * For example, a closure that calls ->index() on 20 records results in those
 * 20 updates being flushed as one batch when the closure returns.
 *
 * The flush lives in a finally block, so whatever was queued is still flushed
 * when the callback throws. Any Exception raised while flushing is handed to
 * $this->exception().
 *
 * @param callable $callback Callback within which to batch updates
 * @param int $documentsProcessed Set to the number of documents flushed for this batch
 * @return mixed result of $callback
 * @throws Exception
 */
public function batch(callable $callback, &$documentsProcessed = 0)
{
    try {
        // Open a new batch level; nested batch() calls stack on top of this one
        array_push($this->batches, []);
        $result = $callback();

        return $result;
    } finally {
        try {
            // Close the innermost level first, then send what it collected
            $documentsProcessed = $this->flushBatch(array_pop($this->batches));
        } catch (Exception $failure) {
            $this->exception($failure);
        }
    }
}

/**
 * Send the documents queued in one batch to the index.
 *
 * Each non-empty type/action list results in a single addDocuments() or
 * deleteDocuments() call on that type. The index is refreshed once at the
 * end, and only if at least one document was handled.
 *
 * @param Document[][][] $batch Queued documents keyed by type name, then by action
 *                              (self::UPDATES or self::DELETES)
 * @return int Number of documents updated in this batch
 * @throws LogicException If a non-empty list is filed under an unknown action key
 */
protected function flushBatch($batch)
{
    $total = 0;
    $index = null;

    foreach ($batch as $typeName => $actions) {
        $typeHandle = null;

        foreach ($actions as $action => $queued) {
            // Nothing queued under this action: skip without touching the index
            if (!$queued) {
                continue;
            }

            // Look up the index and type lazily, only once there is something to send
            if (!$index) {
                $index = $this->getIndex();
            }
            if (!$typeHandle) {
                $typeHandle = $index->getType($typeName);
            }

            $total += count($queued);

            switch ($action) {
                case self::UPDATES:
                    $typeHandle->addDocuments($queued);
                    break;
                case self::DELETES:
                    try {
                        $typeHandle->deleteDocuments($queued);
                    } catch (NotFoundException $missing) {
                        // Deleting something that is already gone is not an error
                    }
                    break;
                default:
                    throw new LogicException("Invalid batch action {$action}");
            }
        }
    }

    // Refresh only when something was actually sent
    if ($total && $index) {
        $index->refresh();
    }

    return $total;
}

/**
 * Queue a document on the innermost open batch.
 *
 * Must only be called while a batch is open. Without that check the document
 * would be stored under index -1 of the batch stack, where batch() never
 * flushes it and where it makes isBatching() report true from then on.
 *
 * @param string $type elasticsearch type name
 * @param string $action self::DELETES or self::UPDATES
 * @param Document $document
 * @throws InvalidArgumentException If $type is not a string or $action is not a known action
 * @throws LogicException If no batch is currently open
 */
protected function batchDocument($type, $action, $document)
{
    if (!is_string($type)) {
        throw new InvalidArgumentException("Invalid type argument");
    }
    // Strict comparison: only the exact action strings are accepted
    if (!in_array($action, [self::DELETES, self::UPDATES], true)) {
        throw new InvalidArgumentException("Invalid action argument");
    }

    $batchIndex = count($this->batches) - 1;
    if ($batchIndex < 0) {
        throw new LogicException("Cannot batch a document outside of batch()");
    }

    // Ensure both action lists exist for this type, keeping anything already queued.
    // DELETES is listed before UPDATES so that flushBatch(), which walks the lists
    // in insertion order, processes removals before additions.
    $existing = isset($this->batches[$batchIndex][$type])
        ? $this->batches[$batchIndex][$type]
        : [];
    $this->batches[$batchIndex][$type] = $existing + [
        self::DELETES => [],
        self::UPDATES => [],
    ];

    // Add document
    $this->batches[$batchIndex][$type][$action][] = $document;
}

/**
* @param Searchable|DataObject $record
* @return Response|null
* @return Response|null|bool Response, or true if batched
* @throws Exception
*/
public function remove($record)
Expand All @@ -199,8 +335,16 @@ public function remove($record)

try {
$index = $this->getIndex();
$type = $index->getType($record->getElasticaType());
return $type->deleteDocument($record->getElasticaDocument());
$typeName = $record->getElasticaType();
$document = $record->getElasticaDocument();
// If batching
if ($this->isBatching()) {
$this->batchDocument($typeName, self::DELETES, $document);
return true;
}

$type = $index->getType($typeName);
return $type->deleteDocument($document);
} catch (NotFoundException $ex) {
// If deleted records already were deleted, treat as non-error
return null;
Expand Down Expand Up @@ -285,7 +429,7 @@ public function getIndexedClasses()
* Output message when item is indexed / removed
*
* @param DataObject $record
* @param string $action Action type
* @param string $action Action type
*/
protected function printActionMessage(DataObject $record, $action)
{
Expand Down
13 changes: 13 additions & 0 deletions src/Searchable.php
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,19 @@ public function reIndex($stage = Versioned::LIVE)
$this->updateDependentClasses();
}

/**
 * Run a callback with index operations batched, by delegating to batch() on
 * this extension's search service.
 *
 * NOTE(review): $this->service is presumably the ElasticaService instance;
 * its declaration is outside this view, so confirm.
 *
 * @param callable $callback Passed through unchanged to the service's batch()
 * @param int $documentsProcessed By-reference argument forwarded to the service's batch()
 * @return mixed Whatever the service's batch() returns
 * @throws Exception
 */
public function batchIndex(callable $callback, &$documentsProcessed = 0)
{
    $outcome = $this->service->batch($callback, $documentsProcessed);

    return $outcome;
}

/**
* Removes the record from the search index.
* @throws Exception
Expand Down

1 comment on commit 4adaf18

@bendubuisson
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tractorcow @stevie-mayhew , awesome work!

Please sign in to comment.