Skip to content

Commit

Permalink
chore: introduced basic fetch remote missing events
Browse files Browse the repository at this point in the history
  • Loading branch information
ggazzo committed Dec 17, 2024
1 parent 882da23 commit c60f77a
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 26 deletions.
34 changes: 34 additions & 0 deletions packages/homeserver/src/plugins/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ export const routerWithMongodb = (db: Db) =>
);
};

const getEventsByIds = async (roomId: string, eventIds: string[]) => {
return eventsCollection
.find({ "event.room_id": roomId, "event._id": { $in: eventIds } })
.toArray();
};
const getDeepEarliestAndLatestEvents = async (
roomId: string,
earliest_events: string[],
Expand Down Expand Up @@ -165,6 +170,30 @@ export const routerWithMongodb = (db: Db) =>
return id;
};

const createEvent = async (event: EventBase) => {
const id = generateId(event);
await eventsCollection.insertOne({
_id: id,
event,
});

return id;
};

const removeEventFromStaged = async (roomId: string, id: string) => {
await eventsCollection.updateOne(
{ _id: id, "event.room_id": roomId },
{ $unset: { staged: 1 } },
);
};

const getOldestStagedEvent = async (roomId: string) => {
return eventsCollection.findOne(
{ staged: true, "event.room_id": roomId },
{ sort: { "event.origin_server_ts": 1 } },
);
};

return {
serversCollection,
getValidPublicKeyFromLocal,
Expand All @@ -175,7 +204,12 @@ export const routerWithMongodb = (db: Db) =>
getMissingEventsByDeep,
getLastEvent,
getAuthEvents,

removeEventFromStaged,
getEventsByIds,
getOldestStagedEvent,
createStagingEvent,
createEvent,
};
})(),
);
Expand Down
47 changes: 47 additions & 0 deletions packages/homeserver/src/procedures/processPDU.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { EventBase } from "@hs/core/src/events/eventBase";
import type { SignedJson } from "../signJson";
import type { HashedEvent } from "../authentication";
import type { EventStore } from "../plugins/mongodb";

