#!/usr/bin/env python3
"""
Simple server for gathering images from locally connected devices.

Supported devices are scanners available through SANE on linux.

This service will not work for multiple users.

TODO: allow for shutdown and call device.close() then
"""

import json
import logging
import subprocess
import sys
from copy import deepcopy
from pathlib import Path

import aiohttp_jinja2
import jinja2
import sane
from aiohttp import web
from PIL import Image


ocr_languages = ['deu', 'eng']

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

config_dir = Path.home() / '.config' / 'rsnaps'
cache_dir = Path.home() / '.cache' / 'rsnaps'
cache_dir_small = cache_dir / 'small'
cache_dir_pdf = cache_dir / 'pdf'
archive_dir = None
thumbnail_width = 181
thumbnail_height = 256
path_jbig2 = Path.home() / 'Desktop/tools/scan/jbig2'
path_pdf_py = Path.home() / 'Desktop/tools/scan/pdf.py'

routes = web.RouteTableDef()
app_basedir = Path(__file__).parent
snap_device = None
settings_cache = None

# Paper size name prefix -> (width, height) in millimetres.
# Anything not listed here is treated as DIN A4.
_PAPER_DIMENSIONS_MM = (
    ('DIN A5', (148, 210)),
    ('Letter', (216, 279)),
    ('115x158mm', (115, 158)),
    ('145x420mm', (145, 420)),
    ('157x240mm', (157, 240)),
    ('170x240mm', (170, 240)),
    ('210x440mm', (210, 440)),
)
_DEFAULT_PAPER_MM = (210, 297)


def init(archive_dir_):
    """
    Setup config and cache directories.

    Exits the process with status 2 if *archive_dir_* is not a usable
    path or does not exist.
    """
    global archive_dir
    try:
        archive_dir = Path(archive_dir_)
    except TypeError:
        print('Invalid archive basedir.')
        sys.exit(2)
    if not archive_dir.exists():
        print('Archive basedir does not exist.')
        sys.exit(2)
    # TODO: check if archive_dir is writable
    # parents=True: ~/.config or ~/.cache may not exist on a fresh account
    for directory in (config_dir, cache_dir, cache_dir_small, cache_dir_pdf):
        directory.mkdir(mode=0o700, parents=True, exist_ok=True)


def _safe_subpath(base, relative):
    """
    Return ``base / relative`` if *relative* stays below *base*, else None.

    Absolute paths and paths containing ``..`` are rejected, because the
    value comes from form data and is used for creating and deleting files.
    """
    try:
        relative_path = Path(relative)
    except TypeError:
        return None
    if relative_path.is_absolute() or '..' in relative_path.parts:
        return None
    return base / relative_path


def _write_settings(settings_):
    """
    Write *settings_* to the settings file and refresh the cache.
    """
    global settings_cache
    settings_path = config_dir / 'settings.json'
    with open(settings_path, 'w') as file:
        file.write(json.dumps(settings_, indent=4))
    settings_cache = deepcopy(settings_)


def update_settings(settings: dict):
    """
    Update existing settings, store and return them.

    Fully replace keys that are in *settings*.
    """
    settings_path = config_dir / 'settings.json'
    try:
        with open(settings_path, 'r') as file:
            settings_ = json.loads(file.read())
    except (OSError, ValueError):
        # missing, unreadable or corrupt settings file: start from scratch
        settings_ = {}
    settings_.update(settings)
    _write_settings(settings_)
    return settings_


def get_settings():
    """
    Return stored settings.

    A copy is returned, so callers may modify the result freely.
    An empty dict is returned if the settings cannot be read.
    """
    global settings_cache
    if settings_cache:
        return deepcopy(settings_cache)
    settings_path = config_dir / 'settings.json'
    try:
        with open(settings_path, 'r') as file:
            settings_cache = json.loads(file.read())
    except FileNotFoundError:
        # nothing has been stored yet; this is not an error
        return {}
    except (OSError, ValueError) as err:
        logger.exception(err)
        return {}
    return deepcopy(settings_cache or {})


def _next_page_number(directory):
    """
    Return one more than the highest page number found in *directory*.

    Page files are named like ``0001a.png``; the first four characters
    of the file name are the page number. Other files are ignored.
    """
    highest = 0
    if directory.is_dir():
        for path in directory.iterdir():
            try:
                highest = max(highest, int(path.name[:4]))
            except ValueError:
                continue
    return highest + 1


