forked from Outils-PeerTube/listener-rss-aggregator
implementation @database/sqlite + test addapted
This commit is contained in:
@ -1,11 +1,7 @@
|
||||
import EventEmitter from "events";
|
||||
|
||||
import { ListenerRss, ListenerRSSInfos } from "listener-rss";
|
||||
|
||||
interface ManageListenerInfo {
|
||||
timeloop?: number;
|
||||
path?: string;
|
||||
}
|
||||
import { SqliteTools } from "./sqlite-tools";
|
||||
|
||||
/**
|
||||
* Permit to manage a ListenerRSS array, data storage and event aggregation
|
||||
@ -14,83 +10,48 @@ export class ManageListener extends EventEmitter {
|
||||
listenerArray: ListenerRss[] = [];
|
||||
looprunning: boolean = false;
|
||||
|
||||
// manage aggregation
|
||||
timeloop?: number;
|
||||
buffUpdates: any[] = [];
|
||||
buffNewEntries: any[] = [];
|
||||
sqliteDb: SqliteTools;
|
||||
|
||||
constructor(info?: ManageListenerInfo) {
|
||||
constructor(path: string) {
|
||||
super();
|
||||
if (info) {
|
||||
if (info.timeloop) {
|
||||
this.timeloop = info.timeloop;
|
||||
}
|
||||
if (info.path) {
|
||||
const fs = require("fs");
|
||||
const tmp = fs.readFileSync(info.path);
|
||||
const configs: ListenerRSSInfos[] = JSON.parse(tmp);
|
||||
configs.forEach((config) => this.addNewListener(config));
|
||||
}
|
||||
}
|
||||
this.sqliteDb = new SqliteTools(path);
|
||||
}
|
||||
|
||||
async load() {
|
||||
await this.sqliteDb.ensureTableExists();
|
||||
const configs: ListenerRSSInfos[] = await this.sqliteDb.fetchAll();
|
||||
|
||||
configs.forEach((config) => {
|
||||
const newListener = new ListenerRss(config);
|
||||
this.listenerArray.push(newListener);
|
||||
this.settingEvents(newListener);
|
||||
});
|
||||
}
|
||||
|
||||
settingEvents(newListener: ListenerRss): void {
|
||||
if (this.timeloop !== undefined) {
|
||||
newListener.on("update", (obj) => this.buffUpdates.push(obj));
|
||||
newListener.on("newEntries", (obj) => this.buffNewEntries.push(obj));
|
||||
// /!\ todo /!\ threat error with aggregation
|
||||
// newListener.on("error", (err) => this.emit("error", err));
|
||||
} else {
|
||||
newListener.on("update", (obj) => this.emit("update", obj));
|
||||
newListener.on("newEntries", (obj) => this.emit("newEntries", obj));
|
||||
newListener.on("error", (err) => this.emit("error", err));
|
||||
}
|
||||
newListener.on("update", (obj) => this.emit("update", obj));
|
||||
newListener.on("newEntries", (obj) => this.emit("newEntries", obj));
|
||||
newListener.on("error", (err) => this.emit("error", err));
|
||||
}
|
||||
|
||||
addNewListener(info: ListenerRSSInfos): void {
|
||||
async addNewListener(info: ListenerRSSInfos) {
|
||||
const newListener = new ListenerRss(info);
|
||||
this.listenerArray.push(newListener);
|
||||
this.settingEvents(newListener);
|
||||
|
||||
await this.sqliteDb.insertListener(newListener);
|
||||
}
|
||||
|
||||
save(path: string): void {
|
||||
if (path.endsWith(".json")) {
|
||||
const fs = require("fs");
|
||||
fs.writeFileSync(
|
||||
path,
|
||||
JSON.stringify(
|
||||
this.listenerArray.map((listener) => listener.getProperty())
|
||||
)
|
||||
);
|
||||
}
|
||||
async save() {
|
||||
await this.sqliteDb.updateAll(
|
||||
this.listenerArray.map((listener) => listener.getProperty())
|
||||
);
|
||||
}
|
||||
|
||||
startAll(): void {
|
||||
if (!this.looprunning) {
|
||||
this.looprunning = true;
|
||||
this.listenerArray.forEach((listener) => listener.start());
|
||||
|
||||
if (this.timeloop) {
|
||||
const fun: () => void = async () => {
|
||||
this.emit("update", this.buffUpdates);
|
||||
if (this.buffNewEntries.length) {
|
||||
console.log(this.buffNewEntries.length);
|
||||
this.emit("newEntries", this.buffNewEntries);
|
||||
}
|
||||
|
||||
this.buffUpdates = [];
|
||||
this.buffNewEntries = [];
|
||||
};
|
||||
|
||||
(async () => {
|
||||
while (this.looprunning) {
|
||||
await fun();
|
||||
await new Promise((res) =>
|
||||
setTimeout(res, this.timeloop ? this.timeloop * 1000 : 0)
|
||||
);
|
||||
}
|
||||
})();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
71
src/sqlite-tools.ts
Normal file
71
src/sqlite-tools.ts
Normal file
@ -0,0 +1,71 @@
|
||||
import { ListenerRSSInfos } from "listener-rss";
|
||||
|
||||
import connect, { sql } from "@databases/sqlite";
|
||||
|
||||
export class SqliteTools {
|
||||
path?: string;
|
||||
|
||||
constructor(path?: string) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
async ensureTableExists() {
|
||||
const db = connect(this.path);
|
||||
|
||||
let req = sql`CREATE TABLE IF NOT EXISTS listeners
|
||||
(
|
||||
address TEXT NOT NULL UNIQUE,
|
||||
customfields TEXT DEFAULT '[]',
|
||||
timeloop INTEGER DEFAULT 300,
|
||||
last_entries_links TEXT DEFAULT '[]',
|
||||
PRIMARY KEY (address),
|
||||
CHECK(timeloop >= 0)
|
||||
);`;
|
||||
|
||||
await db.query(req);
|
||||
await db.dispose();
|
||||
}
|
||||
|
||||
async fetchAll(): Promise<ListenerRSSInfos[]> {
|
||||
const db = connect(this.path);
|
||||
|
||||
let req = sql`SELECT *
|
||||
FROM listeners`;
|
||||
const rows = await db.query(req);
|
||||
await db.dispose();
|
||||
return rows.map((row: any) => ({
|
||||
address: row["address"],
|
||||
customfields: JSON.parse(row["customfields"]),
|
||||
timeloop: row["timeloop"],
|
||||
lastEntriesLinks: JSON.parse(row["last_entries_links"]),
|
||||
}));
|
||||
}
|
||||
|
||||
async insertListener(listener: ListenerRSSInfos) {
|
||||
const db = connect(this.path);
|
||||
|
||||
let req = sql`INSERT INTO listeners (address, timeloop, customfields, last_entries_links)
|
||||
VALUES (${listener.address},
|
||||
${listener.timeloop},
|
||||
${JSON.stringify(listener.customfields)},
|
||||
${JSON.stringify(listener.lastEntriesLinks)})`;
|
||||
await db.query(req);
|
||||
await db.dispose();
|
||||
}
|
||||
|
||||
async updateAll(listeners: ListenerRSSInfos[]) {
|
||||
const db = connect(this.path);
|
||||
|
||||
await db.tx(async (transaction) => {
|
||||
for (const listener of listeners) {
|
||||
let req = sql`UPDATE listeners
|
||||
SET last_entries_links = ${JSON.stringify(
|
||||
listener.lastEntriesLinks
|
||||
)}
|
||||
WHERE address = ${listener.address}`;
|
||||
await transaction.query(req);
|
||||
}
|
||||
});
|
||||
await db.dispose();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user