diff --git a/.env-sample b/.env-sample new file mode 100644 index 0000000..209307e --- /dev/null +++ b/.env-sample @@ -0,0 +1,2 @@ +RABBITMQ_DEFAULT_USER= +RABBITMQ_DEFAULT_PASS= \ No newline at end of file diff --git a/app/.env-sample b/app/.env-sample new file mode 100644 index 0000000..9e98f46 --- /dev/null +++ b/app/.env-sample @@ -0,0 +1,4 @@ +DB_CONNECTION_STRING=mongodb://mongodb:27017/coinMonitor +BOT_TOKEN= +WCI_KEY= +AMQP_CONNECTION_STRING=amqp://<>:<>@rabbitmq \ No newline at end of file diff --git a/app/bot/conversations/addMonitor.js b/app/bot/conversations/addMonitor.js index b8b2cc4..a49222b 100644 --- a/app/bot/conversations/addMonitor.js +++ b/app/bot/conversations/addMonitor.js @@ -49,6 +49,7 @@ module.exports = ({ logger }) => _ => { telegramId: ctx.chat.id, coinId: ctx.state.id, coin: ctx.state.coin, + coinName: ctx.state.name, lastPrice: ctx.state.price, threshold: { type: monitorTypeCtx.match, diff --git a/app/bot/conversations/search.js b/app/bot/conversations/search.js index 8a732fa..502e3d6 100644 --- a/app/bot/conversations/search.js +++ b/app/bot/conversations/search.js @@ -27,8 +27,8 @@ module.exports = options => bot => { }); } - const onAddMonitor = async (id, coin, price, ctx) => { - ctx.state = { id, coin, price }; + const onAddMonitor = async (id, coin, price, name, ctx) => { + ctx.state = { id, coin, price, name }; const stats = await ctx.conversation.active(); if (!Object.keys(stats).length) diff --git a/app/bot/keyboards/MarketDetailsKeyboard.js b/app/bot/keyboards/MarketDetailsKeyboard.js index edcb3d6..20512d8 100644 --- a/app/bot/keyboards/MarketDetailsKeyboard.js +++ b/app/bot/keyboards/MarketDetailsKeyboard.js @@ -19,7 +19,7 @@ class MarketDetailsKeyboard extends AbstractPersonalizedCache { if (!market) return await ctx.reply('Oops... Looks like you were thinking for too long. Try again from search.'); - await onAddMonitor(market.getId(), market.getCoin(), market.getPrice(true), ctx); + await onAddMonitor(market.getId(), market.getCoin(), market.getPrice(true), market.getName(), ctx); } if (ctx.callbackQuery.data.includes('deleteMonitor')) { diff --git a/app/db/schemas/Monitor.js b/app/db/schemas/Monitor.js index 9923b49..b3dbbf2 100644 --- a/app/db/schemas/Monitor.js +++ b/app/db/schemas/Monitor.js @@ -15,6 +15,10 @@ const MonitorSchema = new Schema({ type: String, required: true }, + coinName: { + type: String, + required: true + }, lastPrice: { type: Number, required: true diff --git a/app/market/MarketsService.js b/app/market/MarketsService.js index 9257c4b..b616209 100644 --- a/app/market/MarketsService.js +++ b/app/market/MarketsService.js @@ -2,7 +2,7 @@ const round = require("../utils/round"); const Market = require("./Market"); const apiUrl = 'https://www.worldcoinindex.com/apiservice/v2getmarkets'; -const fetchInterval = 60_000; +const fetchInterval = 120_000; class MarketsService { #log; diff --git a/docker-compose.yml b/docker-compose.yml index 1c74b74..8d7d785 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,46 +3,83 @@ services: container_name: RabbitMQ image: rabbitmq:management ports: - - "5672:5672" - - "15672:15672" + - "15677:15672" + environment: + RABBITMQ_DEFAULT_USER: + RABBITMQ_DEFAULT_PASS: networks: - app-network + healthcheck: + test: rabbitmq-diagnostics -q ping + interval: 5s + timeout: 5s + retries: 5 + logging: + driver: json-file + options: + max-size: 1m + max-file: 3 mongodb: image: mongo:latest container_name: MongoDB ports: - - "27017:27017" + - "27077:27017" networks: - app-network volumes: - mongo-data:/data/db + logging: + driver: json-file + options: + max-size: 1m + max-file: 3 app: container_name: TelegramBot depends_on: - - mongodb - - rabbitmq + rabbitmq: + condition: service_healthy + mongodb: + condition: service_started build: context: ./app dockerfile: Dockerfile environment: DB_CONNECTION_STRING: + AMQP_CONNECTION_STRING: BOT_TOKEN: WCI_KEY: networks: - app-network + logging: + driver: json-file + options: + max-size: 1m + max-file: 3 - # worker: - # build: - # context: . - # dockerfile: Dockerfile - # environment: - # RABBITMQ_URL: amqp://rabbitmq - # networks: - # - app-network - # deploy: - # replicas: 3 + worker: + depends_on: + rabbitmq: + condition: service_healthy + mongodb: + condition: service_started + build: + context: ./worker + dockerfile: Dockerfile + environment: + AMQP_CONNECTION_STRING: + DB_CONNECTION_STRING: + API_TOKEN: + networks: + - app-network + deploy: + replicas: 3 + logging: + driver: json-file + options: + max-size: 1m + max-file: 3 networks: app-network: diff --git a/worker/.env-sample b/worker/.env-sample new file mode 100644 index 0000000..d09da7a --- /dev/null +++ b/worker/.env-sample @@ -0,0 +1,3 @@ +AMQP_CONNECTION_STRING=amqp://rabbitmq +DB_CONNECTION_STRING=mongodb://mongodb:27017/coinMonitor +API_TOKEN= \ No newline at end of file diff --git a/worker/db/schemas/Monitor.js b/worker/db/schemas/Monitor.js index 9923b49..89ad620 100644 --- a/worker/db/schemas/Monitor.js +++ b/worker/db/schemas/Monitor.js @@ -15,6 +15,10 @@ const MonitorSchema = new Schema({ type: String, required: true }, + coinName: { + type: String, + required: true + }, lastPrice: { type: Number, required: true @@ -32,14 +36,46 @@ const MonitorSchema = new Schema({ } }); -MonitorSchema.statics.createOrUpdate = function ({ - telegramId, coinId, ...data -}) { - return this.findOneAndUpdate( - { telegramId, coinId }, - { $set: data }, - { new: true, upsert: true } - ); +/** + * @param {{ type: 'percentage' | 'fixed', value: number }} threshold + * @param {number} lastPrice + * @returns {number} + */ +const getExpectedDiff = ({ type, value }, lastPrice) => { + if (type === 'fixed') + return value; + + if (type === 'percentage') { + return lastPrice / 100 * value; + } + + return 0; +}; + +/** + * @param {number} newPrice + * @returns {boolean} + */ +MonitorSchema.methods.shouldNotify = function (newPrice) { + const monitorObject = this.toObject(); + const expectedDiff = getExpectedDiff(monitorObject.threshold, monitorObject.lastPrice); + + if (!expectedDiff) + return false; + + if (Math.abs(newPrice - monitorObject.lastPrice) >= expectedDiff) + return true; + + return false; +}; + +/** + * @param {number} price + */ +MonitorSchema.methods.updateLastPrice = function (price) { + return this.updateOne({ + lastPrice: price + }); }; const Monitor = model('Monitor', MonitorSchema); diff --git a/worker/db/schemas/User.js b/worker/db/schemas/User.js deleted file mode 100644 index d65c9ed..0000000 --- a/worker/db/schemas/User.js +++ /dev/null @@ -1,47 +0,0 @@ -const mongoose = require('mongoose'); -const { Schema, model } = mongoose; - -const UserSchema = new Schema({ - telegramId: { - type: String, - unique: true, - immutable: true - }, - nickname: { - type: String - }, - firstName: { - type: String - }, - lastName: { - type: String - }, - createdAt: { - type: Date, - default: Date.now - }, - isActive: { - type: Boolean - } -}); - -UserSchema.statics.createOrUpdate = function ({ - telegramId, nickname, firstName, lastName -}) { - return this.findOneAndUpdate( - { telegramId }, - { $set: { nickname, firstName, lastName, isActive: true }, $setOnInsert: { createdAt: Date.now() } }, - { new: true, upsert: true } - ); -}; - -UserSchema.statics.deactivate = function (telegramId) { - return this.findOneAndUpdate( - { telegramId }, - { $set: { isActive: false } } - ); -}; - -const User = model('User', UserSchema); - -module.exports = User; \ No newline at end of file diff --git a/worker/index.js b/worker/index.js index fe51a26..a3c8dda 100644 --- a/worker/index.js +++ b/worker/index.js @@ -4,12 +4,21 @@ const { Api } = require('grammy'); const debug = require('debug')('WORKER'); const { connectToDatabase, mongoose } = require('./db/database'); const Monitor = require('./db/schemas/Monitor'); +const round = require('./utils/round'); const amqpConnectionString = process.env.AMQP_CONNECTION_STRING ?? 'amqp://localhost'; const dbConnectionString = process.env.DB_CONNECTION_STRING ?? 'mongodb://localhost:27017/coinMonitor' const queueName = process.env.AMQP_QUEUE_NAME ?? 'marketChanges'; const apiToken = process.env.API_TOKEN; +const formatMessage = (monitor, price) => { + const dir = price > monitor.lastPrice ? '🟢' : '🔴'; + const diff = round(price - monitor.lastPrice); + const change = diff > 0 ? `+$${diff}` : `-$${Math.abs(diff)}`; + + return `${dir} ${monitor.coinName} (${monitor.coin}): $${Intl.NumberFormat().format(price)} (${change})`; +}; + const start = async () => { try { const connection = await amqp.connect(amqpConnectionString); @@ -30,12 +39,14 @@ const start = async () => { const monitors = await Monitor.find({ coinId }); - debug('Found: ' + coinId + ' - ' + price); for (const monitor of monitors) { - await api.sendMessage(monitor.telegramId, "Yo!"); + const shouldNotify = monitor.shouldNotify(price); + if (shouldNotify && await api.sendMessage(monitor.telegramId, formatMessage(monitor, price), {parse_mode: 'HTML'})) + await monitor.updateLastPrice(price); } channel.ack(msg); + debug('Message processed successfully'); }); } catch (error) { debug(`Error: ${error.message}`); diff --git a/worker/utils/round.js b/worker/utils/round.js new file mode 100644 index 0000000..54f35c7 --- /dev/null +++ b/worker/utils/round.js @@ -0,0 +1,5 @@ +const round = num => { + return Math.ceil(num * 10000) / 10000; +} + +module.exports = round; \ No newline at end of file