Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 65 additions & 21 deletions lib/FredyPipelineExecutioner.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@ import { distanceMeters } from './services/listings/distanceCalculator.js';
import { getUserSettings } from './services/storage/settingsStorage.js';
import { updateListingDistance } from './services/storage/listingsStorage.js';
import booleanPointInPolygon from '@turf/boolean-point-in-polygon';
import { extractNumber } from './utils/extract-number.js';

/**
* @typedef {Object} Listing
* @property {string} id Stable unique identifier (hash) of the listing.
* @property {string} title Title or headline of the listing.
* @property {string} [address] Optional address/location text.
* @property {string} [price] Optional price text/value.
* @property {string} [size] Optional size text/value.
* @property {string} [rooms] Optional number of rooms text/value.
* @property {string} [url] Link to the listing detail page.
* @property {any} [meta] Provider-specific additional metadata.
* @property {number | null} [roomsInt] Optional number of rooms.
* @property {number | null} [sizeInt] Optional size of the listing.
* @property {number | null} [priceInt] Optional price of the listing.
*/

/**
Expand All @@ -48,7 +54,9 @@ import booleanPointInPolygon from '@turf/boolean-point-in-polygon';
* 5) Identify new listings (vs. previously stored hashes)
* 6) Persist new listings
* 7) Filter out entries similar to already seen ones
* 8) Dispatch notifications
* 8) Filter out entries that do not match the job's specFilter
* 9) Filter out entries that do not match the job's spatialFilter
* 10) Dispatch notifications
*/
class FredyPipelineExecutioner {
/**
Expand All @@ -62,20 +70,25 @@ class FredyPipelineExecutioner {
* @param {string} providerConfig.crawlContainer CSS selector for the container holding listing items.
* @param {(raw:any)=>Listing} providerConfig.normalize Function to convert raw scraped data into a Listing shape.
* @param {(listing:Listing)=>boolean} providerConfig.filter Function to filter out unwanted listings.
*
* @param {Object} job Job configuration.
* @param {string} job.id Job ID.
* @param {Object} job.notificationAdapter Notification configuration passed to notification adapters.
* @param {Object | null} job.spatialFilter Optional spatial filter configuration.
* @param {Object | null} job.specFilter Optional listing specifications (minRooms, minSize, maxPrice).
*
* @param {(url:string, waitForSelector?:string)=>Promise<void>|Promise<Listing[]>} [providerConfig.getListings] Optional override to fetch listings.
* @param {Object} notificationConfig Notification configuration passed to notification adapters.
* @param {Object} spatialFilter Optional spatial filter configuration.
* @param {string} providerId The ID of the provider currently in use.
* @param {string} jobKey Key of the job that is currently running (from within the config).
* @param {SimilarityCache} similarityCache Cache instance for checking similar entries.
* @param browser
*/
constructor(providerConfig, notificationConfig, spatialFilter, providerId, jobKey, similarityCache, browser) {
constructor(providerConfig, job, providerId, similarityCache, browser) {
this._providerConfig = providerConfig;
this._notificationConfig = notificationConfig;
this._spatialFilter = spatialFilter;
this._jobNotificationConfig = job.notificationAdapter;
this._jobKey = job.id;
this._jobSpecFilter = job.specFilter;
this._jobSpatialFilter = job.spatialFilter;
this._providerId = providerId;
this._jobKey = jobKey;
this._similarityCache = similarityCache;
this._browser = browser;
}
Expand All @@ -96,6 +109,7 @@ class FredyPipelineExecutioner {
.then(this._save.bind(this))
.then(this._calculateDistance.bind(this))
.then(this._filterBySimilarListings.bind(this))
.then(this._filterBySpecs.bind(this))
.then(this._filterByArea.bind(this))
.then(this._notify.bind(this))
.catch(this._handleError.bind(this));
Expand Down Expand Up @@ -128,16 +142,15 @@ class FredyPipelineExecutioner {
* @returns {Promise<Listing[]>} Resolves with listings that are within the area (or not filtered if no area is set).
*/
_filterByArea(newListings) {
const polygonFeatures = this._spatialFilter?.features?.filter((f) => f.geometry?.type === 'Polygon');
const polygonFeatures = this._jobSpatialFilter?.features?.filter((f) => f.geometry?.type === 'Polygon');

// If no area filter is set, return all listings
if (!polygonFeatures?.length) {
return newListings;
}

const filteredIds = [];
// Filter listings by area - keep only those within the polygon
const keptListings = newListings.filter((listing) => {
const filteredListings = newListings.filter((listing) => {
// If listing doesn't have coordinates, keep it (don't filter out)
if (listing.latitude == null || listing.longitude == null) {
return true;
Expand All @@ -147,18 +160,34 @@ class FredyPipelineExecutioner {
const point = [listing.longitude, listing.latitude]; // GeoJSON format: [lon, lat]
const isInPolygon = polygonFeatures.some((feature) => booleanPointInPolygon(point, feature));

if (!isInPolygon) {
filteredIds.push(listing.id);
}

return isInPolygon;
});

if (filteredIds.length > 0) {
deleteListingsById(filteredIds);
return filteredListings;
}

/**
* Filter listings based on its specifications (minRooms, minSize, maxPrice).
*
* @param {Listing[]} newListings New listings to filter.
* @returns {Promise<Listing[]>} Resolves with listings that pass the specification filters.
*/
_filterBySpecs(newListings) {
const { minRooms, minSize, maxPrice } = this._jobSpecFilter || {};

// If no specs are set, return all listings
if (!minRooms && !minSize && !maxPrice) {
return newListings;
}

return keptListings;
const filtered = newListings.filter((listing) => {
if (minRooms && listing.roomsInt && listing.roomsInt < minRooms) return false;
if (minSize && listing.sizeInt && listing.sizeInt < minSize) return false;
if (maxPrice && listing.priceInt && listing.priceInt > maxPrice) return false;
return true;
});

return filtered;
}

/**
Expand Down Expand Up @@ -195,7 +224,16 @@ class FredyPipelineExecutioner {
* @returns {Listing[]} Normalized listings.
*/
_normalize(listings) {
return listings.map(this._providerConfig.normalize);
return listings.map((listing) => {
const normalized = this._providerConfig.normalize(listing);
// TODO: every provider should return price, size and rooms as numbers. This makes the provider output stronger and stricter. Unit strings like "m²", "Zi." should not be part of the values and can be added in the frontend or in messages. Move this logic into the provider-specific normalize function.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make sure this is done in the pr?

return {
...normalized,
priceInt: extractNumber(normalized.price),
sizeInt: extractNumber(normalized.size),
roomsInt: extractNumber(normalized.rooms),
};
});
}

/**
Expand All @@ -206,7 +244,13 @@ class FredyPipelineExecutioner {
* @returns {Listing[]} Filtered listings that pass validation and provider filter.
*/
_filter(listings) {
const keys = Object.keys(this._providerConfig.crawlFields);
// I removed this because crawlFields might differ from the fields that are actually required.
// For example, kleinanzeigen crawls `tags` (which contains multiple fields); these are extracted in normalize and then deleted because they are only used internally.
// I would suggest that we define a standard list like (id, price, rooms, size, title, link, description, address, image, url)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get this, but I think what we should then do is let each provider define its own list of required params: a standard function, available in every provider, that returns an array of required keys.

// Some of these properties may have a null value, which is fine for all of them except id, link and title.
// Also, this check might not be needed when using typings with TypeScript. I would suggest moving the whole project to TypeScript to have safe typings.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like TypeScript for various reasons which would take way too long to explain here, but Fredy won't move to TypeScript ;)

//const keys = Object.keys(this._providerConfig.crawlFields);
const keys = ['id', 'link', 'title'];
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not have a fixed list in here.

const filteredListings = listings.filter((item) => keys.every((key) => key in item));
return filteredListings.filter(this._providerConfig.filter);
}
Expand Down Expand Up @@ -240,7 +284,7 @@ class FredyPipelineExecutioner {
if (newListings.length === 0) {
throw new NoNewListingsWarning();
}
const sendNotifications = notify.send(this._providerId, newListings, this._notificationConfig, this._jobKey);
const sendNotifications = notify.send(this._providerId, newListings, this._jobNotificationConfig, this._jobKey);
return Promise.all(sendNotifications).then(() => newListings);
}

Expand Down
2 changes: 2 additions & 0 deletions lib/api/routes/jobRouter.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ jobRouter.post('/', async (req, res) => {
enabled,
shareWithUsers = [],
spatialFilter = null,
specFilter = null,
} = req.body;
const settings = await getSettings();
try {
Expand All @@ -197,6 +198,7 @@ jobRouter.post('/', async (req, res) => {
notificationAdapter,
shareWithUsers,
spatialFilter,
specFilter,
});
} catch (error) {
res.send(new Error(error));
Expand Down
3 changes: 3 additions & 0 deletions lib/api/routes/listingsRouter.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ listingsRouter.get('/table', async (req, res) => {
sortfield = null,
sortdir = 'asc',
freeTextFilter,
filterByJobSettings,
} = req.query || {};

// normalize booleans (accept true, 'true', 1, '1' for true; false, 'false', 0, '0' for false)
Expand All @@ -37,6 +38,7 @@ listingsRouter.get('/table', async (req, res) => {
};
const normalizedActivity = toBool(activityFilter);
const normalizedWatch = toBool(watchListFilter);
const normalizedFilterByJobSettings = toBool(filterByJobSettings) ?? true;

let jobFilter = null;
let jobIdFilter = null;
Expand All @@ -56,6 +58,7 @@ listingsRouter.get('/table', async (req, res) => {
jobIdFilter: jobIdFilter,
providerFilter,
watchListFilter: normalizedWatch,
filterByJobSettings: normalizedFilterByJobSettings,
sortField: sortfield || null,
sortDir: sortdir === 'desc' ? 'desc' : 'asc',
userId: req.session.currentUser,
Expand Down
10 changes: 7 additions & 3 deletions lib/provider/kleinanzeigen.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ let appliedBlackList = [];
let appliedBlacklistedDistricts = [];

function normalize(o) {
const size = o.size || '--- m²';
const parts = (o.tags || '').split('·').map((p) => p.trim());
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

const size = parts.find((p) => p.includes('m²')) || '--- m²';
const rooms = parts.find((p) => p.includes('Zi.')) || '--- Zi.';
const id = buildHash(o.id, o.price);
const link = `https://www.kleinanzeigen.de${o.link}`;
return Object.assign(o, { id, size, link });

delete o.tags;
return Object.assign(o, { id, size, rooms, link });
}

function applyBlacklist(o) {
Expand All @@ -33,7 +37,7 @@ const config = {
crawlFields: {
id: '.aditem@data-adid | int',
price: '.aditem-main--middle--price-shipping--price | removeNewline | trim',
size: '.aditem-main .text-module-end | removeNewline | trim',
tags: '.aditem-main--middle--tags | removeNewline | trim',
title: '.aditem-main .text-module-begin a | removeNewline | trim',
link: '.aditem-main .text-module-begin a@href | removeNewline | trim',
description: '.aditem-main .aditem-main--middle--description | removeNewline | trim',
Expand Down
10 changes: 1 addition & 9 deletions lib/services/jobs/jobExecutionService.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,7 @@ export function initJobExecutionService({ providers, settings, intervalMs }) {
browser = await puppeteerExtractor.launchBrowser(matchedProvider.config.url, {});
}

await new FredyPipelineExecutioner(
matchedProvider.config,
job.notificationAdapter,
job.spatialFilter,
prov.id,
job.id,
similarityCache,
browser,
).execute();
await new FredyPipelineExecutioner(matchedProvider.config, job, prov.id, similarityCache, browser).execute();
} catch (err) {
logger.error(err);
}
Expand Down
16 changes: 13 additions & 3 deletions lib/services/storage/jobStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export const upsertJob = ({
userId,
shareWithUsers = [],
spatialFilter = null,
specFilter = null,
}) => {
const id = jobId || nanoid();
const existing = SqliteConnection.query(`SELECT id, user_id FROM jobs WHERE id = @id LIMIT 1`, { id })[0];
Expand All @@ -44,7 +45,8 @@ export const upsertJob = ({
provider = @provider,
notification_adapter = @notification_adapter,
shared_with_user = @shareWithUsers,
spatial_filter = @spatialFilter
spatial_filter = @spatialFilter,
spec_filter = @specFilter
WHERE id = @id`,
{
id,
Expand All @@ -55,12 +57,13 @@ export const upsertJob = ({
provider: toJson(provider ?? []),
notification_adapter: toJson(notificationAdapter ?? []),
spatialFilter: spatialFilter ? toJson(spatialFilter) : null,
specFilter: specFilter ? toJson(specFilter) : null,
},
);
} else {
SqliteConnection.execute(
`INSERT INTO jobs (id, user_id, enabled, name, blacklist, provider, notification_adapter, shared_with_user, spatial_filter)
VALUES (@id, @user_id, @enabled, @name, @blacklist, @provider, @notification_adapter, @shareWithUsers, @spatialFilter)`,
`INSERT INTO jobs (id, user_id, enabled, name, blacklist, provider, notification_adapter, shared_with_user, spatial_filter, spec_filter)
VALUES (@id, @user_id, @enabled, @name, @blacklist, @provider, @notification_adapter, @shareWithUsers, @spatialFilter, @specFilter)`,
{
id,
user_id: ownerId,
Expand All @@ -71,6 +74,7 @@ export const upsertJob = ({
shareWithUsers: toJson(shareWithUsers ?? []),
notification_adapter: toJson(notificationAdapter ?? []),
spatialFilter: spatialFilter ? toJson(spatialFilter) : null,
specFilter: specFilter ? toJson(specFilter) : null,
},
);
}
Expand All @@ -92,6 +96,7 @@ export const getJob = (jobId) => {
j.shared_with_user,
j.notification_adapter AS notificationAdapter,
j.spatial_filter AS spatialFilter,
j.spec_filter AS specFilter,
(SELECT COUNT(1) FROM listings l WHERE l.job_id = j.id AND l.is_active = 1 AND l.manually_deleted = 0) AS numberOfFoundListings
FROM jobs j
WHERE j.id = @id
Expand All @@ -107,6 +112,7 @@ export const getJob = (jobId) => {
shared_with_user: fromJson(row.shared_with_user, []),
notificationAdapter: fromJson(row.notificationAdapter, []),
spatialFilter: fromJson(row.spatialFilter, null),
specFilter: fromJson(row.specFilter, null),
};
};

Expand Down Expand Up @@ -157,6 +163,7 @@ export const getJobs = () => {
j.shared_with_user,
j.notification_adapter AS notificationAdapter,
j.spatial_filter AS spatialFilter,
j.spec_filter AS specFilter,
(SELECT COUNT(1) FROM listings l WHERE l.job_id = j.id AND l.is_active = 1 AND l.manually_deleted = 0) AS numberOfFoundListings
FROM jobs j
WHERE j.enabled = 1
Expand All @@ -170,6 +177,7 @@ export const getJobs = () => {
shared_with_user: fromJson(row.shared_with_user, []),
notificationAdapter: fromJson(row.notificationAdapter, []),
spatialFilter: fromJson(row.spatialFilter, null),
specFilter: fromJson(row.specFilter, null),
}));
};

Expand Down Expand Up @@ -260,6 +268,7 @@ export const queryJobs = ({
j.shared_with_user,
j.notification_adapter AS notificationAdapter,
j.spatial_filter AS spatialFilter,
j.spec_filter AS specFilter,
(SELECT COUNT(1) FROM listings l WHERE l.job_id = j.id AND l.is_active = 1 AND l.manually_deleted = 0) AS numberOfFoundListings
FROM jobs j
${whereSql}
Expand All @@ -276,6 +285,7 @@ export const queryJobs = ({
shared_with_user: fromJson(row.shared_with_user, []),
notificationAdapter: fromJson(row.notificationAdapter, []),
spatialFilter: fromJson(row.spatialFilter, null),
specFilter: fromJson(row.specFilter, null),
}));

return { totalNumber, page: safePage, result };
Expand Down
Loading