824 lines
26 KiB
Python
824 lines
26 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
"""
|
||
|
Simple server for gathering images from locally connected devices.
|
||
|
|
||
|
Supported devices are scanners available through SANE on linux.
|
||
|
|
||
|
This service will not work for multiple users.
|
||
|
|
||
|
TODO: allow for shutdown and call device.close() then
|
||
|
"""
|
||
|
|
||
|
import json
|
||
|
import logging
|
||
|
import subprocess
|
||
|
import sys
|
||
|
from copy import deepcopy
|
||
|
from pathlib import Path
|
||
|
|
||
|
import aiohttp_jinja2
|
||
|
import jinja2
|
||
|
import sane
|
||
|
from aiohttp import web
|
||
|
from PIL import Image
|
||
|
|
||
|
|
||
|
# Languages handed to ocrmypdf (joined with '+') when OCR is requested.
ocr_languages = ['deu', 'eng']


# Root logger and module logger both log at DEBUG level.
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


# Per-user directories for settings, thumbnails and PDF work files.
config_dir = Path.home().joinpath('.config', 'rsnaps')
cache_dir = Path.home().joinpath('.cache', 'rsnaps')
cache_dir_small = cache_dir.joinpath('small')
cache_dir_pdf = cache_dir.joinpath('pdf')
# Base directory of the image archive; assigned by init().
archive_dir = None


# Size (in pixels) of the thumbnails written to cache_dir_small.
thumbnail_width = 181
thumbnail_height = 256


# External helpers used for lossy (jbig2) PDF creation.
path_jbig2 = Path.home().joinpath('Desktop/tools/scan/jbig2')
path_pdf_py = Path.home().joinpath('Desktop/tools/scan/pdf.py')


routes = web.RouteTableDef()


# Directory containing this file; templates and static files live below it.
app_basedir = Path(__file__).parent


# Handle of the currently opened SANE device (None while nothing is open).
snap_device = None


# In-memory copy of the stored settings, filled by the settings helpers.
settings_cache = None
|
||
|
|
||
|
|
||
|
def init(archive_dir_):
    """
    Set up the archive, config and cache directories.

    *archive_dir_* is the base directory of the image archive. It is stored
    in the module-level `archive_dir` and must already exist; otherwise a
    message is printed and the process exits with status 2.

    The config and cache directories are created with mode 0o700 if they
    do not exist yet.
    """
    global archive_dir
    try:
        archive_dir = Path(archive_dir_)
    except TypeError:
        # Path() rejects values that are neither str nor path-like.
        print('Invalid archive basedir.')
        sys.exit(2)
    if not archive_dir.exists():
        print('Archive basedir does not exist.')
        sys.exit(2)
    # TODO: check if archive_dir is writable
    # parents=True: ~/.config or ~/.cache may be missing on a fresh account,
    # in which case a plain mkdir() would raise FileNotFoundError.
    for directory in (config_dir, cache_dir, cache_dir_small, cache_dir_pdf):
        directory.mkdir(mode=0o700, parents=True, exist_ok=True)
|
||
|
|
||
|
|
||
|
def update_settings(settings: dict):
    """
    Update existing settings, store and return them.

    Fully replace keys that are in *settings*.

    The stored settings are read from `settings.json` in the config
    directory; a missing, unreadable or malformed file counts as empty
    settings. The merged result is written back to that file, copied into
    `settings_cache` and returned.
    """
    global settings_cache
    settings_path = config_dir / 'settings.json'
    try:
        with open(settings_path, 'r') as file:
            settings_ = json.load(file)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and decoding errors.
        settings_ = {}
    if not isinstance(settings_, dict):
        # A file holding valid JSON of another type cannot be merged into.
        settings_ = {}
    settings_.update(settings)
    # Serialize before opening the file: open(..., 'w') truncates, so a
    # serialization error must not be allowed to wipe the stored settings.
    serialized = json.dumps(settings_, indent=4)
    with open(settings_path, 'w') as file:
        file.write(serialized)
    settings_cache = deepcopy(settings_)
    return settings_