async def store_settings(request):
    """
    Extract settings from POST data and store them.

    Also change global `snap_device` to the selected one.

    Return the full, updated settings, or an empty dict on failure.
    """
    global snap_device
    try:
        data = await request.post()
        settings_ = get_settings()
        settings = {}

        # device
        device_id = data.get('device_id')
        stored_device_id = settings_.get('device_id')
        if device_id and (device_id != stored_device_id or snap_device is None):
            settings['device_id'] = device_id
            snap_device = sane.open(device_id)

        # device_settings
        device_settings = {}
        if paper_size := data.get('paper_size'):  # for setting the scan area
            device_settings['paper_size'] = paper_size
        if mode := data.get('mode'):
            device_settings['mode'] = mode
        if resolution := data.get('resolution'):
            device_settings['resolution'] = int(resolution)
        device_settings['snap'] = data.get('snap')

        # collection
        collection_choice = Path(settings_.get('collection_choice', '.'))
        raw_collection_new = data.get('collection_new')
        # do not str() a missing field: 'None' would become a directory name
        collection_new = str(raw_collection_new).strip() if raw_collection_new else ''
        if collection_new:
            new_dir = _safe_subpath(archive_dir, collection_choice / collection_new)
            if new_dir is None:
                logger.warning('Ignoring invalid collection name %r', collection_new)
            else:
                new_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
                settings['collection_choice'] = str(collection_choice / collection_new)
        if collection_description := data.get('collection_description'):
            settings['collection_description'] = collection_description

        # (duplex) page number
        dpage_number = None
        if raw_number := data.get('dpage_number'):
            try:
                dpage_number = int(raw_number)
            except (TypeError, ValueError):
                dpage_number = None
        if dpage_number is None:
            # continue after the last page of the collection being scanned into
            target_collection = settings.get('collection_choice', str(collection_choice))
            dpage_number = _next_page_number(archive_dir / target_collection)
        settings['dpage_number'] = dpage_number

        # target
        settings['target'] = data.get('target', 'a')

        # save settings, if changed
        settings_old = deepcopy(settings_)
        settings_.update(settings)
        per_device = settings_.setdefault('device_settings', {})
        # fall back to the stored device if the form did not name one
        effective_device_id = device_id or settings_.get('device_id')
        if effective_device_id:
            per_device.setdefault(effective_device_id, {}).update(device_settings)
        if settings_old != settings_:
            _write_settings(settings_)
        return settings_
    except Exception as err:
        logger.exception(err)
        return {}


def get_params(settings=None):
    """
    Return params required for main template (snaps.html).

    Includes settings and image names.
    """
    if settings is None:
        settings = get_settings()
    image_path = archive_dir / settings.get('collection_choice', '.')
    images = sorted(path.stem for path in image_path.glob('*') if path.is_file())
    device_id = settings.get('device_id')
    device_settings = settings.get('device_settings', {}).get(device_id, {})
    collection_choices = [
        str(d.relative_to(archive_dir))
        for d in archive_dir.glob('**')
        if d.is_dir()
    ]
    collection_choices.sort(key=str.lower)
    return {
        'devices': settings.get('devices', []),
        'device_id': device_id,
        'paper_sizes': [
            'DIN A4 (left)',
            'DIN A5 (left)',
            'DIN A5 (centered)',
            'Letter',
            '115x158mm',
            '145x420mm',
            '157x240mm',
            '170x240mm',
            '210x440mm',
        ],
        'paper_size': device_settings.get('paper_size', 'DIN A4 (left)'),
        'modes': device_settings.get('modes', []),
        'mode': device_settings.get('mode', 'Gray'),
        'resolutions': device_settings.get('resolutions', []),
        'resolution': device_settings.get('resolution', 300),
        'sources': device_settings.get('sources', []),
        'collection_choices': collection_choices,
        'collection_choice': settings.get('collection_choice', '.'),
        'collection_description': settings.get('collection_description', ''),
        # `or 0`: a stored null must not break the page
        'dpage_number': (settings.get('dpage_number') or 0) + 1,
        'target': settings.get('target', 'a'),
        'images': images,
    }


