diff --git a/composer.json b/composer.json index faa3c58..ff9acc8 100644 --- a/composer.json +++ b/composer.json @@ -13,6 +13,7 @@ "php": "~7.4.0|~8.0.0|~8.1.0|~8.2.0|~8.3.0", "ext-pcntl": "*", "ext-json": "*", + "ext-mbstring": "*", "symfony/dependency-injection": "^4.3", "symfony/config": "^4.3", "symfony/yaml": "^4.3", @@ -33,7 +34,7 @@ "symfony/property-info": "^4.3", "doctrine/annotations": "^1.8", "ramsey/uuid": "^3.8", - "webgriffe/amp-elasticsearch": "dev-php8-support as 2.1", + "webgriffe/amp-elasticsearch": "^2.1", "pagerfanta/pagerfanta": "^2.4", "symfony/deprecation-contracts": "^2.1", "amphp/http-server-form-parser": "^1.1" diff --git a/esb.yml.sample b/esb.yml.sample index 432348b..67e5ef6 100644 --- a/esb.yml.sample +++ b/esb.yml.sample @@ -30,6 +30,11 @@ services: flows: sample_flow: # The flow "code" and will be the Beanstalkd tube name description: Sample Flow # The flow description + es_index: # Optional: the create/update ElasticSearch index API body (see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html). This is useful if you want to control index mapping, settings and aliases. + settings: # For example you can set the total_fields limit to an higher (or lower) value: + index: + total_fields: + limit: 2000 producer: service: My\Esb\Producer # A producer service ID defined above batch_size: 1200 # Jobs are produced in batches of 1200 jobs. Optional: default is 1000 diff --git a/src/FlowConfiguration.php b/src/FlowConfiguration.php index 47ea2a6..5caa211 100644 --- a/src/FlowConfiguration.php +++ b/src/FlowConfiguration.php @@ -24,6 +24,7 @@ public function getConfigTreeBuilder(): TreeBuilder ->arrayPrototype() ->children() ->scalarNode('description')->isRequired()->cannotBeEmpty()->end() + ->variableNode('es_index')->defaultNull()->end() ->arrayNode('producer') ->children() ->scalarNode('service')->isRequired()->end() diff --git a/src/Model/FlowConfig.php b/src/Model/FlowConfig.php index 874174a..50f1066 100644 --- a/src/Model/FlowConfig.php +++ b/src/Model/FlowConfig.php @@ -102,4 +102,12 @@ public function getProducerBatchSize(): int { return $this->config['producer']['batch_size']; } + + /** + * @return array|null + */ + public function getElasticSearchIndexCreateOrUpdateBody(): ?array + { + return $this->config['es_index'] ?? null; + } } diff --git a/src/Service/ElasticSearch.php b/src/Service/ElasticSearch.php index b614443..9d32ca2 100644 --- a/src/Service/ElasticSearch.php +++ b/src/Service/ElasticSearch.php @@ -91,6 +91,14 @@ public function getClient(): Client return $this->client; } + /** + * @param array|null $createOrUpdateBody + */ + public function setElasticSearchIndex(string $indexName, array $createOrUpdateBody = null): Amp\Promise + { + return $this->client->createOrUpdateIndex($indexName, $createOrUpdateBody); + } + /** * @param JobInterface $job * @param string $indexName diff --git a/src/Service/QueueManager.php b/src/Service/QueueManager.php index f4206ec..ec6248f 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/QueueManager.php @@ -72,6 +72,19 @@ public function __construct( public function boot(): Promise { return call(function () { + if ($this->flowConfig->getElasticSearchIndexCreateOrUpdateBody() !== null) { + yield $this->elasticSearch->setElasticSearchIndex( + $this->flowConfig->getTube(), + $this->flowConfig->getElasticSearchIndexCreateOrUpdateBody() + ); + $this->logger->debug( + 'Successfully set ElasticSearch index', + [ + 'index' => $this->flowConfig->getTube(), + 'body' => $this->flowConfig->getElasticSearchIndexCreateOrUpdateBody() + ] + ); + } //Producer yield $this->beanstalkClient->use($this->flowConfig->getTube());