|
||
|
|
||
|
|
||
|
def get_settings():
    """
    Return stored settings.

    A deep copy of `settings_cache` is returned when the cache is filled.
    Otherwise `settings.json` is read from the config directory, the cache
    is filled from it and a copy is returned. An empty dict is returned if
    the file does not exist or cannot be read or parsed.
    """
    global settings_cache
    if settings_cache:
        return deepcopy(settings_cache)
    settings_path = config_dir / 'settings.json'
    try:
        with open(settings_path, 'r') as file:
            # Assign to the module-level cache (the name was misspelled
            # before, so the loaded settings were silently dropped).
            settings_cache = json.loads(file.read())
        return deepcopy(settings_cache or {})
    except FileNotFoundError:
        # No settings have been stored yet; nothing worth logging.
        return {}
    except Exception as err:
        logger.exception(err)
        return {}
|
||
|
|
||
|
|
||
|
async def store_settings(request):
    """
    Extract settings from POST data and store them.

    Also change global `snap_device` to the selected one.

    Handled form fields: `device_id`, `paper_size`, `mode`, `resolution`,
    `snap` (the scan source), `collection_new`, `collection_description`,
    `dpage_number` and `target`.

    If `dpage_number` is missing or not an integer, it is set to one more
    than the highest 4-digit number prefix found among the entries of the
    active collection directory.

    Return the merged settings, or an empty dict if anything failed (the
    error is logged).
    """
    global snap_device, settings_cache
    try:
        data = await request.post()
        settings_ = get_settings()
        settings = {}

        # device: open it if the selection changed or nothing is open yet
        device_id = data.get('device_id')
        if device_id != settings_.get('device_id') or snap_device is None:
            settings['device_id'] = device_id
            snap_device = sane.open(device_id)

        # device_settings
        device_settings = {}
        if paper_size := data.get('paper_size'):  # for setting the scan area
            device_settings['paper_size'] = paper_size
        if mode := data.get('mode'):
            device_settings['mode'] = mode
        if resolution := data.get('resolution'):
            device_settings['resolution'] = int(resolution)
        device_settings['snap'] = data.get('snap')

        # collection
        collection_choice = Path(settings_.get('collection_choice', '.'))
        # A missing field must not turn into the truthy string 'None',
        # which would create a collection of that name.
        if collection_new := str(data.get('collection_new') or ''):
            p = archive_dir / collection_choice / collection_new
            p.mkdir(mode=0o700, parents=True, exist_ok=True)
            settings['collection_choice'] = str(collection_choice / collection_new)
        if collection_description := data.get('collection_description'):
            settings['collection_description'] = collection_description

        # (duplex) page number
        if dpage_number := data.get('dpage_number'):
            try:
                dpage_number = int(dpage_number)
            except (TypeError, ValueError):
                dpage_number = None
        if dpage_number in (None, ''):
            # Images are stored in the active collection directory, so that
            # is where existing page numbers have to be looked up.
            active = settings.get('collection_choice', str(collection_choice))
            max_ = 0
            for path in (archive_dir / active).glob('*'):
                try:
                    # Image names start with a 4-digit page number.
                    max_ = max(max_, int(path.name[:4]))
                except ValueError:
                    continue
            dpage_number = max_ + 1
        settings['dpage_number'] = dpage_number

        # target
        settings['target'] = data.get('target', 'a')

        # save settings, if changed
        settings_old = deepcopy(settings_)
        settings_.update(settings)
        per_device = settings_.setdefault('device_settings', {})
        if device_id:
            per_device.setdefault(device_id, {}).update(device_settings)
        if settings_old != settings_:
            # Serialize first so a failure cannot leave a truncated file.
            serialized = json.dumps(settings_, indent=4)
            with open(config_dir / 'settings.json', 'w') as file:
                file.write(serialized)
            settings_cache = deepcopy(settings_)
        return settings_
    except Exception as err:
        logger.exception(err)
        return {}