@aiohttp_jinja2.template('rsnaps.html')
async def detect(request):
    """
    Detect available devices and store them in settings.
    """
    global snap_device
    sane.exit()
    sane.init()
    devices = sane.get_devices()
    sane.exit()
    # sane.exit() closes all open handles, so the old one must not be reused
    snap_device = None
    update_settings({'devices': devices})
    raise web.HTTPFound('/')


@aiohttp_jinja2.template('rsnaps.html')
async def collection(request):
    """
    Select the collection (a directory below the archive) to work on.
    """
    data = await request.post()
    collection_choice = data.get('collection_choice', '.')
    if _safe_subpath(archive_dir, collection_choice) is None:
        logger.warning('Ignoring invalid collection %r', collection_choice)
        collection_choice = '.'
    update_settings({'collection_choice': collection_choice})
    raise web.HTTPFound('/')


@aiohttp_jinja2.template('rsnaps.html')
async def collection_delete(request):
    """
    Delete the files of a collection and its thumbnails.

    The archive root ('.') is never deleted. Afterwards the archive
    root becomes the selected collection.
    """
    data = await request.post()
    collection_choice = data.get('collection_choice', '.')
    image_dir = None
    if collection_choice:
        image_dir = _safe_subpath(archive_dir, collection_choice)
    if image_dir is not None and Path(collection_choice) != Path('.'):
        remove_dir(image_dir)
        remove_dir(cache_dir_small / collection_choice)
    else:
        logger.warning('Not deleting collection %r', collection_choice)
    update_settings({'collection_choice': '.'})
    raise web.HTTPFound('/')


def remove_dir(dir_path):
    """
    Remove all files in *dir_path*, and the directory itself if possible.

    Subdirectories are left alone; if there are any, *dir_path* is kept.
    """
    if not dir_path.is_dir():
        return
    has_subdirs = False
    for p in dir_path.iterdir():
        if p.is_file():
            p.unlink()
        else:
            has_subdirs = True
    if not has_subdirs:
        dir_path.rmdir()


def _reset_usb_device(device_id):
    """
    Reset the USB device behind a libusb SANE device id.

    The id is expected to end in ``BUS:DEV`` (7 characters); `lsusb` is
    used to find the vendor:product id, which is passed to `usbreset`.
    """
    bus_devnum = device_id[-7:]
    # get usb_ids
    result = subprocess.run(['lsusb', '-s', bus_devnum], capture_output=True)
    fields = result.stdout.decode('utf-8').split()
    if len(fields) < 6:
        logger.error('Could not find USB ids for %s', bus_devnum)
        return
    # do usbreset
    subprocess.run(['usbreset', fields[5]])


def _open_device(device_id):
    """
    Open the SANE device *device_id* and return it, or None on failure.

    If the first attempt fails, SANE is re-initialized (after a USB reset
    for libusb devices) and opening is tried once more.
    """
    try:
        return sane.open(device_id)
    except Exception:
        logger.info('Opening %s failed, retrying after re-init', device_id)
    try:
        sane.exit()
        if ':libusb:' in device_id:
            _reset_usb_device(device_id)
        sane.init()
    except Exception:
        logger.exception('Error on sane exit and init')
        return None
    try:
        return sane.open(device_id)
    except Exception as err:
        logger.error('Error opening device %s: %s', device_id, err)
        return None


@aiohttp_jinja2.template('rsnaps.html')
async def device(request):
    """
    Open the selected device and store its id and option constraints.
    """
    data = await request.post()
    device_id = data.get('device_id')
    global snap_device
    snap_device = _open_device(device_id) if device_id else None
    update_settings({'device_id': device_id if snap_device else None})

    # get and store device constraints
    if snap_device:
        device_settings = {
            'modes': snap_device['mode'].constraint,
            'resolutions': snap_device['resolution'].constraint,
            'sources': snap_device['source'].constraint,
        }
        settings = get_settings()
        per_device = settings.setdefault('device_settings', {})
        per_device.setdefault(device_id, {}).update(device_settings)
        update_settings({'device_settings': per_device})
        params = snap_device.get_parameters()
        logger.info(f'Scanner device parameters: {params}')
    raise web.HTTPFound('/')


@aiohttp_jinja2.template('rsnaps.html')
async def rsnaps(request):
    """
    Just display main view.
    """
    return get_params()


