Skip to content

Commit

Permalink
done
Browse files Browse the repository at this point in the history
  • Loading branch information
antonkorotkov committed Aug 23, 2024
1 parent c38716a commit 2dec87e
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 76 deletions.
2 changes: 2 additions & 0 deletions .env-sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
RABBITMQ_DEFAULT_USER=
RABBITMQ_DEFAULT_PASS=
4 changes: 4 additions & 0 deletions app/.env-sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DB_CONNECTION_STRING=mongodb://mongodb:27017/coinMonitor
BOT_TOKEN=
WCI_KEY=
AMQP_CONNECTION_STRING=amqp://<>:<>@rabbitmq
1 change: 1 addition & 0 deletions app/bot/conversations/addMonitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ module.exports = ({ logger }) => _ => {
telegramId: ctx.chat.id,
coinId: ctx.state.id,
coin: ctx.state.coin,
coinName: ctx.state.name,
lastPrice: ctx.state.price,
threshold: {
type: monitorTypeCtx.match,
Expand Down
4 changes: 2 additions & 2 deletions app/bot/conversations/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ module.exports = options => bot => {
});
}

const onAddMonitor = async (id, coin, price, ctx) => {
ctx.state = { id, coin, price };
const onAddMonitor = async (id, coin, price, name, ctx) => {
ctx.state = { id, coin, price, name };
const stats = await ctx.conversation.active();

if (!Object.keys(stats).length)
Expand Down
2 changes: 1 addition & 1 deletion app/bot/keyboards/MarketDetailsKeyboard.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class MarketDetailsKeyboard extends AbstractPersonalizedCache {
if (!market)
return await ctx.reply('Oops... Looks like you were thinking for too long. Try again from search.');

await onAddMonitor(market.getId(), market.getCoin(), market.getPrice(true), ctx);
await onAddMonitor(market.getId(), market.getCoin(), market.getPrice(true), market.getName(), ctx);
}

if (ctx.callbackQuery.data.includes('deleteMonitor')) {
Expand Down
4 changes: 4 additions & 0 deletions app/db/schemas/Monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const MonitorSchema = new Schema({
type: String,
required: true
},
coinName: {
type: String,
required: true
},
lastPrice: {
type: Number,
required: true
Expand Down
2 changes: 1 addition & 1 deletion app/market/MarketsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const round = require("../utils/round");
const Market = require("./Market");

const apiUrl = 'https://www.worldcoinindex.com/apiservice/v2getmarkets';
const fetchInterval = 60_000;
const fetchInterval = 120_000;

class MarketsService {
#log;
Expand Down
67 changes: 52 additions & 15 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,83 @@ services:
container_name: RabbitMQ
image: rabbitmq:management
ports:
- "5672:5672"
- "15672:15672"
- "15677:15672"
environment:
RABBITMQ_DEFAULT_USER:
RABBITMQ_DEFAULT_PASS:
networks:
- app-network
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 5s
timeout: 5s
retries: 5
logging:
driver: json-file
options:
max-size: 1m
max-file: 3

mongodb:
image: mongo:latest
container_name: MongoDB
ports:
- "27017:27017"
- "27077:27017"
networks:
- app-network
volumes:
- mongo-data:/data/db
logging:
driver: json-file
options:
max-size: 1m
max-file: 3

app:
container_name: TelegramBot
depends_on:
- mongodb
- rabbitmq
rabbitmq:
condition: service_healthy
mongodb:
condition: service_started
build:
context: ./app
dockerfile: Dockerfile
environment:
DB_CONNECTION_STRING:
AMQP_CONNECTION_STRING:
BOT_TOKEN:
WCI_KEY:
networks:
- app-network
logging:
driver: json-file
options:
max-size: 1m
max-file: 3

# worker:
# build:
# context: .
# dockerfile: Dockerfile
# environment:
# RABBITMQ_URL: amqp://rabbitmq
# networks:
# - app-network
# deploy:
# replicas: 3
worker:
depends_on:
rabbitmq:
condition: service_healthy
mongodb:
condition: service_started
build:
context: ./worker
dockerfile: Dockerfile
environment:
AMQP_CONNECTION_STRING:
DB_CONNECTION_STRING:
API_TOKEN:
networks:
- app-network
deploy:
replicas: 3
logging:
driver: json-file
options:
max-size: 1m
max-file: 3

networks:
app-network:
Expand Down
3 changes: 3 additions & 0 deletions worker/.env-sample
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
AMQP_CONNECTION_STRING=amqp://rabbitmq
DB_CONNECTION_STRING=mongodb://mongodb:27017/coinMonitor
API_TOKEN=
52 changes: 44 additions & 8 deletions worker/db/schemas/Monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const MonitorSchema = new Schema({
type: String,
required: true
},
coinName: {
type: String,
required: true
},
lastPrice: {
type: Number,
required: true
Expand All @@ -32,14 +36,46 @@ const MonitorSchema = new Schema({
}
});

MonitorSchema.statics.createOrUpdate = function ({
telegramId, coinId, ...data
}) {
return this.findOneAndUpdate(
{ telegramId, coinId },
{ $set: data },
{ new: true, upsert: true }
);
/**
* @param {{ type: 'percentage' | 'fixed', value: number }} threshold
* @param {number} lastPrice
* @returns {number}
*/
const getExpectedDiff = ({ type, value }, lastPrice) => {
if (type === 'fixed')
return value;

if (type === 'percentage') {
return lastPrice / 100 * value;
}

return 0;
};

/**
* @param {number} newPrice
* @returns {boolean}
*/
MonitorSchema.methods.shouldNotify = function (newPrice) {
const monitorObject = this.toObject();
const expectedDiff = getExpectedDiff(monitorObject.threshold, monitorObject.lastPrice);

if (!expectedDiff)
return false;

if (Math.abs(newPrice - monitorObject.lastPrice) >= expectedDiff)
return true;

return false;
};

/**
* @param {number} price
*/
MonitorSchema.methods.updateLastPrice = function (price) {
return this.updateOne({
lastPrice: price
});
};

const Monitor = model('Monitor', MonitorSchema);
Expand Down
47 changes: 0 additions & 47 deletions worker/db/schemas/User.js

This file was deleted.

15 changes: 13 additions & 2 deletions worker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,21 @@ const { Api } = require('grammy');
const debug = require('debug')('WORKER');
const { connectToDatabase, mongoose } = require('./db/database');
const Monitor = require('./db/schemas/Monitor');
const round = require('./utils/round');

const amqpConnectionString = process.env.AMQP_CONNECTION_STRING ?? 'amqp://localhost';
const dbConnectionString = process.env.DB_CONNECTION_STRING ?? 'mongodb://localhost:27017/coinMonitor'
const queueName = process.env.AMQP_QUEUE_NAME ?? 'marketChanges';
const apiToken = process.env.API_TOKEN;

const formatMessage = (monitor, price) => {
const dir = price > monitor.lastPrice ? '🟢' : '🔴';
const diff = round(price - monitor.lastPrice);
const change = diff > 0 ? `+$${diff}` : `-$${Math.abs(diff)}`;

return `${dir} ${monitor.coinName} (${monitor.coin}): <code>$${Intl.NumberFormat().format(price)}</code> (${change})`;
};

const start = async () => {
try {
const connection = await amqp.connect(amqpConnectionString);
Expand All @@ -30,12 +39,14 @@ const start = async () => {

const monitors = await Monitor.find({ coinId });

debug('Found: ' + coinId + ' - ' + price);
for (const monitor of monitors) {
await api.sendMessage(monitor.telegramId, "Yo!");
const shouldNotify = monitor.shouldNotify(price);
if (shouldNotify && await api.sendMessage(monitor.telegramId, formatMessage(monitor, price), {parse_mode: 'HTML'}))
await monitor.updateLastPrice(price);
}

channel.ack(msg);
debug('Message processed successfully');
});
} catch (error) {
debug(`Error: ${error.message}`);
Expand Down
5 changes: 5 additions & 0 deletions worker/utils/round.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const round = num => {
return Math.ceil(num * 10000) / 10000;
}

module.exports = round;

0 comments on commit 2dec87e

Please sign in to comment.