|
||
|
|
||
|
|
||
|
def get_params(settings=None):
    """
    Build the template context for the main view (rsnaps.html).

    *settings* defaults to the stored settings. The result combines values
    from the settings (with defaults), the sorted names of the files in the
    active collection directory and the list of all directories below the
    archive directory (sorted case-insensitively).
    """
    if settings is None:
        settings = get_settings()

    # Names (without suffix) of all files in the active collection.
    collection_dir = archive_dir / settings.get('collection_choice', '.')
    image_names = sorted(
        entry.with_suffix('').name
        for entry in collection_dir.glob('*')
        if entry.is_file()
    )

    device_id = settings.get('device_id')
    per_device = settings.get('device_settings', {}).get(device_id, {})

    # Every directory below the archive is offered as a collection.
    collection_choices = sorted(
        (
            str(directory.relative_to(archive_dir))
            for directory in archive_dir.glob('**')
            if directory.is_dir()
        ),
        key=str.lower,
    )

    params = {}
    params['devices'] = settings.get('devices', [])
    params['device_id'] = device_id
    params['paper_sizes'] = [
        'DIN A4 (left)',
        'DIN A5 (left)',
        'DIN A5 (centered)',
        'Letter',
        '115x158mm',
        '145x420mm',
        '157x240mm',
        '170x240mm',
        '210x440mm',
    ]
    params['paper_size'] = per_device.get('paper_size', 'DIN A4 (left)')
    params['modes'] = per_device.get('modes', [])
    params['mode'] = per_device.get('mode', 'Gray')
    params['resolutions'] = per_device.get('resolutions', [])
    params['resolution'] = per_device.get('resolution', 300)
    params['sources'] = per_device.get('sources', [])
    params['collection_choices'] = collection_choices
    params['collection_choice'] = settings.get('collection_choice', '.')
    params['collection_description'] = settings.get('collection_description', '')
    # The form is pre-filled with the number following the stored one.
    params['dpage_number'] = settings.get('dpage_number', 0) + 1
    params['target'] = settings.get('target', 'a')
    params['images'] = image_names
    return params
|
||
|
|
||
|
|
||
|
@aiohttp_jinja2.template('rsnaps.html')
async def detect(request):
    """
    Query SANE for available devices, store the list, redirect to '/'.

    SANE is exited and initialised before the query and exited again
    afterwards.
    """
    sane.exit()
    sane.init()
    found_devices = sane.get_devices()
    sane.exit()

    update_settings({'devices': found_devices})
    raise web.HTTPFound('/')
|
||
|
|
||
|
|
||
|
@aiohttp_jinja2.template('rsnaps.html')
async def collection(request):
    """
    Store the collection chosen in the POST data, then redirect to '/'.

    A missing `collection_choice` field selects '.'.
    """
    form = await request.post()
    update_settings({'collection_choice': form.get('collection_choice', '.')})
    raise web.HTTPFound('/')
|
||
|
|
||
|
|
||
|
@aiohttp_jinja2.template('rsnaps.html')
async def collection_delete(request):
    """
    Delete the collection named in the POST data, then redirect to '/'.

    The files of the collection and of its thumbnail directory are removed
    through `remove_dir`. The root collection ('.') is never deleted, and
    names that are absolute or contain '..' are refused. In every case the
    active collection is reset to '.'.
    """
    data = await request.post()
    collection_choice = data.get('collection_choice', '.')
    if collection_choice and collection_choice != '.':
        relative = Path(collection_choice)
        if relative.is_absolute() or '..' in relative.parts:
            # The name comes from the request; it must not be able to point
            # remove_dir at a directory outside the archive or the cache.
            logger.warning('Refusing to delete collection %r', collection_choice)
        else:
            remove_dir(archive_dir / relative)
            remove_dir(cache_dir_small / relative)
    else:
        logger.debug('No deletable collection selected: %r', collection_choice)
    update_settings({'collection_choice': '.'})
    raise web.HTTPFound('/')
|
||
|
|
||
|
|
||
|
def remove_dir(dir_path):
    """
    Delete the files directly inside *dir_path* and, if possible, the
    directory itself.

    Nothing happens if *dir_path* is not a directory. Entries that are not
    files are left alone; if there are any, the directory is kept.
    """
    if not dir_path.is_dir():
        return

    kept_entries = 0
    for entry in dir_path.iterdir():
        if entry.is_file():
            entry.unlink()
            continue
        kept_entries += 1

    if kept_entries == 0:
        dir_path.rmdir()