@aiohttp_jinja2.template('rsnaps.html')
async def snap(request):
    """
    Perform a snap.
    """
    settings = await store_settings(request)
    try:
        snap_page(settings)
    except Exception:
        logger.exception(f'FAIL: snap_page({settings})')
    return get_params(settings)


async def image_delete(request):
    """
    Delete the named image.
    """
    name = request.match_info['name']
    collection_choice = get_settings().get('collection_choice', '.')
    for directory in (archive_dir, cache_dir_small):
        image_file = directory / collection_choice / f'{name}.png'
        if image_file.is_file():
            image_file.unlink()
    raise web.HTTPFound('/')


async def image(request):
    """
    Display the named image.
    """
    name = request.match_info['name']
    settings = get_settings()
    image_dir = archive_dir / settings.get('collection_choice', '.')
    try:
        img_content = (image_dir / f'{name}.png').read_bytes()
    except OSError:
        logger.exception('Image not found')
        raise web.HTTPNotFound(text='The image does not exist.')
    return web.Response(body=img_content, content_type='image/png')


async def image_small(request):
    """
    Display the named thumbnail / small image.

    The thumbnail is created from the full image if it is not cached yet.
    """
    name = request.match_info['name']
    collection_choice = get_settings().get('collection_choice', '.')
    image_dir_small = cache_dir_small / collection_choice
    image_file_small = image_dir_small / f'{name}.png'
    if not image_file_small.is_file():
        image_file = archive_dir / collection_choice / f'{name}.png'
        if not image_file.is_file():
            logger.warning('Image not found: %s', image_file)
            raise web.HTTPNotFound(text='The image does not exist.')
        image_dir_small.mkdir(mode=0o700, parents=True, exist_ok=True)
        with Image.open(image_file) as img:
            img_small = img.resize((thumbnail_width, thumbnail_height), Image.BICUBIC)
            img_small.save(image_file_small)
    # a handler must always return a response, also right after creation
    return web.Response(
        body=image_file_small.read_bytes(),
        content_type='image/png',
    )


async def pages_operation(request):
    """
    Perform the requested operation on pages, e.g. PDF generation.
    """
    data = await request.post()
    if not (pages := parse_pages(data.get('pages'))):
        raise web.HTTPFound('/')
    operation = data.get('operation')
    if operation == 'delete':
        await delete_pages(pages)
        raise web.HTTPFound('/')
    elif operation == 'rotate180':
        await rotate_pages(pages, angle=180)
        raise web.HTTPFound('/')
    else:
        ocr = operation == 'pdf_ocr'
        lossy = data.get('lossy') == 'lossy'
        return await create_pdf(pages, ocr=ocr, lossy=lossy)


def _split_side(token):
    """
    Split a page token like '12a' into its number and side.

    Return ``(number, side)`` where side is 'a', 'b' or '' (both sides).
    Raise ValueError if the number part is not an integer.
    """
    token = token.strip()
    side = ''
    if token.endswith(('a', 'b')):
        side = token[-1]
        token = token[:-1]
    return int(token), side


def parse_pages(pages_):
    """
    Input cleaning: Filter `pages_` for existing ones.

    `pages_` is a comma separated list of pages ('3', '3a') and page
    ranges ('2-5', '2b-5a'). Parts that cannot be parsed are skipped.

    Return a list of pages, retaining the requested sort order.
    """
    pages = []
    if not pages_:
        return pages
    for pr in str(pages_).split(','):
        try:
            if '-' in pr:
                start_, end_ = pr.split('-', 1)
                start, start_t = _split_side(start_)
                end, end_t = _split_side(end_)
            else:
                start, start_t = _split_side(pr)
                end, end_t = start, start_t
        except ValueError:
            logger.debug('Skipping invalid page spec %r', pr)
            continue
        for ind in range(start, end + 1):
            # side a is dropped only on a first page given as 'b',
            # side b is dropped only on a last page given as 'a'
            if not (ind == start and start_t == 'b'):
                pages.append(f'{ind:04d}a')
            if not (ind == end and end_t == 'a'):
                pages.append(f'{ind:04d}b')
    # filter by existing pages
    image_dir = archive_dir / get_settings().get('collection_choice', '.')
    if not image_dir.is_dir():
        return []
    existing = {p.stem for p in image_dir.iterdir()}
    return [page for page in pages if page in existing]


