diff --git a/README.md b/README.md index 85d74ed..5bab642 100644 --- a/README.md +++ b/README.md @@ -80,11 +80,18 @@ flows: initial_polling_interval: 1000 # Optional: initial polling delay that a worker waits when it has to wait for a dependency that is not idle maximum_polling_interval: 60000 # Optional: maximum polling delay that a worker waits when it has to wait for a dependency that is not idle polling_interval_multiplier: 2 # Optional: polling delay increase factor whenever a worker is waiting for a dependency that is not idle - es_index: # Optional: the create/update ElasticSearch index API body (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html). This is useful if you want to control index mapping, settings and aliases. - settings: # For example you can set the total_fields limit to an higher (or lower) value: - index: + es_index_settings: # Optional: the update ElasticSearch index API settings (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html#update-index-settings-api-request-body). This is useful if you want to control index settings. + index: # For example you can set the total_fields limit to an higher (or lower) value + mapping: total_fields: limit: 2000 + es_index_mapping: # Optional: the update ElasticSearch index API mapping (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#put-mapping-api-request-body). This is useful if you want to control index mapping. + properties: + title: + type: text + es_index_aliases: # Optional: the update ElasticSearch index API aliases (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-add-alias.html#add-alias-api-request-body). This is useful if you want to control index aliases. + my_alias: + is_hidden: true other_flow_1: # ... diff --git a/esb.yml.sample b/esb.yml.sample index 67e5ef6..7682e8e 100644 --- a/esb.yml.sample +++ b/esb.yml.sample @@ -30,11 +30,19 @@ services: flows: sample_flow: # The flow "code" and will be the Beanstalkd tube name description: Sample Flow # The flow description - es_index: # Optional: the create/update ElasticSearch index API body (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html). This is useful if you want to control index mapping, settings and aliases. - settings: # For example you can set the total_fields limit to an higher (or lower) value: - index: - total_fields: - limit: 2000 + es_index_settings: # Optional: the update ElasticSearch index API settings (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html#update-index-settings-api-request-body). This is useful if you want to control index settings. + index: + mapping: + total_fields: + limit: 2000 + es_index_mapping: # Optional: the update ElasticSearch index API mapping (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html#put-mapping-api-request-body). This is useful if you want to control index mapping. + properties: + title: + type: text + es_index_aliases: # Optional: the update ElasticSearch index API aliases (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-add-alias.html#add-alias-api-request-body). This is useful if you want to control index aliases. + my_alias: + is_hidden: true + producer: service: My\Esb\Producer # A producer service ID defined above batch_size: 1200 # Jobs are produced in batches of 1200 jobs. Optional: default is 1000 diff --git a/src/FlowConfiguration.php b/src/FlowConfiguration.php index 5caa211..e1c9e0e 100644 --- a/src/FlowConfiguration.php +++ b/src/FlowConfiguration.php @@ -24,7 +24,9 @@ public function getConfigTreeBuilder(): TreeBuilder ->arrayPrototype() ->children() ->scalarNode('description')->isRequired()->cannotBeEmpty()->end() - ->variableNode('es_index')->defaultNull()->end() + ->variableNode('es_index_settings')->defaultNull()->end() + ->variableNode('es_index_mapping')->defaultNull()->end() + ->variableNode('es_index_aliases')->defaultNull()->end() ->arrayNode('producer') ->children() ->scalarNode('service')->isRequired()->end() diff --git a/src/Model/FlowConfig.php b/src/Model/FlowConfig.php index 50f1066..4d83910 100644 --- a/src/Model/FlowConfig.php +++ b/src/Model/FlowConfig.php @@ -106,8 +106,31 @@ public function getProducerBatchSize(): int /** * @return array|null */ - public function getElasticSearchIndexCreateOrUpdateBody(): ?array + public function getElasticSearchIndexUpdateSettingsBody(): ?array { - return $this->config['es_index'] ?? null; + return $this->config['es_index_settings'] ?? null; + } + + /** + * @return array|null + */ + public function getElasticSearchIndexUpdateMappingBody(): ?array + { + return $this->config['es_index_mapping'] ?? null; + } + + /** + * @return array|null + */ + public function getElasticSearchIndexUpdateAliasesBody(): ?array + { + return $this->config['es_index_aliases'] ?? null; + } + + public function hasAdditionalIndexConfiguration(): bool + { + return $this->getElasticSearchIndexUpdateSettingsBody() !== null || + $this->getElasticSearchIndexUpdateMappingBody() !== null || + $this->getElasticSearchIndexUpdateAliasesBody() !== null; } } diff --git a/src/Service/ElasticSearch.php b/src/Service/ElasticSearch.php index 62aa118..77794f0 100644 --- a/src/Service/ElasticSearch.php +++ b/src/Service/ElasticSearch.php @@ -92,11 +92,49 @@ public function getClient(): Client } /** - * @param array|null $createOrUpdateBody + * @return Amp\Promise */ - public function setElasticSearchIndex(string $indexName, array $createOrUpdateBody = null): Amp\Promise + public function indexExists(string $indexName): Amp\Promise { - return $this->client->createOrUpdateIndex($indexName, $createOrUpdateBody); + return Amp\call(function () use ($indexName) { + try { + $response = yield $this->client->existsIndex($indexName); + if ($response->getStatusCode() === 200) { + return true; + } + return false; + } catch (Error $error) { + if ($error->getCode() === 404) { + return false; + } + throw $error; + } + }); + } + + /** + * @param array|null $indexName + */ + public function setElasticSearchIndex(string $indexName): Amp\Promise + { + return $this->client->createIndex($indexName); + } + + + public function setElasticSearchIndexSettings(string $indexName, array $updateSettingsBody = null): Amp\Promise + { + return $this->client->updateIndexSettings($indexName, $updateSettingsBody); + } + + + public function setElasticSearchIndexMapping(string $indexName, array $updateMappingBody = null): Amp\Promise + { + return $this->client->updateMappings($indexName, $updateMappingBody); + } + + public function setElasticSearchIndexAlias(string $indexName, string $aliasName, array $aliasBody = null): Amp\Promise + { + return $this->client->createOrUpdateAlias($indexName, $aliasName, $aliasBody); } /** diff --git a/src/Service/QueueManager.php b/src/Service/QueueManager.php index 8570192..1ce13bb 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/QueueManager.php @@ -73,18 +73,62 @@ public function __construct( public function boot(): Promise { return call(function () { - if ($this->flowConfig->getElasticSearchIndexCreateOrUpdateBody() !== null) { - yield $this->elasticSearch->setElasticSearchIndex( - $this->flowConfig->getTube(), - $this->flowConfig->getElasticSearchIndexCreateOrUpdateBody() - ); - $this->logger->debug( - 'Successfully set ElasticSearch index', - [ - 'index' => $this->flowConfig->getTube(), - 'body' => $this->flowConfig->getElasticSearchIndexCreateOrUpdateBody() - ] - ); + if ($this->flowConfig->hasAdditionalIndexConfiguration()) { + $indexExists = yield $this->elasticSearch->indexExists($this->flowConfig->getTube()); + if (!$indexExists) { + yield $this->elasticSearch->setElasticSearchIndex($this->flowConfig->getTube()); + $this->logger->debug( + 'Successfully created ElasticSearch index', + ['index' => $this->flowConfig->getTube()] + ); + } + + if ($this->flowConfig->getElasticSearchIndexUpdateSettingsBody()) { + yield $this->elasticSearch->setElasticSearchIndexSettings( + $this->flowConfig->getTube(), + $this->flowConfig->getElasticSearchIndexUpdateSettingsBody() + ); + $this->logger->debug( + 'Successfully set ElasticSearch index settings', + [ + 'index' => $this->flowConfig->getTube(), + 'body' => $this->flowConfig->getElasticSearchIndexUpdateSettingsBody() + ] + ); + } + + if ($this->flowConfig->getElasticSearchIndexUpdateMappingBody()) { + yield $this->elasticSearch->setElasticSearchIndexMapping( + $this->flowConfig->getTube(), + $this->flowConfig->getElasticSearchIndexUpdateMappingBody() + ); + $this->logger->debug( + 'Successfully set ElasticSearch index mapping', + [ + 'index' => $this->flowConfig->getTube(), + 'body' => $this->flowConfig->getElasticSearchIndexUpdateMappingBody() + ] + ); + } + + if ($this->flowConfig->getElasticSearchIndexUpdateAliasesBody()) { + $elasticSearchIndexUpdateAliases = $this->flowConfig->getElasticSearchIndexUpdateAliasesBody(); + foreach ($elasticSearchIndexUpdateAliases as $aliasName => $aliasBody) { + yield $this->elasticSearch->setElasticSearchIndexAlias( + $this->flowConfig->getTube(), + $aliasName, + $aliasBody + ); + $this->logger->debug( + 'Successfully set ElasticSearch index alias', + [ + 'index' => $this->flowConfig->getTube(), + 'alias' => $aliasName, + 'body' => $aliasBody + ] + ); + } + } } //Producer yield $this->beanstalkClient->use($this->flowConfig->getTube()); diff --git a/tests/Integration/ElasticSearchIndexingTest.php b/tests/Integration/ElasticSearchIndexingTest.php index b81660e..e1dcc00 100644 --- a/tests/Integration/ElasticSearchIndexingTest.php +++ b/tests/Integration/ElasticSearchIndexingTest.php @@ -226,6 +226,61 @@ function ($log) { $this->assertCount(1, $successfullyIndexedLog); } + /** + * @test + */ + public function itUpdatesIndexSettingsAccordingToFlowConfigTest() + { + $producerDir = vfsStream::url('root/producer_dir'); + $workerFile = vfsStream::url('root/worker.data'); + self::createKernel([ + 'services' => [ + DummyFilesystemRepeatProducer::class => ['arguments' => [$producerDir]], + DummyFilesystemWorker::class => ['arguments' => [$workerFile]], + ], + 'flows' => [ + self::FLOW_CODE => [ + 'description' => 'ElasticSearch Indexing Test Repeat Flow', + 'es_index_settings' => ['index' => ['mapping' => ['total_fields' => ['limit' => 2000]]]], + 'producer' => ['service' => DummyFilesystemRepeatProducer::class], + 'worker' => ['service' => DummyFilesystemWorker::class], + ] + ] + ]); + mkdir($producerDir); + Loop::delay( + 200, + function () use ($producerDir) { + $veryLargeDocument = json_encode(array_fill_keys(range(1, 1001), 'value')); + file_put_contents($producerDir . DIRECTORY_SEPARATOR . 'job1', $veryLargeDocument); + touch($producerDir . DIRECTORY_SEPARATOR . 'job2'); + } + ); + $this->stopWhen(function () { + $successLog = array_filter( + $this->logHandler()->getRecords(), + function ($log) { + return strpos($log['message'], 'Successfully worked a Job') !== false; + } + ); + return count($successLog) >= 2; + }); + self::$kernel->boot(); + + Promise\wait($this->esClient->refresh()); + $search = Promise\wait($this->esClient->uriSearchOneIndex(self::FLOW_CODE, '')); + $this->assertCount(1, $search['hits']['hits']); + $this->assertFalse($this->logHandler()->hasErrorThatContains('Job could not be indexed in ElasticSearch')); + $logRecords = $this->logHandler()->getRecords(); + $successfullyIndexedLog = array_filter( + $logRecords, + function ($log) { + return strpos($log['message'], 'Successfully enqueued a new Job') !== false; + } + ); + $this->assertCount(2, $successfullyIndexedLog); + } + private function assertForEachJob(callable $callable, array $jobsData) { /** @var Serializer $serializer */