|
||
|
|
||
|
|
||
|
def _reset_usb_device(device_id):
    """
    Reset the USB device behind a libusb SANE device id.

    The last 7 characters of *device_id* are passed to `lsusb -s`; the sixth
    space-separated field of its output is handed to `usbreset`.
    """
    bus_devnum = device_id[-7:]
    listing = subprocess.run(
        ['lsusb', '-s', bus_devnum],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    )
    usb_ids = listing.stdout.decode('utf-8').split(' ')[5]
    subprocess.run(['usbreset', usb_ids])


@aiohttp_jinja2.template('rsnaps.html')
async def device(request):
    """
    Open the device selected in the POST data, then redirect to '/'.

    If opening fails, SANE is exited and initialised again (with a USB
    reset in between for libusb devices) and opening is retried once.
    The global `snap_device` ends up as the opened device or None, and the
    stored `device_id` is set accordingly.

    For an opened device the constraints of its `mode`, `resolution` and
    `source` options are stored in the per-device settings.
    """
    global snap_device
    data = await request.post()
    device_id = data.get('device_id')

    opened = None
    if device_id:
        try:
            opened = sane.open(device_id)
        except Exception:
            logger.warning('Opening device %s failed; restarting SANE', device_id)
            try:
                sane.exit()
                if ':libusb:' in device_id:
                    _reset_usb_device(device_id)
                sane.init()
                opened = sane.open(device_id)
            except Exception:
                logger.exception('Error opening device %s', device_id)
    snap_device = opened

    update_settings({'device_id': device_id if snap_device else None})

    # get and store device constraints
    if snap_device:
        constraints = {
            'modes': snap_device['mode'].constraint,
            'resolutions': snap_device['resolution'].constraint,
            'sources': snap_device['source'].constraint,
        }
        settings = get_settings()
        per_device = settings.setdefault('device_settings', {})
        per_device.setdefault(device_id, {}).update(constraints)
        update_settings({'device_settings': per_device})

        params = snap_device.get_parameters()
        logger.info(f'Scanner device parameters: {params}')

    raise web.HTTPFound('/')
|
||
|
|
||
|
|
||
|
@aiohttp_jinja2.template('rsnaps.html')
async def rsnaps(request):
    """
    Render the main view from the stored settings.
    """
    template_params = get_params()
    return template_params
|
||
|
|
||
|
|
||
|
@aiohttp_jinja2.template('rsnaps.html')
async def snap(request):
    """
    Perform a snap.

    The settings from the POST data are stored first, then a scan is run
    with them. A failing scan is logged; the main view is rendered in any
    case.
    """
    settings = await store_settings(request)
    try:
        snap_page(settings)
    except Exception:
        logger.exception(f'FAIL: snap_page({settings})')
    # store_settings() returns {} on failure; render the stored settings
    # then instead of a page built from an empty dict.
    return get_params(settings or None)
|
||
|
|
||
|
|
||
|
async def image_delete(request):
    """
    Remove the named image and its thumbnail, then redirect to '/'.

    Both files are looked up in the active collection; files that do not
    exist are skipped.
    """
    name = request.match_info['name']
    collection_name = get_settings().get('collection_choice', '.')
    file_name = f'{name}.png'
    for base_dir in (archive_dir, cache_dir_small):
        candidate = base_dir / collection_name / file_name
        if candidate.is_file():
            candidate.unlink()
    raise web.HTTPFound('/')
|
||
|
|
||
|
|
||
|
async def image(request):
    """
    Serve the named image of the active collection as PNG.

    Responds with 404 if the file cannot be read.
    """
    name = request.match_info['name']
    collection_name = get_settings().get('collection_choice', '.')
    image_file = archive_dir / collection_name / f'{name}.png'
    try:
        with open(image_file, 'rb') as handle:
            payload = handle.read()
    except Exception:
        logger.exception('Image not found')
        raise web.HTTPNotFound(text='The image does not exist.')
    return web.Response(body=payload, content_type='image/png')