async def delete_pages(pages):
    """
    Delete the given `pages`, i.e. images and small images.
    """
    collection_choice = get_settings().get('collection_choice', '.')
    for page in pages:
        for directory in (archive_dir, cache_dir_small):
            p = directory / collection_choice / f'{page}.png'
            p.unlink(missing_ok=True)


def _run_tool(cmd, **kwargs):
    """
    Log and run the external command *cmd* in the PDF cache directory.

    Extra keyword arguments are passed on to `subprocess.run`.
    Return the completed process.
    """
    cmd = [str(x) for x in cmd]
    logger.debug(' '.join(cmd))
    return subprocess.run(cmd, cwd=cache_dir_pdf, **kwargs)


async def rotate_pages(pages, angle=0):
    """
    Rotate the `pages` by `angle`, i.e. images and small images.
    """
    collection_choice = get_settings().get('collection_choice', '.')
    for page in pages:
        for directory in (archive_dir, cache_dir_small):
            p = directory / collection_choice / f'{page}.png'
            _run_tool(['mogrify', '-rotate', str(angle), str(p)])


async def create_pdf(pages, lossy: bool = False, ocr: bool = False):
    """
    Create and return a PDF from the given pages.

    If `lossy` is True, use jbig2 for compression.

    If `ocr` is True, perform OCR using `ocrmypdf`.

    Raise HTTPInternalServerError if no PDF could be generated.
    """
    for p in cache_dir_pdf.iterdir():
        if p.is_file():
            p.unlink()
    collection_choice = Path(get_settings().get('collection_choice', '.'))
    img_paths = [str(archive_dir / collection_choice / f'{page}.png') for page in pages]
    plain_pdf = cache_dir_pdf / 'o.pdf'
    if lossy:
        _run_tool([str(path_jbig2), '-s', '-p', '-a', '-v', '-4'] + img_paths)
        # pdf.py prints the PDF to stdout; store it where the rest of
        # this function expects the un-OCRed result
        result = _run_tool(
            ['/usr/bin/python3', str(path_pdf_py), 'output'],
            capture_output=True,
        )
        if result.returncode == 0 and result.stdout:
            plain_pdf.write_bytes(result.stdout)
    else:
        # create a PDF file using img2pdf
        raw_pdf = cache_dir_pdf / 'o1.pdf'
        _run_tool(['img2pdf', '--pagesize', 'A4', '-o', str(raw_pdf)] + img_paths)
        # optimize images and linearize the pdf file using qpdf
        _run_tool([
            'qpdf',
            '--optimize-images',
            '--linearize',
            '--compress-streams=y',
            '--object-streams=generate',
            '--recompress-flate',
            str(raw_pdf),
            str(plain_pdf),
        ])
    if ocr:
        result_file = cache_dir_pdf / 'ocr.pdf'
        _run_tool([
            'ocrmypdf',
            '-d',
            '-O', '3',
            '-l', '+'.join(ocr_languages),
            '--output-type', 'pdf',
            str(plain_pdf),
            str(result_file),
        ])
    else:
        result_file = plain_pdf

    # return result
    if not result_file.is_file():
        logger.error('PDF generation failed, %s is missing', result_file)
        raise web.HTTPInternalServerError(text='PDF generation failed.')
    return web.Response(
        body=result_file.read_bytes(),
        content_type='application/pdf',
    )


app = web.Application()
app.add_routes([
    web.get('/', rsnaps),
    web.post('/detect', detect),
    web.post('/collection', collection),
    web.post('/collection/delete', collection_delete),
    web.post('/device', device),
    web.post('/snap', snap),
    web.post('/image-delete/{name}', image_delete),
    web.get('/image/{name}', image),
    web.get('/image-small/{name}', image_small),
    web.post('/pages-operation', pages_operation),
])
aiohttp_jinja2.setup(
    app,
    loader=jinja2.FileSystemLoader(app_basedir / 'templates'),
)
app.router.add_static('/static', app_basedir / 'static')


# snap data