export const processPDUsByRoomId = async (
roomId: string,
pdus: SignedJson<HashedEvent<EventBase>>[],
validatePdu: (pdu: SignedJson<HashedEvent<EventBase>>) => Promise<void>,
getEventsByIds: (roomId: string, eventIds: string[]) => Promise<EventStore[]>,
createStagingEvent: (event: EventBase) => Promise<string>,
createEvent: (event: EventBase) => Promise<string>,
processMissingEvents: (roomId: string) => Promise<boolean>,
generateId: (pdu: SignedJson<HashedEvent<EventBase>>) => string,
) => {
const resultPDUs = {} as {
[key: string]: Record<string, unknown>;
};
for (const pdu of pdus) {
const pid = generateId(pdu);
try {
await validatePdu(pdu);
resultPDUs[pid] = {};

const events = await getEventsByIds(roomId, pdu.prev_events);

const missing = pdu.prev_events.filter(
(event) => !events.find((e) => e._id === event),
);

if (!missing.length) {
await createStagingEvent(pdu);

Check warning on line 32 in packages/homeserver/src/procedures/processPDU.ts

View workflow job for this annotation

GitHub Actions / Code Quality Checks(lint, test, tsc)

32 line is not covered with tests
} else {
await createEvent(pdu);
}
} catch (error) {
resultPDUs[pid] = { error } as any;
}
void (async () => {
while (await processMissingEvents(roomId));
})();
}

return {
pdus: resultPDUs,
};
};
32 changes: 29 additions & 3 deletions packages/homeserver/src/routes/federation/sendTransaction.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ describe("/send/:txnId", () => {
createStagingEvent: async () => {
return;
},
getEventsByIds: async () => {
return [];
},
createEvent: async () => {
return;
},
getOldestStagedEvent: async () => {
return;
},
serversCollection: {
findOne: async () => {
return;
Expand Down Expand Up @@ -140,7 +149,9 @@ describe("/send/:txnId", () => {
expect(resp.status).toBe(200);
expect(data).toHaveProperty("pdus");
expect(data.pdus).toStrictEqual({
[id]: {},
pdus: {
[id]: {},
},
});
});

Expand Down Expand Up @@ -188,7 +199,11 @@ describe("/send/:txnId", () => {
expect(resp.status).toBe(200);
expect(data).toHaveProperty("pdus");
expect(data.pdus).toStrictEqual({
[id]: {},
pdus: {
[id]: {
error: {},
},
},
});
});
});
Expand Down Expand Up @@ -238,6 +253,15 @@ describe("/send/:txnId using real case", () => {
createStagingEvent: async () => {
return;
},
getEventsByIds: async () => {
return [];
},
createEvent: async () => {
return;
},
getOldestStagedEvent: async () => {
return;
},
serversCollection: {
findOne: async () => {
return;
Expand Down Expand Up @@ -307,7 +331,9 @@ describe("/send/:txnId using real case", () => {
expect(resp.status).toBe(200);
expect(data).toHaveProperty("pdus");
expect(data.pdus).toStrictEqual({
[`${id}`]: {},
pdus: {
[`${id}`]: {},
},
});
});
});
Expand Down
111 changes: 88 additions & 23 deletions packages/homeserver/src/routes/federation/sendTransaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,32 @@ import {
import { isConfigContext } from "../../plugins/isConfigContext";
import { MatrixError } from "../../errors";
import { isRoomMemberEvent } from "@hs/core/src/events/m.room.member";
import { makeRequest } from "../../makeRequest";
import { isMutexContext, routerWithMutex } from "../../plugins/mutex";
import { processPDUsByRoomId } from "../../procedures/processPDU";

export const sendTransactionRoute = new Elysia().put(
"/send/:txnId",
async ({ params, body, ...context }) => {
export const sendTransactionRoute = new Elysia()
.use(routerWithMutex)
.put("/send/:txnId", async ({ params, body, ...context }) => {
if (!isConfigContext(context)) {
throw new Error("No config context");

Check warning on line 28 in packages/homeserver/src/routes/federation/sendTransaction.ts

View workflow job for this annotation

GitHub Actions / Code Quality Checks(lint, test, tsc)

28 line is not covered with tests
}
if (!isMongodbContext(context)) {
throw new Error("No mongodb context");

Check warning on line 31 in packages/homeserver/src/routes/federation/sendTransaction.ts

View workflow job for this annotation

GitHub Actions / Code Quality Checks(lint, test, tsc)

31 line is not covered with tests
}
if (!isMutexContext(context)) {
throw new Error("No mutex context");

Check warning on line 34 in packages/homeserver/src/routes/federation/sendTransaction.ts

View workflow job for this annotation

GitHub Actions / Code Quality Checks(lint, test, tsc)

34 line is not covered with tests
}

const {
config,
mongo: { eventsCollection, createStagingEvent },
mongo: {
getEventsByIds,
createStagingEvent,
createEvent,
removeEventFromStaged,
getOldestStagedEvent,
},
} = context;

const { pdus, edus = [] } = body as any;
Expand All @@ -39,7 +51,6 @@ export const sendTransactionRoute = new Elysia().put(
throw new MatrixError("400", "Too many edus");
}

console.log("1");
const isValidPDU = (
pdu: any,
): pdu is SignedJson<HashedEvent<EventBase>> => {
Expand Down Expand Up @@ -160,27 +171,81 @@ export const sendTransactionRoute = new Elysia().put(
}
};

const resultPDUs = {} as {
[key: string]: Record<string, unknown>;
/**
* Based on the fetched events from the remote server, we check if there are any new events (that haven't been stored yet)
* @param fetchedEvents
* @returns
*/

const getNewEvents = async (
roomId: string,
fetchedEvents: EventBase[],
) => {
const fetchedEventsIds = fetchedEvents.map(generateId);
const storedEvents = await getEventsByIds(roomId, fetchedEventsIds);
return fetchedEvents
.filter(
(event) => !storedEvents.find((e) => e._id === generateId(event)),
)
.sort((a, b) => a.depth - b.depth);
};

for (const [roomId, pdus] of pdusByRoomId) {
// const roomVersion = getRoomVersion
for (const pdu of pdus) {
try {
await validatePdu(pdu);
resultPDUs[`${generateId(pdu)}`] = {};
void createStagingEvent(pdu);
} catch (e) {
console.error("error validating pdu", e);
resultPDUs[`${generateId(pdu)}`] = e as any;
}
const processMissingEvents = async (roomId: string) => {
const lock = await context.mutex.request(roomId);
if (!lock) {
return false;
}
const event = await getOldestStagedEvent(roomId);

if (!event) {
return false;
}
}

return {
pdus: resultPDUs,
const { _id: pid, event: pdu } = event;

const fetchedEvents = await makeRequest({
method: "POST",
domain: pdu.origin,
uri: `/_matrix/federation/v1/get_missing_events/${pdu.room_id}`,
body: {
earliest_events: pdu.prev_events,
latest_events: [pid],
limit: 10,
min_depth: 10,
},
signingName: config.name,
});

const newEvents = await getNewEvents(roomId, fetchedEvents.events);
// in theory, we have all the new events
await removeEventFromStaged(roomId, pid);

for await (const event of newEvents) {
await createStagingEvent(event);
}

await lock.release();

return true;
};
const result = {
pdus: {},
};
for await (const [roomId, pdus] of pdusByRoomId) {
Object.assign(
result.pdus,
await processPDUsByRoomId(
roomId,
pdus,
validatePdu,
getEventsByIds,
createStagingEvent,
createEvent,
processMissingEvents,
generateId,
),
);
}
return result;
}
},
);
});

0 comments on commit c60f77a

Please sign in to comment.