Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to set topic message retention time in topic-detail page #420

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/main/java/kafdrop/controller/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
import kafdrop.service.TopicNotFoundException;
import kafdrop.util.MessageFormat;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.stereotype.*;
import org.springframework.ui.*;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -162,4 +162,21 @@ public String createTopic(CreateTopicVO createTopicVO, Model model) {
model.addAttribute("brokersCount", kafkaMonitor.getBrokers().size());
return "topic-create";
}

@ApiOperation(value = "createTopic", notes = "Create topic")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success", response = String.class)
})
@PostMapping(path = "/{name:.+}/set-retention-period", produces = MediaType.APPLICATION_JSON_VALUE)
public ModelAndView createTopic(@PathVariable("name") String topicName,
UpdateTopicRetentionVO updateTopicRetentionVO,
Model model) {
try {
kafkaMonitor.addTopicMessageRetentionPeriodInMs(topicName, updateTopicRetentionVO.getRetentionTime());
} catch (Exception ex) {
model.addAttribute("errorMessage", ex.getMessage());
}

return new ModelAndView("redirect:/topic/" + topicName);
}
}
14 changes: 14 additions & 0 deletions src/main/java/kafdrop/model/UpdateTopicRetentionVO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kafdrop.model;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiParam;
import lombok.Data;
import lombok.RequiredArgsConstructor;

@Data
@RequiredArgsConstructor
@ApiModel("Update topic retention model")
public final class UpdateTopicRetentionVO {
@ApiParam("Retention period in ms")
int retentionTime;
}
21 changes: 21 additions & 0 deletions src/main/java/kafdrop/service/KafkaHighLevelAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,27 @@ void deleteTopic(String topic) {
}
}

void addTopicMessageRetentionPeriodInMs(String topic, int retentionTimeInMs) {
ConfigResource resource = new ConfigResource(Type.TOPIC, topic);

ConfigEntry configEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionTimeInMs));
AlterConfigOp config = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);

Collection<AlterConfigOp> alterConfigOps = new ArrayList<>();
alterConfigOps.add(config);

Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
configs.put(resource, alterConfigOps);

try {
adminClient.incrementalAlterConfigs(configs);
LOG.info("Topic {} retention time successfully set to {}ms", topic, retentionTimeInMs);
} catch (Exception e) {
LOG.error("Error while update retention period topic", e);
throw new KafkaAdminClientException(e);
}
}

Collection<AclBinding> listAcls() {
final Collection<AclBinding> aclsBindings;
try {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,7 @@ SearchResultsVO searchMessages(String topic,
*/
void deleteTopic(String topic);

List<AclVO> getAcls();
void addTopicMessageRetentionPeriodInMs(String topic, int retentionTimeInMs);

List<AclVO> getAcls();
}
5 changes: 5 additions & 0 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ public void deleteTopic(String topic) {
highLevelAdminClient.deleteTopic(topic);
}

@Override
public void addTopicMessageRetentionPeriodInMs(String topic, int retentionTimeInMs) {
highLevelAdminClient.addTopicMessageRetentionPeriodInMs(topic, retentionTimeInMs);
}

@Override
public List<AclVO> getAcls() {
final var acls = highLevelAdminClient.listAcls();
Expand Down
29 changes: 29 additions & 0 deletions src/main/resources/templates/topic-detail.ftlh
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,24 @@
</tbody>
</table>
</#if>
<form id="update-retention-period-form" action="<@spring.url '/topic/${topic.name}/set-retention-period'/>" method="POST">
<table table table-bordered table-sm small style="margin-top: 20px">
<tbody>
<tr>
<td>Retention period in ms </td>
</tr>
<tr>
<td align="center"><input type="number" name="retentionTime" required></td>
</tr>
</tbody>
</table>
<button style="margin-top: 5px; height: 50%" class="btn btn-success" type="submit"> <i class="fa"></i> Set </button>
</form>
<#if errorMessage??>
<p>Error setting retention period: ${errorMessage}</p>
<#elseif topicName??>
<p>Successfully set retention period</p>
</#if>
</div>

</div>
Expand Down Expand Up @@ -170,6 +188,7 @@
<script>
$(document).ready(function () {
let removalConfirmed = false;
let updateRetentionConfirmed = false;

$('#delete-topic-form').submit(function (event) {
if (!removalConfirmed) {
Expand All @@ -180,5 +199,15 @@
}
}
});

$('#update-retention-period-form').submit(function (event) {
if (!updateRetentionConfirmed) {
event.preventDefault();
if(confirm('Are you sure you want to update the topic retention time?')) {
updateRetentionConfirmed = true;
$('#update-retention-period-form').submit();
}
}
});
});
</script>
Loading