def snap_page(settings):
    """
    Set device options and scan from the configured source.

    Raise RuntimeError if no device is open.
    """
    if snap_device is None:
        raise RuntimeError('No scanner device is open.')
    device_id = settings.get('device_id')
    device_settings = settings.get('device_settings', {}).get(device_id, {})
    source = device_settings.get('snap')
    # set source before mode and resolution!
    snap_device.source = source
    snap_device.mode = device_settings.get('mode')
    snap_device.resolution = device_settings.get('resolution')
    # exactly one scan routine per request
    if source in ('Automatic Document Feeder', 'ADF Front'):
        scan_adf(snap_device, settings)
    elif source == 'ADF Duplex':
        scan_adf_duplex(snap_device, settings)
    else:
        scan_page(snap_device, settings)


def scan_page(snap_device, settings):
    """
    Scan a single page from source 'Flatbed'.
    """
    set_scan_area(settings)
    snap_device.start()
    img = snap_device.snap()
    store_image(img, settings)


def scan_adf(snap_device, settings):
    """
    Scan pages from source 'Automatic Document Feeder'.

    Page numbers count upwards from the configured page number, or
    downwards if the target side is 'b'.
    """
    set_scan_area(settings)
    direction = -1 if settings.get('target') == 'b' else 1
    img_i = 0
    dpage_number = settings.get('dpage_number', 1)
    if direction == -1:
        dpage_number -= 1
    while True:
        try:
            snap_device.start()
            img = snap_device.snap(True)
            if not isinstance(img, Image.Image):
                break
            settings['dpage_number'] = dpage_number + direction * img_i
            store_image(img, settings)
            img_i += 1
        except Exception as e:
            # the feeder running empty is the regular way to finish
            if str(e) != 'Document feeder out of documents':
                logger.error('ADF scan aborted: %s', e)
            return
    # TODO: maybe `adf_mode` can be set to `simplex`? see `--adf-mode` in
    # http://sane-project.org/man/sane-epsonds.5.html


def scan_adf_duplex(snap_device, settings):
    """
    Scan pages from source 'ADF Duplex' (a DADF scanner).

    Images are stored alternately as side 'a' and 'b' of a page.
    """
    set_scan_area(settings)
    img_i = 0
    dpage_number = settings.get('dpage_number', 1)
    side = 'b'
    for img in snap_device.multi_scan():
        if not isinstance(img, Image.Image):
            continue
        side = 'a' if side == 'b' else 'b'
        settings['target'] = side
        settings['dpage_number'] = dpage_number + img_i // 2
        store_image(img, settings)
        img_i += 1


def set_scan_area(settings):
    """
    Set coordinates of the scan area, using device params and paper size.
    """
    device_id = settings.get('device_id')
    device_settings = settings.get('device_settings', {}).get(device_id, {})
    paper_size = device_settings.get('paper_size', '')
    paper_width_mm, paper_height_mm = next(
        (
            dimensions
            for prefix, dimensions in _PAPER_DIMENSIONS_MM
            if paper_size.startswith(prefix)
        ),
        _DEFAULT_PAPER_MM,
    )
    scan_width_mm = snap_device['tl_x'].constraint[1]
    # without spare width the whole width of the scanner is used
    offset_hl = offset_hr = 0
    if scan_width_mm > paper_width_mm:
        spare_width_mm = scan_width_mm - paper_width_mm
        if 'centered' in paper_size:
            offset_hl = offset_hr = spare_width_mm / 2
        else:
            offset_hr = spare_width_mm
    snap_device.tl_x = offset_hl
    snap_device.br_x = scan_width_mm - offset_hr
    snap_device.tl_y = 0
    snap_device.br_y = paper_height_mm


def store_image(img, settings):
    """
    Save *img* as a page of the current collection.

    The file name is built from the page number and the target side,
    e.g. ``0001a.png``. The page number is stored in the settings.
    """
    dpage_number = settings.get('dpage_number', 1)
    target = settings.get('target', 'a')
    collection_choice = settings.get('collection_choice', '.')
    img_name = f'{dpage_number:04d}{target}.png'
    image_dir = archive_dir / collection_choice
    image_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
    img.save(image_dir / img_name)
    # a cached thumbnail of an overwritten page would be outdated
    (cache_dir_small / collection_choice / img_name).unlink(missing_ok=True)
    update_settings({'dpage_number': dpage_number})


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('Please give the archive basedir as argument 1.')
        sys.exit(2)
    init(sys.argv[1])
    web.run_app(app, port=8066)