|
||
|
|
||
|
|
||
|
async def image_small(request):
    """
    Display the named thumbnail / small image.

    The thumbnail is read from the thumbnail cache of the active
    collection. If it cannot be read, it is created from the full-size
    image (resized to `thumbnail_width` x `thumbnail_height`), stored in
    the cache and then served. Responds with 404 if the full-size image
    does not exist either.
    """
    name = request.match_info['name']
    settings = get_settings()
    image_dir_small = cache_dir_small / settings.get('collection_choice', '.')
    image_file_small = image_dir_small / f'{name}.png'
    try:
        img_content = image_file_small.read_bytes()
    except OSError:
        image_dir = archive_dir / settings.get('collection_choice', '.')
        image_file = image_dir / f'{name}.png'
        if not image_file.is_file():
            logger.exception('Image not found')
            raise web.HTTPNotFound(text='The image does not exist.')

        image_dir_small.mkdir(mode=0o700, parents=True, exist_ok=True)
        with Image.open(image_file) as img:
            img_small = img.resize((thumbnail_width, thumbnail_height), Image.BICUBIC)
            img_small.save(image_file_small)
        # Serve the freshly created thumbnail right away; a handler that
        # returns nothing makes the first request for a thumbnail fail.
        img_content = image_file_small.read_bytes()
    return web.Response(body=img_content, content_type='image/png')
|
||
|
|
||
|
|
||
|
async def pages_operation(request):
    """
    Run the operation requested in the POST data on the selected pages.

    'delete' and 'rotate180' redirect to '/' afterwards; any other
    operation returns a PDF of the pages (with OCR for 'pdf_ocr'). Without
    any valid page the request is redirected to '/' right away.
    """
    form = await request.post()
    pages = parse_pages(form.get('pages'))
    if not pages:
        raise web.HTTPFound('/')

    operation = form.get('operation')
    if operation == 'delete':
        await delete_pages(pages)
    elif operation == 'rotate180':
        await rotate_pages(pages, angle=180)
    else:
        with_ocr = operation == 'pdf_ocr'
        use_lossy = form.get('lossy') == 'lossy'
        return await create_pdf(pages, ocr=with_ocr, lossy=use_lossy)
    raise web.HTTPFound('/')
|
||
|
|
||
|
|
||
|
def _split_page_token(token):
    """
    Split a page token such as '12', '12a' or '12b' into (number, side).

    *side* is 'a', 'b' or '' (no side given). Raises ValueError if the
    remaining text is not an integer.
    """
    token = token.strip()
    side = ''
    if token.endswith(('a', 'b')):
        token, side = token[:-1], token[-1]
    return int(token), side


def parse_pages(pages_):
    """
    Input cleaning: Filter `pages_` for existing ones.

    `pages_` is a comma-separated list of segments of the form `N`, `Na`,
    `Nb` or a range `START-END` whose ends may carry an 'a'/'b' suffix.
    A number without suffix stands for both sides. A range runs from the
    given side of START to the given side of END.

    Segments that cannot be parsed are skipped.

    Return a list of pages, retaining the requested sort order. Page names
    have the form '0012a'; only names matching an entry of the active
    collection directory are returned.
    """
    pages = []
    if not pages_:
        return pages
    for pr in str(pages_).split(','):
        first, dash, last = pr.partition('-')
        try:
            start, start_t = _split_page_token(first)
            if dash:
                end, end_t = _split_page_token(last)
            else:
                end, end_t = start, start_t
        except ValueError:
            # Empty or malformed segment (e.g. a trailing comma).
            continue
        for ind in range(start, end + 1):
            if ind == start:
                if start_t in ('', 'a'):
                    pages.append(f'{ind:04d}a')
                # A range ending on the 'a' side of its first page has no 'b'.
                if not (ind == end and end_t == 'a'):
                    pages.append(f'{ind:04d}b')
            elif ind == end:
                pages.append(f'{ind:04d}a')
                if end_t in ('', 'b'):
                    pages.append(f'{ind:04d}b')
            else:
                pages.append(f'{ind:04d}a')
                pages.append(f'{ind:04d}b')
    # filter by existing pages
    image_dir = archive_dir / get_settings().get('collection_choice', '.')
    if not image_dir.is_dir():
        return []
    existing = {p.stem for p in image_dir.iterdir()}
    return [page for page in pages if page in existing]
