aboutsummaryrefslogtreecommitdiff
path: root/src_py
diff options
context:
space:
mode:
Diffstat (limited to 'src_py')
-rw-r--r--src_py/opcut/common.py115
-rw-r--r--src_py/opcut/csp.py117
-rw-r--r--src_py/opcut/main.py117
-rw-r--r--src_py/opcut/output.py26
-rw-r--r--src_py/opcut/server.py32
5 files changed, 299 insertions, 108 deletions
diff --git a/src_py/opcut/common.py b/src_py/opcut/common.py
index c7884e8..184537f 100644
--- a/src_py/opcut/common.py
+++ b/src_py/opcut/common.py
@@ -3,23 +3,27 @@ import enum
from opcut import util
-State = util.namedtuple(
- 'State',
+Params = util.namedtuple(
+ 'Params',
['cut_width', 'float'],
['panels', 'List[Panel]'],
- ['items', 'List[Item]'],
+ ['items', 'List[Item]'])
+
+Result = util.namedtuple(
+ 'Result',
+ ['params', 'Params'],
['used', 'List[Used]'],
['unused', 'List[Unused]'])
Panel = util.namedtuple(
'Panel',
- ['id', 'Any'],
+ ['id', 'str'],
['width', 'float'],
['height', 'float'])
Item = util.namedtuple(
'Item',
- ['id', 'Any'],
+ ['id', 'str'],
['width', 'float'],
['height', 'float'],
['can_rotate', 'bool'])
@@ -46,4 +50,103 @@ Method = enum.Enum('Method', [
class UnresolvableError(Exception):
- """Exception raised when State is not solvable"""
+ """Exception raised when Result is not solvable"""
+
+
+def params_to_json_data(params):
+ """Convert params to json serializable data specified by
+ ``opcut://params.yaml#``
+
+ Args:
+ params (Params): params
+
+ Returns:
+ Any: json serializble data
+
+ """
+ return {'cut_width': params.cut_width,
+ 'panels': {panel.id: {'width': panel.width,
+ 'height': panel.height}
+ for panel in params.panels},
+ 'items': {item.id: {'width': item.width,
+ 'height': item.height,
+ 'can_rotate': item.can_rotate}
+ for item in params.items}}
+
+
+def json_data_to_params(json_data):
+ """Convert json serializable data specified by ``opcut://params.yaml#``
+ to params
+
+ Args:
+ json_data (Any): json serializable data
+
+ Returns:
+ Params
+
+ """
+ return Params(cut_width=json_data['cut_width'],
+ panels=[Panel(id=k,
+ width=v['width'],
+ height=v['height'])
+ for k, v in json_data['panels'].items()],
+ items=[Item(id=k,
+ width=v['width'],
+ height=v['height'],
+ can_rotate=v['can_rotate'])
+ for k, v in json_data['items'].items()])
+
+
+def result_to_json_data(result):
+ """Convert result to json serializable data specified by
+ ``opcut://result.yaml#``
+
+ Args:
+ result (Result): result
+
+ Returns:
+ Any: json serializble data
+
+ """
+ return {'params': params_to_json_data(result.params),
+ 'used': [{'panel': used.panel.id,
+ 'item': used.item.id,
+ 'x': used.x,
+ 'y': used.y,
+ 'rotate': used.rotate}
+ for used in result.used],
+ 'unused': [{'panel': unused.panel.id,
+ 'width': unused.width,
+ 'height': unused.height,
+ 'x': unused.x,
+ 'y': unused.y}
+ for unused in result.unused]}
+
+
+def json_data_to_result(json_data):
+ """Convert json serializable data specified by ``opcut://result.yaml#``
+ to result
+
+ Args:
+ json_data (Any): json serializable data
+
+ Returns:
+ Result
+
+ """
+ params = json_data_to_params(json_data['params'])
+ panels = {panel.id: panel for panel in params.panels}
+ items = {item.id: item for item in params.items}
+ return Result(params=params,
+ used=[Used(panel=panels[used['panel']],
+ item=items[used['item']],
+ x=used['x'],
+ y=used['y'],
+ rotate=used['rotate'])
+ for used in json_data['used']],
+ unused=[Unused(panel=panels[unused['panel']],
+ width=unused['width'],
+ height=unused['height'],
+ x=unused['x'],
+ y=unused['y'])
+ for unused in json_data['unused']])
diff --git a/src_py/opcut/csp.py b/src_py/opcut/csp.py
index 4d53d6e..b0e1387 100644
--- a/src_py/opcut/csp.py
+++ b/src_py/opcut/csp.py
@@ -3,103 +3,99 @@ import itertools
from opcut import common
-def calculate(panels, items, cut_width, method):
+def calculate(params, method):
"""Calculate cutting stock problem
Args:
- panels (List[common.Panel]): input panels
- items (List[common.Item]): input items
- cut_width (float): cut width
+ params (common.Params): input parameters
method (common.Method): calculation method
Returns:
- State
+ Result
"""
- state = common.State(
- cut_width=cut_width,
- panels=panels,
- items=items,
+ result = common.Result(
+ params=params,
used=[],
unused=[common.Unused(panel=panel,
width=panel.width,
height=panel.height,
x=0,
y=0)
- for panel in panels])
+ for panel in params.panels])
return {
common.Method.GREEDY: _calculate_greedy,
common.Method.FORWARD_GREEDY: _calculate_forward_greedy
- }[method](state)
+ }[method](result)
_fitness_K = 0.03
-def _calculate_greedy(state):
- while not _is_done(state):
- new_state = None
+def _calculate_greedy(result):
+ while not _is_done(result):
+ new_result = None
new_fitness = None
- for next_state in _get_next_states(state):
- next_state_fitness = _fitness(next_state)
- if new_fitness is None or next_state_fitness < new_fitness:
- new_state = next_state
- new_fitness = next_state_fitness
- if not new_state:
+ for next_result in _get_next_results(result):
+ next_result_fitness = _fitness(next_result)
+ if new_fitness is None or next_result_fitness < new_fitness:
+ new_result = next_result
+ new_fitness = next_result_fitness
+ if not new_result:
raise common.UnresolvableError()
- state = new_state
- return state
+ result = new_result
+ return result
-def _calculate_forward_greedy(state):
- while not _is_done(state):
- new_state = None
+def _calculate_forward_greedy(result):
+ while not _is_done(result):
+ new_result = None
new_fitness = None
- for next_state in _get_next_states(state):
+ for next_result in _get_next_results(result):
try:
- next_state_fitness = _fitness(_calculate_greedy(next_state))
+ next_result_fitness = _fitness(_calculate_greedy(next_result))
except common.UnresolvableError:
continue
- if new_fitness is None or next_state_fitness < new_fitness:
- new_state = next_state
- new_fitness = next_state_fitness
- if not new_state:
+ if new_fitness is None or next_result_fitness < new_fitness:
+ new_result = next_result
+ new_fitness = next_result_fitness
+ if not new_result:
raise common.UnresolvableError()
- state = new_state
- return state
+ result = new_result
+ return result
-def _get_next_states(state):
+def _get_next_results(result):
selected_item = None
- used_items = {used.item for used in state.used}
- for item in state.items:
- if item in used_items:
+ used_items = {used.item.id for used in result.used}
+ for item in result.params.items:
+ if item.id in used_items:
continue
if (not selected_item or
max(item.width, item.height) >
max(selected_item.width, selected_item.height)):
selected_item = item
if not selected_item:
- raise Exception('state is done')
- return _get_next_states_for_item(state, selected_item)
+ raise Exception('result is done')
+ return _get_next_results_for_item(result, selected_item)
-def _get_next_states_for_item(state, item):
+def _get_next_results_for_item(result, item):
ret = []
- loop_iter = ((False, i, unused) for i, unused in enumerate(state.unused))
+ loop_iter = ((False, i, unused) for i, unused in enumerate(result.unused))
if item.can_rotate:
loop_iter = itertools.chain(
loop_iter,
- ((True, i, unused) for i, unused in enumerate(state.unused)))
+ ((True, i, unused) for i, unused in enumerate(result.unused)))
for rotate, i, unused in loop_iter:
for vertical in [True, False]:
new_used, new_unused = _cut_item_from_unused(
- unused, item, rotate, state.cut_width, vertical)
+ unused, item, rotate, result.params.cut_width, vertical)
if not new_used:
continue
- ret.append(state._replace(
- used=state.used + [new_used],
- unused=state.unused[:i] + new_unused + state.unused[i+1:]))
+ ret.append(result._replace(
+ used=result.used + [new_used],
+ unused=result.unused[:i] + new_unused + result.unused[i+1:]))
return ret
@@ -133,20 +129,23 @@ def _cut_item_from_unused(unused, item, rotate, cut_width, vertical):
return used, new_unused
-def _is_done(state):
- return len(state.items) == len(state.used)
+def _is_done(result):
+ return len(result.params.items) == len(result.used)
-def _fitness(state):
- total_area = sum(panel.width * panel.height for panel in state.panels)
- result = 0
- for panel in state.panels:
+def _fitness(result):
+ total_area = sum(panel.width * panel.height
+ for panel in result.params.panels)
+ fitness = 0
+ for panel in result.params.panels:
used_areas = [used.item.width * used.item.height
- for used in state.used]
+ for used in result.used
+ if used.panel == panel]
unused_areas = [unused.width * unused.height
- for unused in state.unused]
- result += (panel.width * panel.height - sum(used_areas)) / total_area
- result -= (_fitness_K *
- min(used_areas, default=0) * max(unused_areas, default=0) /
- (total_area * total_area))
- return result
+ for unused in result.unused
+ if unused.panel == panel]
+ fitness += (panel.width * panel.height - sum(used_areas)) / total_area
+ fitness -= (_fitness_K *
+ min(used_areas, default=0) * max(unused_areas, default=0) /
+ (total_area * total_area))
+ return fitness
diff --git a/src_py/opcut/main.py b/src_py/opcut/main.py
index 9ca2e18..6758267 100644
--- a/src_py/opcut/main.py
+++ b/src_py/opcut/main.py
@@ -3,17 +3,20 @@ import argparse
import yaml
import logging.config
import urllib.parse
-import aiohttp.web
-import ssl
import asyncio
-import contextlib
import os.path
from opcut import util
+from opcut import common
import opcut.json_validator
+import opcut.csp
+import opcut.server
+import opcut.output
def main():
+ """Application main entry point"""
+
args = _create_parser().parse_args()
if args.log_conf_path:
@@ -22,59 +25,113 @@ def main():
opcut.json_validator.validate(log_conf, 'opcut://logging.yaml#')
logging.config.dictConfig(log_conf)
+ action_fn = {
+ 'server': server,
+ 'calculate': calculate,
+ 'output': output}.get(args.action, server)
+ action_fn(args)
+
+
+def server(args):
+ """Server main entry point
+
+ Args:
+ args: command line argument
+
+ """
+ if sys.platform == 'win32':
+ asyncio.set_event_loop(asyncio.ProactorEventLoop())
+
addr = urllib.parse.urlparse(args.ui_addr)
pem_path = args.ui_pem_path
ui_path = args.ui_path or os.path.join(os.path.dirname(__file__), 'web')
util.run_until_complete_without_interrupt(
- async_main(addr, pem_path, ui_path))
+ opcut.server.run(addr, pem_path, ui_path))
+
+def calculate(args):
+ """Calculate result and generate outputs
-async def async_main(addr, pem_path, ui_path):
+ Args:
+ args: command line argument
- if addr.scheme == 'https':
- ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ssl_ctx.load_cert_chain(pem_path)
- else:
- ssl_ctx = None
+ """
+ with open(args.params_path, 'r', encoding='utf-8') as f:
+ params_json_data = yaml.safe_load(f)
+ opcut.json_validator.validate(params_json_data, 'opcut://params.yaml#')
+ params = common.json_data_to_params(params_json_data)
+ result = opcut.csp.calculate(params, args.method)
+ result_json_data = common.result_to_json_data(result)
+ with open(args.result_path, 'w', encoding='utf-8') as f:
+ yaml.safe_dump(result_json_data, f,
+ indent=4, default_flow_style=False,
+ explicit_start=True, explicit_end=True)
+ output(args)
- app = aiohttp.web.Application()
- app.router.add_route('GET', '/',
- lambda req: aiohttp.web.HTTPFound('/index.html'))
- app.router.add_static('/', ui_path)
- app_handler = app.make_handler()
- srv = await asyncio.get_event_loop().create_server(
- app_handler, host=addr.hostname, port=addr.port, ssl=ssl_ctx)
+def output(args):
+ """Generate outputs based on calculation result
- with contextlib.suppress(asyncio.CancelledError):
- await asyncio.Future()
+ Args:
+ args: command line argument
- srv.close()
- await srv.wait_closed()
- await app.shutdown()
- await app_handler.finish_connections(0)
- await app.cleanup()
+ """
+ with open(args.result_path, 'r', encoding='utf-8') as f:
+ result_json_data = yaml.safe_load(f)
+ opcut.json_validator.validate(result_json_data, 'opcut://result.yaml#')
+ result = common.json_data_to_result(result_json_data)
+
+ if args.output_pdf_path:
+ pdf_bytes = opcut.output.generate_pdf(result)
+ with open(args.output_pdf_path, 'wb') as f:
+ f.write(pdf_bytes)
def _create_parser():
parser = argparse.ArgumentParser(prog='opcut')
parser.add_argument(
+ '--log', default=None, metavar='path', dest='log_conf_path',
+ help="logging configuration")
+ subparsers = parser.add_subparsers(title='actions', dest='action')
+
+ server = subparsers.add_parser('server', help='run web server')
+ server.add_argument(
'--ui-addr', default='http://0.0.0.0:8080',
metavar='addr', dest='ui_addr',
help="address of listening web ui socket formated as "
"'<type>://<host>:<port>' - <type> is 'http' or 'https'; "
"<host> is hostname; <port> is tcp port number "
"(default http://0.0.0.0:8080)")
- parser.add_argument(
+ server.add_argument(
'--ui-pem', default=None, metavar='path', dest='ui_pem_path',
help="web front-end pem file path - required for https")
- parser.add_argument(
+ server.add_argument(
'--ui-path', default=None, metavar='path', dest='ui_path',
- help="override path to front-end web app")
- parser.add_argument(
- '--log', default=None, metavar='path', dest='log_conf_path',
- help="logging configuration")
+ help="override path to front-end web app directory")
+
+ calculate = subparsers.add_parser('calculate', help='calculate result')
+ calculate.add_argument(
+ '--params', required=True, metavar='path', dest='params_path',
+ help="calculate parameters file path "
+ "(specified by opcut://params.yaml#)")
+ calculate.add_argument(
+ '--method', dest='method', type=common.Method,
+ default=common.Method.FORWARD_GREEDY,
+ choices=list(map(lambda x: x.name, common.Method)),
+ help="calculate method (default FORWARD_GREEDY)")
+
+ output = subparsers.add_parser('output', help='generate output')
+
+ for p in [calculate, output]:
+ p.add_argument(
+ '--result', required=True, metavar='path', dest='result_path',
+ help="calculate result file path "
+ "(specified by opcut://result.yaml#)")
+ p.add_argument(
+ '--output-pdf', default=None, metavar='path',
+ dest='output_pdf_path', help="optional PDF output file path")
+
return parser
diff --git a/src_py/opcut/output.py b/src_py/opcut/output.py
index bd1def1..311585e 100644
--- a/src_py/opcut/output.py
+++ b/src_py/opcut/output.py
@@ -6,13 +6,13 @@ from reportlab.lib.units import mm
from opcut import util
-def generate_pdf(state, pagesize_mm=(210, 297),
+def generate_pdf(result, pagesize_mm=(210, 297),
margin_top_mm=10, margin_bottom_mm=20,
margin_left_mm=10, margin_right_mm=10):
"""Generate PDF output
Args:
- state (opcut.common.State): state
+ result (opcut.common.Result): result
pagesize (Tuple[float,float]): page size as (with, height) in mm
margin_top_mm (float): margin top in mm
margin_bottom_mm (float): margin bottom in mm
@@ -31,17 +31,17 @@ def generate_pdf(state, pagesize_mm=(210, 297),
ret = io.BytesIO()
c = canvas.Canvas(ret, pagesize=pagesize)
c.setFillColorRGB(0.9, 0.9, 0.9)
- for panel in state.panels:
- _pdf_write_panel(c, pagesize, margin, state, panel)
+ for panel in result.params.panels:
+ _pdf_write_panel(c, pagesize, margin, result, panel)
c.save()
return ret.getvalue()
-def generate_csv(state):
+def generate_csv(result):
"""Generate CSV output
Args:
- state (opcut.common.State): state
+ result (opcut.common.Result): result
Returns:
bytes
@@ -53,7 +53,7 @@ def generate_csv(state):
_Margin = util.namedtuple('_Margin', 'top', 'right', 'bottom', 'left')
-def _pdf_write_panel(c, pagesize, margin, state, panel):
+def _pdf_write_panel(c, pagesize, margin, result, panel):
if (panel.width / panel.height >
(pagesize[0] - margin.left - margin.right) /
(pagesize[1] - margin.top - margin.bottom)):
@@ -72,21 +72,21 @@ def _pdf_write_panel(c, pagesize, margin, state, panel):
(margin.top + margin.bottom))
c.setFillColorRGB(0.5, 0.5, 0.5)
c.rect(x0, y0, width, height, stroke=1, fill=1)
- for used in state.used:
+ for used in result.used:
if used.panel != panel:
continue
- _pdf_write_used(c, x0, y0, scale, state, used)
- for unused in state.unused:
+ _pdf_write_used(c, x0, y0, scale, result, used)
+ for unused in result.unused:
if unused.panel != panel:
continue
- _pdf_write_unused(c, x0, y0, scale, state, unused)
+ _pdf_write_unused(c, x0, y0, scale, result, unused)
c.setFillColorRGB(0, 0, 0)
c.drawCentredString(pagesize[0] / 2, margin.bottom / 2,
panel.id)
c.showPage()
-def _pdf_write_used(c, x0, y0, scale, state, used):
+def _pdf_write_used(c, x0, y0, scale, result, used):
width = used.item.width * scale
height = used.item.height * scale
if used.rotate:
@@ -99,7 +99,7 @@ def _pdf_write_used(c, x0, y0, scale, state, used):
c.drawCentredString(x + width / 2, y + height / 2 - 6, used.item.id)
-def _pdf_write_unused(c, x0, y0, scale, state, unused):
+def _pdf_write_unused(c, x0, y0, scale, result, unused):
width = unused.width * scale
height = unused.height * scale
x = unused.x * scale + x0
diff --git a/src_py/opcut/server.py b/src_py/opcut/server.py
new file mode 100644
index 0000000..edd6b91
--- /dev/null
+++ b/src_py/opcut/server.py
@@ -0,0 +1,32 @@
+import ssl
+import contextlib
+import asyncio
+
+import aiohttp.web
+
+
+async def run(addr, pem_path, ui_path):
+
+ if addr.scheme == 'https':
+ ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ssl_ctx.load_cert_chain(pem_path)
+ else:
+ ssl_ctx = None
+
+ app = aiohttp.web.Application()
+ app.router.add_route('GET', '/',
+ lambda req: aiohttp.web.HTTPFound('/index.html'))
+ app.router.add_static('/', ui_path)
+ app_handler = app.make_handler()
+
+ srv = await asyncio.get_event_loop().create_server(
+ app_handler, host=addr.hostname, port=addr.port, ssl=ssl_ctx)
+
+ with contextlib.suppress(asyncio.CancelledError):
+ await asyncio.Future()
+
+ srv.close()
+ await srv.wait_closed()
+ await app.shutdown()
+ await app_handler.finish_connections(0)
+ await app.cleanup()