From 09995646017600b638dfdd4c993018fd65df8340 Mon Sep 17 00:00:00 2001 From: Pavle Portic Date: Thu, 26 Dec 2019 20:55:41 +0100 Subject: [PATCH] Add distributed version of the code --- .dockerignore | 12 +++ Dockerfile | 15 ++++ Pipfile | 4 +- Pipfile.lock | 146 ++++++++++++++++++++++++++++++++++++ distributed/app.py | 37 +++++++++ distributed/celeryconfig.py | 103 +++++++++++++++++++++++++ distributed/common.py | 22 ++++++ distributed/server.py | 59 +++++++++++++++ distributed/worker.py | 57 ++++++++++++++ docker-compose.yml | 35 +++++++++ seq.py | 1 - 11 files changed, 488 insertions(+), 3 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 Pipfile.lock create mode 100644 distributed/app.py create mode 100644 distributed/celeryconfig.py create mode 100644 distributed/common.py create mode 100644 distributed/server.py create mode 100644 distributed/worker.py create mode 100644 docker-compose.yml diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..4aafb65 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,12 @@ +.git +.gitignore + +docker-compose.yml +Dockerfile +.dockerignore + +.editorconfig +LICENSE + +output +*.csv diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..dcad160 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.8-slim + +WORKDIR /code +COPY Pipfile Pipfile.lock ./ +RUN set -x \ + && pip install pipenv \ + && pipenv install --system --deploy + +CMD ["celery", "worker", "--app", "distributed.app", "-E", "-Ofair", "--loglevel=WARN"] + +ENV PYTHONPATH=/code \ + PYTHONUNBUFFERED="1" \ + C_FORCE_ROOT="1" + +COPY . ./ diff --git a/Pipfile b/Pipfile index 6e7adc6..b4043c8 100644 --- a/Pipfile +++ b/Pipfile @@ -6,8 +6,8 @@ verify_ssl = true [dev-packages] [packages] -matplotlib = "*" -scipy = "*" +scipy = "==1.4.1" +celery = {extras = ["librabbitmq", "redis"],version = "*"} [requires] python_version = "3" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000..df80d94 --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,146 @@ +{ + "_meta": { + "hash": { + "sha256": "e6f3c835443673b396678699cc0401278a3c7827a9139dbdc05eff1fd769ecc6" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.org/simple", + "verify_ssl": true + } + ] + }, + "default": { + "amqp": { + "hashes": [ + "sha256:6e649ca13a7df3faacdc8bbb280aa9a6602d22fd9d545336077e573a1f4ff3b8", + "sha256:77f1aef9410698d20eaeac5b73a87817365f457a507d82edf292e12cbb83b08d" + ], + "version": "==2.5.2" + }, + "billiard": { + "hashes": [ + "sha256:01afcb4e7c4fd6480940cfbd4d9edc19d7a7509d6ada533984d0d0f49901ec82", + "sha256:b8809c74f648dfe69b973c8e660bcec00603758c9db8ba89d7719f88d5f01f26" + ], + "version": "==3.6.1.0" + }, + "celery": { + "extras": [ + "librabbitmq", + "redis" + ], + "hashes": [ + "sha256:7c544f37a84a5eadc44cab1aa8c9580dff94636bb81978cdf9bf8012d9ea7d8f", + "sha256:d3363bb5df72d74420986a435449f3c3979285941dff57d5d97ecba352a0e3e2" + ], + "index": "pypi", + "version": "==4.4.0" + }, + "kombu": { + "hashes": [ + "sha256:2a9e7adff14d046c9996752b2c48b6d9185d0b992106d5160e1a179907a5d4ac", + "sha256:67b32ccb6fea030f8799f8fd50dd08e03a4b99464ebc4952d71d8747b1a52ad1" + ], + "version": "==4.6.7" + }, + "librabbitmq": { + "hashes": [ + "sha256:3116e40c02d4285b8dd69834e4cbcb1a89ea534ca9147e865f11d44e7cc56eea", + "sha256:5cdfb473573396d43d54cef9e9b4c74fa3d1516da51d04a7b261f6ef4e0bd8be", + "sha256:98e355f486964dadae7e8b51c9a60e9aa0653bbe27f6b14542687f305c4c3652", + "sha256:c2a8113d3c831808d1d940fdf43e4882636a1efe2864df7ab3bb709a45016b37", + "sha256:cd9cc09343b193d7cf2cff6c6a578061863bd986a4bdf38f922e9dc32e15d944", + "sha256:ffa2363a860ab5dcc3ce3703247e05e940c73d776c03a3f3f9deaf3cf43bb96c" + ], + "version": "==2.0.0" + }, + "numpy": { + "hashes": [ + "sha256:0a7a1dd123aecc9f0076934288ceed7fd9a81ba3919f11a855a7887cbe82a02f", + "sha256:0c0763787133dfeec19904c22c7e358b231c87ba3206b211652f8cbe1241deb6", + "sha256:3d52298d0be333583739f1aec9026f3b09fdfe3ddf7c7028cb16d9d2af1cca7e", + "sha256:43bb4b70585f1c2d153e45323a886839f98af8bfa810f7014b20be714c37c447", + "sha256:475963c5b9e116c38ad7347e154e5651d05a2286d86455671f5b1eebba5feb76", + "sha256:64874913367f18eb3013b16123c9fed113962e75d809fca5b78ebfbb73ed93ba", + "sha256:683828e50c339fc9e68720396f2de14253992c495fdddef77a1e17de55f1decc", + "sha256:6ca4000c4a6f95a78c33c7dadbb9495c10880be9c89316aa536eac359ab820ae", + "sha256:75fd817b7061f6378e4659dd792c84c0b60533e867f83e0d1e52d5d8e53df88c", + "sha256:7d81d784bdbed30137aca242ab307f3e65c8d93f4c7b7d8f322110b2e90177f9", + "sha256:8d0af8d3664f142414fd5b15cabfd3b6cc3ef242a3c7a7493257025be5a6955f", + "sha256:9679831005fb16c6df3dd35d17aa31dc0d4d7573d84f0b44cc481490a65c7725", + "sha256:a8f67ebfae9f575d85fa859b54d3bdecaeece74e3274b0b5c5f804d7ca789fe1", + "sha256:acbf5c52db4adb366c064d0b7c7899e3e778d89db585feadd23b06b587d64761", + "sha256:ada4805ed51f5bcaa3a06d3dd94939351869c095e30a2b54264f5a5004b52170", + "sha256:c7354e8f0eca5c110b7e978034cd86ed98a7a5ffcf69ca97535445a595e07b8e", + "sha256:e2e9d8c87120ba2c591f60e32736b82b67f72c37ba88a4c23c81b5b8fa49c018", + "sha256:e467c57121fe1b78a8f68dd9255fbb3bb3f4f7547c6b9e109f31d14569f490c3", + "sha256:ede47b98de79565fcd7f2decb475e2dcc85ee4097743e551fe26cfc7eb3ff143", + "sha256:f58913e9227400f1395c7b800503ebfdb0772f1c33ff8cb4d6451c06cabdf316", + "sha256:fe39f5fd4103ec4ca3cb8600b19216cd1ff316b4990f4c0b6057ad982c0a34d5" + ], + "version": "==1.17.4" + }, + "pytz": { + "hashes": [ + "sha256:1c557d7d0e871de1f5ccd5833f60fb2550652da6be2693c1e02300743d21500d", + "sha256:b02c06db6cf09c12dd25137e563b31700d3b80fcc4ad23abb7a315f2789819be" + ], + "version": "==2019.3" + }, + "redis": { + "hashes": [ + "sha256:3613daad9ce5951e426f460deddd5caf469e08a3af633e9578fc77d362becf62", + "sha256:8d0fc278d3f5e1249967cba2eb4a5632d19e45ce5c09442b8422d15ee2c22cc2" + ], + "version": "==3.3.11" + }, + "scipy": { + "hashes": [ + "sha256:00af72998a46c25bdb5824d2b729e7dabec0c765f9deb0b504f928591f5ff9d4", + "sha256:0902a620a381f101e184a958459b36d3ee50f5effd186db76e131cbefcbb96f7", + "sha256:1e3190466d669d658233e8a583b854f6386dd62d655539b77b3fa25bfb2abb70", + "sha256:2cce3f9847a1a51019e8c5b47620da93950e58ebc611f13e0d11f4980ca5fecb", + "sha256:3092857f36b690a321a662fe5496cb816a7f4eecd875e1d36793d92d3f884073", + "sha256:386086e2972ed2db17cebf88610aab7d7f6e2c0ca30042dc9a89cf18dcc363fa", + "sha256:71eb180f22c49066f25d6df16f8709f215723317cc951d99e54dc88020ea57be", + "sha256:770254a280d741dd3436919d47e35712fb081a6ff8bafc0f319382b954b77802", + "sha256:787cc50cab3020a865640aba3485e9fbd161d4d3b0d03a967df1a2881320512d", + "sha256:8a07760d5c7f3a92e440ad3aedcc98891e915ce857664282ae3c0220f3301eb6", + "sha256:8d3bc3993b8e4be7eade6dcc6fd59a412d96d3a33fa42b0fa45dc9e24495ede9", + "sha256:9508a7c628a165c2c835f2497837bf6ac80eb25291055f56c129df3c943cbaf8", + "sha256:a144811318853a23d32a07bc7fd5561ff0cac5da643d96ed94a4ffe967d89672", + "sha256:a1aae70d52d0b074d8121333bc807a485f9f1e6a69742010b33780df2e60cfe0", + "sha256:a2d6df9eb074af7f08866598e4ef068a2b310d98f87dc23bd1b90ec7bdcec802", + "sha256:bb517872058a1f087c4528e7429b4a44533a902644987e7b2fe35ecc223bc408", + "sha256:c5cac0c0387272ee0e789e94a570ac51deb01c796b37fb2aad1fb13f85e2f97d", + "sha256:cc971a82ea1170e677443108703a2ec9ff0f70752258d0e9f5433d00dda01f59", + "sha256:dba8306f6da99e37ea08c08fef6e274b5bf8567bb094d1dbe86a20e532aca088", + "sha256:dc60bb302f48acf6da8ca4444cfa17d52c63c5415302a9ee77b3b21618090521", + "sha256:dee1bbf3a6c8f73b6b218cb28eed8dd13347ea2f87d572ce19b289d6fd3fbc59" + ], + "index": "pypi", + "version": "==1.4.1" + }, + "six": { + "hashes": [ + "sha256:1f1b7d42e254082a9db6279deae68afb421ceba6158efa6131de7b3003ee93fd", + "sha256:30f610279e8b2578cab6db20741130331735c781b56053c59c4076da27f06b66" + ], + "version": "==1.13.0" + }, + "vine": { + "hashes": [ + "sha256:133ee6d7a9016f177ddeaf191c1f58421a1dcc6ee9a42c58b34bed40e1d2cd87", + "sha256:ea4947cc56d1fd6f2095c8d543ee25dad966f78692528e68b4fada11ba3f98af" + ], + "version": "==1.3.0" + } + }, + "develop": {} +} diff --git a/distributed/app.py b/distributed/app.py new file mode 100644 index 0000000..a0af7a6 --- /dev/null +++ b/distributed/app.py @@ -0,0 +1,37 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# Copyright © 2019 +# +# Distributed under terms of the BSD 3-Clause license. + +from celery import Celery +from celery.signals import worker_ready + + +app = Celery('distributed') +app.config_from_object('distributed.celeryconfig') + + +if app.conf.AM_I_SERVER: + @worker_ready.connect + def bootstrap(**kwargs): + from .server import distributed_chunk + + delay_time = 2 + print(f'Getting ready to automatically seed computations in {delay_time} seconds...') + kwargs = { + 'L1': app.conf.PENDULUM_L1, + 'L2': app.conf.PENDULUM_L2, + 'm1': app.conf.PENDULUM_M1, + 'm2': app.conf.PENDULUM_M2, + 'tmax': app.conf.PENDULUM_TMAX, + 'dt': app.conf.PENDULUM_DT, + 'theta_resolution': app.conf.PENDULUM_THETA_RESOLUTION, + } + distributed_chunk.apply_async(kwargs=kwargs, countdown=delay_time) + + +if __name__ == '__main__': + app.start() diff --git a/distributed/celeryconfig.py b/distributed/celeryconfig.py new file mode 100644 index 0000000..bfe5f6f --- /dev/null +++ b/distributed/celeryconfig.py @@ -0,0 +1,103 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# Copyright © 2019 +# +# Distributed under terms of the BSD 3-Clause license. + +from multiprocessing import cpu_count +import os + +from kombu import Queue + + +# Environment based settings +MAX_CPU_CORES = os.getenv('MAX_CPU_CORES', cpu_count()) +AM_I_SERVER = (os.getenv('COMPUTER_TYPE') == 'server') + +# Parameters +PENDULUM_L1 = int(os.getenv('PENDULUM_L1', 1)) +PENDULUM_L2 = int(os.getenv('PENDULUM_L2', 1)) +PENDULUM_M1 = int(os.getenv('PENDULUM_M1', 1)) +PENDULUM_M2 = int(os.getenv('PENDULUM_M2', 1)) +PENDULUM_TMAX = int(os.getenv('PENDULUM_TMAX', 30)) +PENDULUM_DT = float(os.getenv('PENDULUM_DT', 0.01)) +PENDULUM_THETA_RESOLUTION = int(os.getenv('PENDULUM_THETA_RESOLUTION', 6)) +PENDULUM_OUTPUT_FILE = os.getenv('PENDULUM_OUTPUT_FILE', '/output/dis.csv') + +# Concurrency settings +CELERYD_CONCURRENCY = MAX_CPU_CORES + +# This ensures that each worker will only take one task at a time, when combined +# with late acks. This is the recommended configuration for long-running tasks. +# References: +# * http://celery.readthedocs.org/en/latest/userguide/optimizing.html#prefetch-limits +# * http://celery.readthedocs.org/en/latest/userguide/optimizing.html#reserve-one-task-at-a-time +# * http://celery.readthedocs.org/en/latest/configuration.html#celeryd-prefetch-multiplier +# * http://stackoverflow.com/questions/16040039/understanding-celery-task-prefetching +# * https://bugs.launchpad.net/openquake-old/+bug/1092050 +# * https://wiredcraft.com/blog/3-gotchas-for-celery/ +# * http://www.lshift.net/blog/2015/04/30/making-celery-play-nice-with-rabbitmq-and-bigwig/ +CELERYD_PREFETCH_MULTIPLIER = 1 + + +# Task result backend settings +CELERY_RESULT_BACKEND = 'redis://redis' + +# Message Routing +CELERY_DEFAULT_QUEUE = 'worker' +CELERY_DEFAULT_EXCHANGE = 'tasks' +CELERY_DEFAULT_ROUTING_KEY = 'worker' + +if AM_I_SERVER: + CELERY_QUEUES = ( + Queue('server', routing_key='server'), + ) +else: + CELERY_QUEUES = ( + Queue('worker', routing_key='worker'), + ) + + +class ServerTasksRouter(object): + def route_for_task(self, task, args=None, kwargs=None): + if task.startswith('distributed.server.'): + return {'queue': 'server'} + + return None + + +CELERY_ROUTES = ( + ServerTasksRouter(), +) + + +# Broker Settings +BROKER_URL = "amqp://rabbitmq" +CELERY_ACCEPT_CONTENT = ['pickle', 'json'] + +# Task execution settings +CELERY_MESSAGE_COMPRESSION = 'bzip2' +CELERY_TASK_RESULT_EXPIRES = None +CELERY_DISABLE_RATE_LIMITS = True +CELERY_TRACK_STARTED = True + +# This ensures that the worker acks the task *after* it's completed. +# If the worker crashes or gets killed mid-execution, the task will be returned +# to the broker and restarted on another worker. +# References: +# * https://wiredcraft.com/blog/3-gotchas-for-celery/ +# * http://celery.readthedocs.org/en/latest/configuration.html#celery-acks-late +# * http://celery.readthedocs.org/en/latest/faq.html#faq-acks-late-vs-retry +CELERY_ACKS_LATE = True + + +# Worker settings +if AM_I_SERVER: + CELERY_IMPORTS = ['distributed.server'] +else: + CELERY_IMPORTS = ['distributed.worker'] + +# HACK: Prevents weird SymPy related memory leaks +CELERYD_MAX_TASKS_PER_CHILD = 100 diff --git a/distributed/common.py b/distributed/common.py new file mode 100644 index 0000000..4ec076d --- /dev/null +++ b/distributed/common.py @@ -0,0 +1,22 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# Copyright © 2019 +# +# Distributed under terms of the BSD 3-Clause license. + +import numpy as np + + +def iterate_both_theta(theta_resolution): + search_space = np.linspace(0, 2 * np.pi, theta_resolution) + for theta1 in search_space: + for theta2 in search_space: + yield theta1, theta2 + + +def iterate_theta(theta_resolution): + search_space = np.linspace(0, 2 * np.pi, theta_resolution) + for theta in search_space: + yield theta diff --git a/distributed/server.py b/distributed/server.py new file mode 100644 index 0000000..fb2db3e --- /dev/null +++ b/distributed/server.py @@ -0,0 +1,59 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# Copyright © 2019 +# +# Distributed under terms of the BSD 3-Clause license. + +import csv + +from celery import chord + +from .app import app +from .worker import compute_single, compute_chunk +from .common import iterate_theta, iterate_both_theta + + +@app.task +def distributed_single(L1, L2, m1, m2, tmax, dt, theta_resolution): + return chord( + ( + compute_single.s(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init) + for theta1_init, theta2_init in iterate_both_theta(theta_resolution) + ), + write_csv.s(), + ).delay() + + +@app.task +def distributed_chunk(L1, L2, m1, m2, tmax, dt, theta_resolution): + return chord( + ( + compute_chunk.s(L1, L2, m1, m2, tmax, dt, theta1_init, theta_resolution) + for theta1_init in iterate_theta(theta_resolution) + ), + write_csv.s(), + ).delay() + + +def result_to_dict(theta1_init, result): + return { + 'theta1_init': theta1_init, + 'theta2_init': result[0], + 'theta1': result[1], + 'theta2': result[2], + } + + +@app.task +def write_csv(results): + with open(app.conf.PENDULUM_OUTPUT_FILE, 'w', newline='') as csvfile: + fieldnames = ['theta1_init', 'theta2_init', 'theta1', 'theta2'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter=',') + writer.writeheader() + for chunk in sorted(results): + for result in chunk[1]: + writer.writerow(result_to_dict(chunk[0], result)) + + return len(results) diff --git a/distributed/worker.py b/distributed/worker.py new file mode 100644 index 0000000..1c2588c --- /dev/null +++ b/distributed/worker.py @@ -0,0 +1,57 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# Copyright © 2019 +# +# Distributed under terms of the BSD 3-Clause license. + +import numpy as np +from scipy.integrate import odeint + +from .app import app +from .common import iterate_theta + + +g = 9.81 + + +def deriv(y, t, L1, L2, m1, m2): + theta1, z1, theta2, z2 = y + c, s = np.cos(theta1 - theta2), np.sin(theta1 - theta2) + theta1dot = z1 + z1dot = (m2 * g * np.sin(theta2) * c - m2 * s * (L1 * z1**2 * c + L2 * z2**2) - (m1 + m2) * g * np.sin(theta1)) / L1 / (m1 + m2 * s**2) + theta2dot = z2 + z2dot = ((m1 + m2) * (L1 * z1**2 * s - g * np.sin(theta2) + g * np.sin(theta1) * c) + m2 * L2 * z2**2 * s * c) / L2 / (m1 + m2 * s**2) + + return theta1dot, z1dot, theta2dot, z2dot + + +def solve(L1, L2, m1, m2, tmax, dt, y0): + t = np.arange(0, tmax + dt, dt) + y = odeint(deriv, y0, t, args=(L1, L2, m1, m2)) + + return y[:, 0], y[:, 2] + + +def compute(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init): + y0 = np.array([theta1_init, 0, theta2_init, 0]) + theta1, theta2 = solve(L1, L2, m1, m2, tmax, dt, y0) + + return theta2_init, theta1[-1], theta2[-1] + + +@app.task +def compute_single(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init): + return compute(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init) + + +@app.task +def compute_chunk(L1, L2, m1, m2, tmax, dt, theta1_init, theta_resolution): + return ( + theta1_init, + [ + compute(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init) + for theta2_init in iterate_theta(theta_resolution) + ] + ) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..42cce3f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,35 @@ +version: '3' +services: + worker: + build: . + environment: + MAX_CPU_CORES: 8 + COMPUTER_TYPE: worker + depends_on: + - redis + - rabbitmq + + server: + build: . + environment: + MAX_CPU_CORES: 1 + COMPUTER_TYPE: server + PENDULUM_L1: 1 + PENDULUM_L2: 1 + PENDULUM_M1: 1 + PENDULUM_M2: 1 + PENDULUM_TMAX: 30 + PENDULUM_DT: 0.01 + PENDULUM_THETA_RESOLUTION: 6 + depends_on: + - redis + - rabbitmq + volumes: + - ./output:/output + + redis: + image: redis:alpine + + rabbitmq: + image: rabbitmq:alpine + diff --git a/seq.py b/seq.py index cadb483..4ec8786 100644 --- a/seq.py +++ b/seq.py @@ -116,4 +116,3 @@ def main(): if __name__ == '__main__': main() -