|
||
|
|
||
|
|
||
|
async def delete_pages(pages):
    """
    Remove the image and the thumbnail of every page in `pages`.

    Files are looked up in the active collection; missing files are
    ignored.
    """
    full_size_dir = archive_dir / get_settings().get('collection_choice', '.')
    thumbnail_dir = cache_dir_small / get_settings().get('collection_choice', '.')
    for page in pages:
        file_name = f'{page}.png'
        for directory in (full_size_dir, thumbnail_dir):
            (directory / file_name).unlink(missing_ok=True)
|
||
|
|
||
|
|
||
|
async def rotate_pages(pages, angle=0):
    """
    Rotate the image and the thumbnail of every page by `angle`.

    Each file of the active collection is rotated in place by running
    `mogrify -rotate`, first the full-size image, then the thumbnail.
    """
    full_size_dir = archive_dir / get_settings().get('collection_choice', '.')
    thumbnail_dir = cache_dir_small / get_settings().get('collection_choice', '.')
    for page in pages:
        for directory in (full_size_dir, thumbnail_dir):
            target = directory / f'{page}.png'
            subprocess.run(
                ['mogrify', '-rotate', str(angle), str(target)],
                cwd=cache_dir_pdf,
            )
|
||
|
|
||
|
|
||
|
async def create_pdf(pages, lossy: bool = False, ocr: bool = False):
    """
    Create and return a PDF from the given pages.

    The page images are taken from the active collection. All work files
    are written to the PDF cache directory, which is emptied first.

    If `lossy` is True, use jbig2 for compression. Otherwise the PDF is
    built with `img2pdf` and post-processed with `qpdf`.

    If `ocr` is True, perform OCR using `ocrmypdf`.

    Responds with 500 if no result file was produced.
    """
    for p in cache_dir_pdf.iterdir():
        if p.is_file():
            p.unlink()

    # parse_pages() checks the pages against the active collection, so the
    # images have to be read from that directory as well.
    image_dir = archive_dir / get_settings().get('collection_choice', '.')
    img_paths = [str(image_dir / f'{page}.png') for page in pages]
    plain_pdf = cache_dir_pdf / 'o.pdf'

    if lossy:
        cmd = [
            str(path_jbig2),
            '-s',
            '-p',
            '-a',
            '-v',
            '-4',
        ] + img_paths
        logger.debug(' '.join([str(x) for x in cmd]))
        subprocess.run(cmd, cwd=cache_dir_pdf)

        cmd = [
            '/usr/bin/python3',
            str(path_pdf_py),
            'output',
        ]
        logger.debug(' '.join(cmd))
        result = subprocess.run(cmd, cwd=cache_dir_pdf, capture_output=True)
        # The captured stdout is the PDF (the jbig2enc helper script prints
        # it); without storing it there is no o.pdf on this path.
        plain_pdf.write_bytes(result.stdout)
    else:
        # create a PDF file using img2pdf
        cmd = [
            'img2pdf',
            '--pagesize',
            'A4',
            '-o',
            str(cache_dir_pdf / 'o1.pdf'),
        ] + img_paths
        logger.debug(' '.join([str(x) for x in cmd]))
        subprocess.run(cmd, cwd=cache_dir_pdf)

        # optimize images and linearize the pdf file using qpdf
        cmd = [
            'qpdf',
            '--optimize-images',
            '--linearize',
            '--compress-streams=y',
            '--object-streams=generate',
            '--recompress-flate',
            str(cache_dir_pdf / 'o1.pdf'),
            str(plain_pdf),
        ]
        logger.debug(' '.join([str(x) for x in cmd]))
        subprocess.run(cmd, cwd=cache_dir_pdf)

    if ocr:
        result_file = cache_dir_pdf / 'ocr.pdf'
        cmd = [
            'ocrmypdf',
            '-d',
            '-O',
            '3',
            '-l',
            '+'.join(ocr_languages),
            '--output-type',
            'pdf',
            str(plain_pdf),
            str(result_file),
        ]
        logger.debug(' '.join(cmd))
        subprocess.run(cmd, cwd=cache_dir_pdf)
    else:
        result_file = plain_pdf

    # return result
    try:
        result_content = result_file.read_bytes()
    except OSError:
        # One of the external tools failed and left no result behind.
        logger.exception('PDF creation failed, cannot read %s', result_file)
        raise web.HTTPInternalServerError(text='The PDF could not be created.')
    return web.Response(body=result_content, content_type='application/pdf')
