From 32066ad362e5db71ec5ea768db30070f5175a0d8 Mon Sep 17 00:00:00 2001 From: ibu Date: Sun, 9 Jan 2022 16:47:02 +0000 Subject: [PATCH] Fix many bugs related to resource storage and resource merging --- src/atextcrawler/resource/operations.py | 107 ++++++++++++------------ 1 file changed, 52 insertions(+), 55 deletions(-) diff --git a/src/atextcrawler/resource/operations.py b/src/atextcrawler/resource/operations.py index 0ed3a22..4edd58f 100644 --- a/src/atextcrawler/resource/operations.py +++ b/src/atextcrawler/resource/operations.py @@ -99,7 +99,7 @@ async def get_site_path( Return the next path of a given site that needs to be processed. If none needs to be processed, return None. - I particular, for sites having crawl_enabled=false return None. + In particular, for sites having crawl_enabled=false return None. Only return paths that have last been visited before *before* or not been processed at all. Paths with an ok_count of -3 or lower @@ -200,34 +200,37 @@ async def process_site_path( site.base_url, res_sitemap.urls ) await add_site_paths(conn, site.id_, paths) - return False - - # handle TextResource - relevant, is_new_resource = await _handle_text_resource( - app, conn, tf, site, site_path, resource, url - ) - if not relevant: - return False - site_path.resource_id = resource.id_ - site_path.canonical = resource.init_fields.get('canonical') + resource_id = None + is_new_resource = False + else: # handle TextResource + resource_id, is_new_resource = await _handle_text_resource( + app, conn, tf, site, site_path, resource, url + ) + site_path.canonical = resource.init_fields.get('canonical') + if shortlink_url := resource.init_fields.get('shortlink'): + await _save_shortlink( + conn, + site, + url, + resource_id, + shortlink_url, + site_path.last_visit, + ) + site_path.resource_id = resource_id site_path.ok_count += 1 await site_path.save(conn) - - if shortlink_url := resource.init_fields.get('shortlink'): - await _save_shortlink( - conn, site, url, resource, shortlink_url, site_path.last_visit - ) - return is_new_resource async def _handle_text_resource( app, conn, tf, site, site_path, resource, url -) -> tuple[bool, bool]: +) -> tuple[Optional[int], bool]: """ - Ingest a text resource. + Ingest a text resource returning the id of the possibly merged resource. - Return whether the resource is relevant and whether it is new. + Return the id of the merged resource (or None if the incoming resource + has a too short text and is not worth storing a resource) and + whether the resource is new (False if the returned resource_id is None). """ # save the resource's internal links paths = [] @@ -250,22 +253,18 @@ async def _handle_text_resource( app.config['elasticsearch']['index_base_name'], ) await site_path.save(conn) - return False, False + return None, False simhash = simhash_from_bigint(resource.simhash) index = site.simhash_index similar_ids = search_simhash(index, simhash) + print(similar_ids, site_path.resource_id) # determine the destination resource and resources to be merged into it old_id = site_path.resource_id - if ( - old_id - and old_id in similar_ids - and ( # similar to old text - dest_resource := await TextResource().load(conn, old_id) - ) - ): - merge_ids = list(filter(lambda elem: elem != old_id, similar_ids)) + if old_id and old_id in similar_ids: + merge_ids = similar_ids + dest_resource = await TextResource().load(conn, old_id) else: # no old text, or old text not similar any more if old_id: await site_path.unlink_resource( @@ -302,36 +301,34 @@ async def _handle_text_resource( create_simhash(index, resource.id_, simhash) # add resource to search index - if resource.content_type in ('html', 'plain'): - await index_resource( + await index_resource( + app.search_engine, + tf, + site_path, + resource, + site.base_url, + url, + ) + + # replace references to any merge resource with links to the dest resource + sql = "UPDATE site_path SET resource_id=$1 WHERE resource_id=ANY($2)" + await conn.execute(sql, resource.id_, merge_ids) + + # remove orphaned resources after merging + sql = "DELETE FROM resource WHERE id=ANY($1) RETURNING (id, lang)" + rows = await conn.fetch(sql, set(merge_ids) - set([resource.id_])) + for row in rows: + await delete_resource( app.search_engine, - tf, - site_path, - resource, - site.base_url, - url, + row['row'][1], + row['row'][0], ) - # merge resources: merge_ids -> resource - for merge_id in merge_ids: - # replace links to the merge resource with links to the dest resource - sql = "UPDATE site_path SET resource_id=$1 WHERE resource_id=$2" - await conn.execute(sql, resource.id_ or None, merge_id) - # remove orphaned merge resource - sql = "DELETE FROM resource WHERE id=$1 RETURNING (true, lang)" - found = await conn.fetchval(sql, merge_id) - if found: - await delete_resource( - app.search_engine, - found[1], - merge_id, - ) - - return True, is_new_resource + return resource.id_, is_new_resource async def _save_shortlink( - conn, site, url, resource, shortlink_url, last_visit + conn, site, url, resource_id, shortlink_url, last_visit ): """ Save a shortlink. @@ -349,11 +346,11 @@ async def _save_shortlink( last_visit=last_visit, ok_count=1, canonical=False, - resource_id=resource.id_, + resource_id=resource_id, ) else: shortlink.last_visit = last_visit shortlink.ok_count += 1 shortlink.canonical = False - shortlink.resource_id = resource.id_ + shortlink.resource_id = resource_id await shortlink.save(conn)