forked from Outils-PeerTube/listener-rss-aggregator
issue 4 and 5
This commit is contained in:
@ -1 +1 @@
|
||||
export { ManageListener } from "./manage-listener";
|
||||
export { ListenerRssAggregator } from "./listener-rss-aggregator";
|
||||
|
98
src/listener-rss-aggregator.ts
Normal file
98
src/listener-rss-aggregator.ts
Normal file
@ -0,0 +1,98 @@
|
||||
import EventEmitter from "events";
|
||||
|
||||
import { ListenerRss, ListenerRSSInfos } from "listener-rss";
|
||||
import { SqliteTools } from "./sqlite-tools";
|
||||
|
||||
/**
|
||||
* Permit to manage a ListenerRSS array, data storage and event aggregation
|
||||
*/
|
||||
export class ListenerRssAggregator extends EventEmitter {
|
||||
private listenerArray: Map<string, ListenerRss> = new Map(); // renomer listenerMap
|
||||
private looprunning: boolean = false;
|
||||
|
||||
private sqliteDb: SqliteTools;
|
||||
|
||||
constructor(path: string) {
|
||||
super();
|
||||
this.sqliteDb = new SqliteTools(path);
|
||||
this.load();
|
||||
}
|
||||
|
||||
async getAllConfigs(): Promise<ListenerRSSInfos[]> {
|
||||
return await this.sqliteDb.fetchAll();
|
||||
}
|
||||
|
||||
private async load() {
|
||||
const configs = await this.getAllConfigs();
|
||||
|
||||
configs.forEach((config) => {
|
||||
this.addNewListener(config);
|
||||
});
|
||||
}
|
||||
|
||||
private addNewListener(info: ListenerRSSInfos): ListenerRss {
|
||||
const newListener = new ListenerRss(info);
|
||||
this.listenerArray.set(newListener.address, newListener);
|
||||
|
||||
newListener.on("update", (obj) => this.emit("update", obj));
|
||||
newListener.on("newEntries", (obj) => this.emit("newEntries", obj));
|
||||
newListener.on("error", (err) => this.emit("error", err));
|
||||
|
||||
return newListener;
|
||||
}
|
||||
|
||||
private removeOldListener(adr: string): void {
|
||||
const oldListener = this.listenerArray.get(adr);
|
||||
|
||||
if (!oldListener) return;
|
||||
|
||||
oldListener.stop();
|
||||
oldListener
|
||||
.removeAllListeners("update")
|
||||
.removeAllListeners("newEntries")
|
||||
.removeAllListeners("error");
|
||||
this.listenerArray.delete(adr);
|
||||
}
|
||||
|
||||
async registerListener(info: ListenerRSSInfos) {
|
||||
if (this.listenerArray.has(info.address)) return;
|
||||
|
||||
const listener = this.addNewListener(info);
|
||||
await this.sqliteDb.insertListener(listener);
|
||||
}
|
||||
|
||||
async unregisterListener(adr: string): Promise<void> {
|
||||
if (!this.listenerArray.has(adr)) return;
|
||||
|
||||
this.removeOldListener(adr);
|
||||
await this.sqliteDb.deleteListener(adr);
|
||||
}
|
||||
|
||||
async saveOverride(expectedConfig: ListenerRSSInfos[]) {
|
||||
const actualConfig = await this.getAllConfigs();
|
||||
|
||||
for (const newItem of expectedConfig.filter(
|
||||
(item) => !actualConfig.includes(item)
|
||||
))
|
||||
this.registerListener(newItem);
|
||||
|
||||
for (const oldItem of actualConfig.filter(
|
||||
(item) => !expectedConfig.includes(item)
|
||||
))
|
||||
this.unregisterListener(oldItem.address);
|
||||
}
|
||||
|
||||
startAll(): void {
|
||||
if (this.looprunning) return;
|
||||
|
||||
this.looprunning = true;
|
||||
for (const item of this.listenerArray.values()) item.start();
|
||||
}
|
||||
|
||||
stopAll(): void {
|
||||
if (!this.looprunning) return;
|
||||
|
||||
this.looprunning = false;
|
||||
for (const item of this.listenerArray.values()) item.stop();
|
||||
}
|
||||
}
|
@ -1,64 +0,0 @@
|
||||
import EventEmitter from "events";
|
||||
|
||||
import { ListenerRss, ListenerRSSInfos } from "listener-rss";
|
||||
import { SqliteTools } from "./sqlite-tools";
|
||||
|
||||
/**
|
||||
* Permit to manage a ListenerRSS array, data storage and event aggregation
|
||||
*/
|
||||
export class ManageListener extends EventEmitter {
|
||||
listenerArray: ListenerRss[] = [];
|
||||
looprunning: boolean = false;
|
||||
|
||||
sqliteDb: SqliteTools;
|
||||
|
||||
constructor(path: string) {
|
||||
super();
|
||||
this.sqliteDb = new SqliteTools(path);
|
||||
}
|
||||
|
||||
async load() {
|
||||
await this.sqliteDb.ensureTableExists();
|
||||
const configs: ListenerRSSInfos[] = await this.sqliteDb.fetchAll();
|
||||
|
||||
configs.forEach((config) => {
|
||||
this.addNewListener(config);
|
||||
});
|
||||
}
|
||||
|
||||
private addNewListener(info: ListenerRSSInfos) : ListenerRss {
|
||||
const newListener = new ListenerRss(info);
|
||||
this.listenerArray.push(newListener);
|
||||
|
||||
newListener.on("update", (obj) => this.emit("update", obj));
|
||||
newListener.on("newEntries", (obj) => this.emit("newEntries", obj));
|
||||
newListener.on("error", (err) => this.emit("error", err));
|
||||
|
||||
return newListener;
|
||||
}
|
||||
|
||||
async registerListener(info: ListenerRSSInfos) {
|
||||
const listener = this.addNewListener(info);
|
||||
await this.sqliteDb.insertListener(listener);
|
||||
}
|
||||
|
||||
async save() {
|
||||
await this.sqliteDb.updateAll(
|
||||
this.listenerArray.map((listener) => listener.getProperty())
|
||||
);
|
||||
}
|
||||
|
||||
startAll(): void {
|
||||
if (!this.looprunning) {
|
||||
this.looprunning = true;
|
||||
this.listenerArray.forEach((listener) => listener.start());
|
||||
}
|
||||
}
|
||||
|
||||
stopAll(): void {
|
||||
if (this.looprunning) {
|
||||
this.looprunning = false;
|
||||
this.listenerArray.forEach((listener) => listener.stop());
|
||||
}
|
||||
}
|
||||
}
|
@ -9,7 +9,9 @@ export class SqliteTools {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
async withDB<Type>(callback: (db: DatabaseConnection) => Promise<Type>): Promise<Type> {
|
||||
async withDB<Type>(
|
||||
callback: (db: DatabaseConnection) => Promise<Type>
|
||||
): Promise<Type> {
|
||||
const db = connect(this.path);
|
||||
try {
|
||||
return callback(db);
|
||||
@ -29,13 +31,12 @@ export class SqliteTools {
|
||||
PRIMARY KEY (address),
|
||||
CHECK(timeloop >= 0)
|
||||
);`;
|
||||
|
||||
|
||||
await db.query(req);
|
||||
});
|
||||
}
|
||||
|
||||
async fetchAll(): Promise<ListenerRSSInfos[]> {
|
||||
|
||||
const rows = await this.withDB(async (db) => {
|
||||
let req = sql`SELECT *
|
||||
FROM listeners`;
|
||||
@ -61,6 +62,14 @@ export class SqliteTools {
|
||||
});
|
||||
}
|
||||
|
||||
async deleteListener(adr: string) {
|
||||
await this.withDB(async (db) => {
|
||||
let req = sql`DELETE FROM listeners
|
||||
WHERE address = ${adr}`;
|
||||
await db.query(req);
|
||||
});
|
||||
}
|
||||
|
||||
async updateAll(listeners: ListenerRSSInfos[]) {
|
||||
await this.withDB(async (db) => {
|
||||
await db.tx(async (transaction) => {
|
||||
|
Reference in New Issue
Block a user