|
||
|
|
||
|
|
||
|
# The aiohttp application with all request handlers of this module.
app = web.Application()
app.add_routes(
    [
        # main view
        web.get('/', rsnaps),
        # device and collection management
        web.post('/detect', detect),
        web.post('/collection', collection),
        web.post('/collection/delete', collection_delete),
        web.post('/device', device),
        # scanning
        web.post('/snap', snap),
        # single images and operations on page lists
        web.post('/image-delete/{name}', image_delete),
        web.get('/image/{name}', image),
        web.get('/image-small/{name}', image_small),
        web.post('/pages-operation', pages_operation),
    ]
)
# Templates and static files are located next to this file.
aiohttp_jinja2.setup(
    app,
    loader=jinja2.FileSystemLoader(app_basedir / 'templates'),
)
app.router.add_static('/static', app_basedir / 'static')
|
||
|
|
||
|
|
||
|
# snap data
|
||
|
|
||
|
|
||
|
def snap_page(settings):
    """
    Set device options and scan from the configured source.

    The source (stored under the key 'snap'), mode and resolution of the
    selected device are applied to the global `snap_device`. Exactly one
    scan routine is then run, depending on the source:

    - 'Automatic Document Feeder' / 'ADF Front': `scan_adf`
    - 'ADF Duplex': `scan_adf_duplex`
    - anything else: `scan_page`
    """
    device_id = settings.get('device_id')
    device_settings = settings.get('device_settings', {}).get(device_id, {})
    source = device_settings.get('snap')

    # set source before mode and resolution!
    snap_device.source = source
    snap_device.mode = device_settings.get('mode')
    snap_device.resolution = device_settings.get('resolution')

    if source in ('Automatic Document Feeder', 'ADF Front'):
        scan_adf(snap_device, settings)
    elif source == 'ADF Duplex':
        # `elif`, not `if`: a feeder scan must not be followed by the
        # single-page scan of the final `else` branch.
        scan_adf_duplex(snap_device, settings)
    else:
        scan_page(snap_device, settings)
|
||
|
|
||
|
|
||
|
def scan_page(snap_device, settings):
    """
    Scan one page and store it (the non-feeder case).

    The scan area is set from `settings` before the scan is started.
    """
    set_scan_area(settings)
    snap_device.start()
    scanned = snap_device.snap()
    store_image(scanned, settings)
|
||
|
|
||
|
|
||
|
def scan_adf(snap_device, settings):
    """
    Scan pages from a document feeder until it runs empty.

    Every scanned image is stored with its own page number. For target 'b'
    the numbers count downwards, beginning one below the stored
    `dpage_number`; otherwise they count upwards from `dpage_number`.

    Scanning stops when the device reports
    'Document feeder out of documents', when any other error occurs (it is
    printed) or when the device returns something that is not an image.
    """
    set_scan_area(settings)
    backwards = settings.get('target') == 'b'
    step = -1 if backwards else 1
    first_number = settings.get('dpage_number', 1)
    if backwards:
        first_number -= 1

    count = 0
    while True:
        try:
            snap_device.start()
            scanned = snap_device.snap(True)
            if not isinstance(scanned, Image.Image):
                return
            settings['dpage_number'] = first_number + step * count
            store_image(scanned, settings)
            count += 1
        except Exception as error:
            if str(error) != 'Document feeder out of documents':
                print(error)
            return

    # TODO: maybe `adf_mode` can be set to `simplex`? see `--adf-mode` in http://sane-project.org/man/sane-epsonds.5.html
