Overview:
- Oracle Database: store user information
- WebSocket: real-time message transfer (See here)
- RabbitMQ: STOMP broker that keeps track of subscriptions and broadcasts messages to subscribed users. (An alternative to Spring's default in-memory STOMP broker) (See here)
- Redis: in-memory database, used to store WebSocket sessions, chat room information, and messages. (See here)
- Apache Zookeeper and Apache Kafka (these tools are used for study purposes)
- Apache Kafka: (See here)
- Docker
ProducerKafkaConfiguration.java ConsumerKafkaConfiguration.java
- Dynamically create/delete topics, and change the consumer's topic at runtime:
@Service
@Slf4j
public class KafkaService {
@Autowired
private AdminClient adminClient;
@Autowired
ConcurrentKafkaListenerContainerFactory<String, Message> listenerContainerFactory;
@Autowired
ConsumerFactory<String, Message> consumerFactory;
ConcurrentMessageListenerContainer<String, Message> listenerContainer;
public void changeTopic(String topic) throws InterruptedException {
log.info("Changing topic to: {}", topic);
if(listenerContainer != null) {
listenerContainer.stop();
Thread.sleep(2000);
listenerContainer.destroy();
Thread.sleep(2000);
}
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setGroupId(RandomStringUtils.randomAlphanumeric(3));
containerProperties.setMessageListener((MessageListener<String, Message>) message -> {
System.out.println("Kafka listener, topic: " + message.topic().toString() + ", message content: " + message.value().getContent());
});
listenerContainer = new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
listenerContainer.start();
}
public void createTopic(String topic) {
adminClient.createTopics(List.of(TopicBuilder.name(topic).build()));
}
public void deleteTopic(String topic) {
adminClient.deleteTopics(List.of(topic));
}
}
docker compose up -d