|
||
|
|
||
|
|
||
|
def scan_adf_duplex(snap_device, settings):
    """
    Scan pages from source 'ADF Duplex' (a DADF scanner).

    The images delivered by `multi_scan()` are stored alternately as side
    'a' and side 'b'; the page number grows by one after every second
    image. Items that are not images are ignored.
    """
    set_scan_area(settings)
    first_number = settings.get('dpage_number', 1)
    images = (
        item for item in snap_device.multi_scan()
        if isinstance(item, Image.Image)
    )
    for index, scanned in enumerate(images):
        # even index -> side 'a', odd index -> side 'b'
        settings['target'] = 'ab'[index % 2]
        settings['dpage_number'] = first_number + index // 2
        store_image(scanned, settings)
|
||
|
|
||
|
|
||
|
def set_scan_area(settings):
    """
    Set coordinates of the scan area, using device params and paper size.

    The paper size is taken from the settings of the selected device; it is
    matched by prefix against the known sizes, with DIN A4 as fallback.
    The area starts at the top. Horizontally it is centered if the paper
    size name contains 'centered' and left-aligned otherwise. If the
    scanner is not wider than the paper, its full width is used.

    The coordinates are set on the global `snap_device`.
    """
    device_id = settings.get('device_id')
    device_settings = settings.get('device_settings', {}).get(device_id, {})
    paper_size = device_settings.get('paper_size', '') or ''

    # (prefix of the paper size name, width in mm, height in mm)
    known_sizes = (
        ('DIN A5', 148, 210),
        ('Letter', 216, 279),
        ('115x158mm', 115, 158),
        ('145x420mm', 145, 420),
        ('157x240mm', 157, 240),
        ('170x240mm', 170, 240),
        ('210x440mm', 210, 440),
    )
    paper_width_mm, paper_height_mm = 210, 297  # DIN A4
    for prefix, width_mm, height_mm in known_sizes:
        if paper_size.startswith(prefix):
            paper_width_mm, paper_height_mm = width_mm, height_mm
            break

    # NOTE(review): constraint[1] is used as the maximum x coordinate,
    # i.e. a (min, max, ...) range constraint is assumed - confirm.
    scan_width_mm = snap_device['tl_x'].constraint[1]
    # Default to the full width so that the x coordinates are always set,
    # also when the scanner is not wider than the paper.
    offset_hl = offset_hr = 0
    if scan_width_mm > paper_width_mm:
        if 'centered' in paper_size:
            offset_hl = offset_hr = (scan_width_mm - paper_width_mm) / 2
        else:
            offset_hr = scan_width_mm - paper_width_mm
    snap_device.tl_x = offset_hl
    snap_device.br_x = scan_width_mm - offset_hr
    snap_device.tl_y = 0
    snap_device.br_y = paper_height_mm
|
||
|
|
||
|
|
||
|
def store_image(img, settings):
    """
    Save a scanned image into the active collection.

    The file name is built from `dpage_number` (4 digits) and `target`
    taken from *settings*, e.g. '0012a.png'. A cached thumbnail of the same
    name is removed and the page number is written to the stored settings.
    """
    dpage_number = settings.get('dpage_number', 1)
    target = settings.get('target', 'a')
    collection_choice = settings.get('collection_choice', '.')
    img_name = f'{dpage_number:04d}{target}.png'
    img.save(archive_dir / collection_choice / img_name)
    # image_small() serves a cached thumbnail whenever one exists, so a
    # thumbnail left over from an overwritten page has to go.
    (cache_dir_small / collection_choice / img_name).unlink(missing_ok=True)
    update_settings({'dpage_number': dpage_number})
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
    # The archive base directory is the first command line argument.
    cli_args = sys.argv[1:]
    if not cli_args:
        print('Please give the archive basedir as argument 1.')
        sys.exit(2)
    init(cli_args[0])
    web.run_app(app, port=8066)
|