Add distributed version of the code

This commit is contained in:
Pavle Portic 2019-12-26 20:55:41 +01:00
parent 3a971e5eba
commit 0999564601
Signed by: TheEdgeOfRage
GPG Key ID: 6758ACE46AA2A849
11 changed files with 488 additions and 3 deletions

12
.dockerignore Normal file
View File

@ -0,0 +1,12 @@
.git
.gitignore
docker-compose.yml
Dockerfile
.dockerignore
.editorconfig
LICENSE
output
*.csv

15
Dockerfile Normal file
View File

@ -0,0 +1,15 @@
FROM python:3.8-slim

WORKDIR /code

# Install locked dependencies first so this layer stays cached until the
# lockfile changes; --no-cache-dir keeps pip's download cache out of the image.
COPY Pipfile Pipfile.lock ./
RUN set -x \
	&& pip install --no-cache-dir pipenv \
	&& pipenv install --system --deploy

# PYTHONPATH=/code so the `distributed` package resolves from the workdir;
# PYTHONUNBUFFERED so celery log output streams immediately;
# C_FORCE_ROOT lets the celery worker run as root inside the container
# (NOTE(review): consider a non-root USER instead — confirm /output permissions).
ENV PYTHONPATH=/code \
	PYTHONUNBUFFERED="1" \
	C_FORCE_ROOT="1"

# Application source changes most often, so copy it last for best cache reuse.
COPY . ./

CMD ["celery", "worker", "--app", "distributed.app", "-E", "-Ofair", "--loglevel=WARN"]

View File

@ -6,8 +6,8 @@ verify_ssl = true
[dev-packages]
[packages]
matplotlib = "*"
scipy = "*"
scipy = "==1.4.1"
celery = {extras = ["librabbitmq", "redis"],version = "*"}
[requires]
python_version = "3"

146
Pipfile.lock generated Normal file
View File

@ -0,0 +1,146 @@
{
"_meta": {
"hash": {
"sha256": "e6f3c835443673b396678699cc0401278a3c7827a9139dbdc05eff1fd769ecc6"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"amqp": {
"hashes": [
"sha256:6e649ca13a7df3faacdc8bbb280aa9a6602d22fd9d545336077e573a1f4ff3b8",
"sha256:77f1aef9410698d20eaeac5b73a87817365f457a507d82edf292e12cbb83b08d"
],
"version": "==2.5.2"
},
"billiard": {
"hashes": [
"sha256:01afcb4e7c4fd6480940cfbd4d9edc19d7a7509d6ada533984d0d0f49901ec82",
"sha256:b8809c74f648dfe69b973c8e660bcec00603758c9db8ba89d7719f88d5f01f26"
],
"version": "==3.6.1.0"
},
"celery": {
"extras": [
"librabbitmq",
"redis"
],
"hashes": [
"sha256:7c544f37a84a5eadc44cab1aa8c9580dff94636bb81978cdf9bf8012d9ea7d8f",
"sha256:d3363bb5df72d74420986a435449f3c3979285941dff57d5d97ecba352a0e3e2"
],
"index": "pypi",
"version": "==4.4.0"
},
"kombu": {
"hashes": [
"sha256:2a9e7adff14d046c9996752b2c48b6d9185d0b992106d5160e1a179907a5d4ac",
"sha256:67b32ccb6fea030f8799f8fd50dd08e03a4b99464ebc4952d71d8747b1a52ad1"
],
"version": "==4.6.7"
},
"librabbitmq": {
"hashes": [
"sha256:3116e40c02d4285b8dd69834e4cbcb1a89ea534ca9147e865f11d44e7cc56eea",
"sha256:5cdfb473573396d43d54cef9e9b4c74fa3d1516da51d04a7b261f6ef4e0bd8be",
"sha256:98e355f486964dadae7e8b51c9a60e9aa0653bbe27f6b14542687f305c4c3652",
"sha256:c2a8113d3c831808d1d940fdf43e4882636a1efe2864df7ab3bb709a45016b37",
"sha256:cd9cc09343b193d7cf2cff6c6a578061863bd986a4bdf38f922e9dc32e15d944",
"sha256:ffa2363a860ab5dcc3ce3703247e05e940c73d776c03a3f3f9deaf3cf43bb96c"
],
"version": "==2.0.0"
},
"numpy": {
"hashes": [
"sha256:0a7a1dd123aecc9f0076934288ceed7fd9a81ba3919f11a855a7887cbe82a02f",
"sha256:0c0763787133dfeec19904c22c7e358b231c87ba3206b211652f8cbe1241deb6",
"sha256:3d52298d0be333583739f1aec9026f3b09fdfe3ddf7c7028cb16d9d2af1cca7e",
"sha256:43bb4b70585f1c2d153e45323a886839f98af8bfa810f7014b20be714c37c447",
"sha256:475963c5b9e116c38ad7347e154e5651d05a2286d86455671f5b1eebba5feb76",
"sha256:64874913367f18eb3013b16123c9fed113962e75d809fca5b78ebfbb73ed93ba",
"sha256:683828e50c339fc9e68720396f2de14253992c495fdddef77a1e17de55f1decc",
"sha256:6ca4000c4a6f95a78c33c7dadbb9495c10880be9c89316aa536eac359ab820ae",
"sha256:75fd817b7061f6378e4659dd792c84c0b60533e867f83e0d1e52d5d8e53df88c",
"sha256:7d81d784bdbed30137aca242ab307f3e65c8d93f4c7b7d8f322110b2e90177f9",
"sha256:8d0af8d3664f142414fd5b15cabfd3b6cc3ef242a3c7a7493257025be5a6955f",
"sha256:9679831005fb16c6df3dd35d17aa31dc0d4d7573d84f0b44cc481490a65c7725",
"sha256:a8f67ebfae9f575d85fa859b54d3bdecaeece74e3274b0b5c5f804d7ca789fe1",
"sha256:acbf5c52db4adb366c064d0b7c7899e3e778d89db585feadd23b06b587d64761",
"sha256:ada4805ed51f5bcaa3a06d3dd94939351869c095e30a2b54264f5a5004b52170",
"sha256:c7354e8f0eca5c110b7e978034cd86ed98a7a5ffcf69ca97535445a595e07b8e",
"sha256:e2e9d8c87120ba2c591f60e32736b82b67f72c37ba88a4c23c81b5b8fa49c018",
"sha256:e467c57121fe1b78a8f68dd9255fbb3bb3f4f7547c6b9e109f31d14569f490c3",
"sha256:ede47b98de79565fcd7f2decb475e2dcc85ee4097743e551fe26cfc7eb3ff143",
"sha256:f58913e9227400f1395c7b800503ebfdb0772f1c33ff8cb4d6451c06cabdf316",
"sha256:fe39f5fd4103ec4ca3cb8600b19216cd1ff316b4990f4c0b6057ad982c0a34d5"
],
"version": "==1.17.4"
},
"pytz": {
"hashes": [
"sha256:1c557d7d0e871de1f5ccd5833f60fb2550652da6be2693c1e02300743d21500d",
"sha256:b02c06db6cf09c12dd25137e563b31700d3b80fcc4ad23abb7a315f2789819be"
],
"version": "==2019.3"
},
"redis": {
"hashes": [
"sha256:3613daad9ce5951e426f460deddd5caf469e08a3af633e9578fc77d362becf62",
"sha256:8d0fc278d3f5e1249967cba2eb4a5632d19e45ce5c09442b8422d15ee2c22cc2"
],
"version": "==3.3.11"
},
"scipy": {
"hashes": [
"sha256:00af72998a46c25bdb5824d2b729e7dabec0c765f9deb0b504f928591f5ff9d4",
"sha256:0902a620a381f101e184a958459b36d3ee50f5effd186db76e131cbefcbb96f7",
"sha256:1e3190466d669d658233e8a583b854f6386dd62d655539b77b3fa25bfb2abb70",
"sha256:2cce3f9847a1a51019e8c5b47620da93950e58ebc611f13e0d11f4980ca5fecb",
"sha256:3092857f36b690a321a662fe5496cb816a7f4eecd875e1d36793d92d3f884073",
"sha256:386086e2972ed2db17cebf88610aab7d7f6e2c0ca30042dc9a89cf18dcc363fa",
"sha256:71eb180f22c49066f25d6df16f8709f215723317cc951d99e54dc88020ea57be",
"sha256:770254a280d741dd3436919d47e35712fb081a6ff8bafc0f319382b954b77802",
"sha256:787cc50cab3020a865640aba3485e9fbd161d4d3b0d03a967df1a2881320512d",
"sha256:8a07760d5c7f3a92e440ad3aedcc98891e915ce857664282ae3c0220f3301eb6",
"sha256:8d3bc3993b8e4be7eade6dcc6fd59a412d96d3a33fa42b0fa45dc9e24495ede9",
"sha256:9508a7c628a165c2c835f2497837bf6ac80eb25291055f56c129df3c943cbaf8",
"sha256:a144811318853a23d32a07bc7fd5561ff0cac5da643d96ed94a4ffe967d89672",
"sha256:a1aae70d52d0b074d8121333bc807a485f9f1e6a69742010b33780df2e60cfe0",
"sha256:a2d6df9eb074af7f08866598e4ef068a2b310d98f87dc23bd1b90ec7bdcec802",
"sha256:bb517872058a1f087c4528e7429b4a44533a902644987e7b2fe35ecc223bc408",
"sha256:c5cac0c0387272ee0e789e94a570ac51deb01c796b37fb2aad1fb13f85e2f97d",
"sha256:cc971a82ea1170e677443108703a2ec9ff0f70752258d0e9f5433d00dda01f59",
"sha256:dba8306f6da99e37ea08c08fef6e274b5bf8567bb094d1dbe86a20e532aca088",
"sha256:dc60bb302f48acf6da8ca4444cfa17d52c63c5415302a9ee77b3b21618090521",
"sha256:dee1bbf3a6c8f73b6b218cb28eed8dd13347ea2f87d572ce19b289d6fd3fbc59"
],
"index": "pypi",
"version": "==1.4.1"
},
"six": {
"hashes": [
"sha256:1f1b7d42e254082a9db6279deae68afb421ceba6158efa6131de7b3003ee93fd",
"sha256:30f610279e8b2578cab6db20741130331735c781b56053c59c4076da27f06b66"
],
"version": "==1.13.0"
},
"vine": {
"hashes": [
"sha256:133ee6d7a9016f177ddeaf191c1f58421a1dcc6ee9a42c58b34bed40e1d2cd87",
"sha256:ea4947cc56d1fd6f2095c8d543ee25dad966f78692528e68b4fada11ba3f98af"
],
"version": "==1.3.0"
}
},
"develop": {}
}

37
distributed/app.py Normal file
View File

@ -0,0 +1,37 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2019 <pavle.portic@tilda.center>
#
# Distributed under terms of the BSD 3-Clause license.

from celery import Celery
from celery.signals import worker_ready

# Celery application shared by both the server and worker roles; all
# settings live in distributed/celeryconfig.py.
app = Celery('distributed')
app.config_from_object('distributed.celeryconfig')

if app.conf.AM_I_SERVER:
    # Only the server node seeds the computation; plain workers just consume.
    @worker_ready.connect
    def bootstrap(**kwargs):
        # Imported lazily so worker nodes never import server-only task code.
        from .server import distributed_chunk

        delay_time = 2
        print(f'Getting ready to automatically seed computations in {delay_time} seconds...')
        # Simulation parameters forwarded from the environment-driven config.
        kwargs = {
            'L1': app.conf.PENDULUM_L1,
            'L2': app.conf.PENDULUM_L2,
            'm1': app.conf.PENDULUM_M1,
            'm2': app.conf.PENDULUM_M2,
            'tmax': app.conf.PENDULUM_TMAX,
            'dt': app.conf.PENDULUM_DT,
            'theta_resolution': app.conf.PENDULUM_THETA_RESOLUTION,
        }
        # Countdown gives the broker and workers a moment to settle first.
        distributed_chunk.apply_async(kwargs=kwargs, countdown=delay_time)

if __name__ == '__main__':
    app.start()

103
distributed/celeryconfig.py Normal file
View File

@ -0,0 +1,103 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2019 <pavle.portic@tilda.center>
#
# Distributed under terms of the BSD 3-Clause license.

from multiprocessing import cpu_count
import os

from kombu import Queue

# Environment based settings
# Cap worker concurrency via MAX_CPU_CORES; defaults to all available cores.
MAX_CPU_CORES = os.getenv('MAX_CPU_CORES', cpu_count())
# COMPUTER_TYPE=server marks the coordinator node; every other node is a worker.
AM_I_SERVER = (os.getenv('COMPUTER_TYPE') == 'server')

# Parameters
# Double-pendulum simulation inputs, all overridable from the environment.
PENDULUM_L1 = int(os.getenv('PENDULUM_L1', 1))    # rod 1 length parameter
PENDULUM_L2 = int(os.getenv('PENDULUM_L2', 1))    # rod 2 length parameter
PENDULUM_M1 = int(os.getenv('PENDULUM_M1', 1))    # bob 1 mass parameter
PENDULUM_M2 = int(os.getenv('PENDULUM_M2', 1))    # bob 2 mass parameter
PENDULUM_TMAX = int(os.getenv('PENDULUM_TMAX', 30))   # simulated time span
PENDULUM_DT = float(os.getenv('PENDULUM_DT', 0.01))   # integration step
PENDULUM_THETA_RESOLUTION = int(os.getenv('PENDULUM_THETA_RESOLUTION', 6))  # grid points per angle
PENDULUM_OUTPUT_FILE = os.getenv('PENDULUM_OUTPUT_FILE', '/output/dis.csv')

# Concurrency settings
CELERYD_CONCURRENCY = MAX_CPU_CORES

# This ensures that each worker will only take one task at a time, when combined
# with late acks. This is the recommended configuration for long-running tasks.
# References:
# * http://celery.readthedocs.org/en/latest/userguide/optimizing.html#prefetch-limits
# * http://celery.readthedocs.org/en/latest/userguide/optimizing.html#reserve-one-task-at-a-time
# * http://celery.readthedocs.org/en/latest/configuration.html#celeryd-prefetch-multiplier
# * http://stackoverflow.com/questions/16040039/understanding-celery-task-prefetching
# * https://bugs.launchpad.net/openquake-old/+bug/1092050
# * https://wiredcraft.com/blog/3-gotchas-for-celery/
# * http://www.lshift.net/blog/2015/04/30/making-celery-play-nice-with-rabbitmq-and-bigwig/
CELERYD_PREFETCH_MULTIPLIER = 1

# Task result backend settings
CELERY_RESULT_BACKEND = 'redis://redis'

# Message Routing
CELERY_DEFAULT_QUEUE = 'worker'
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_ROUTING_KEY = 'worker'

# The server node consumes only the 'server' queue (coordination tasks);
# compute nodes consume only the 'worker' queue.
if AM_I_SERVER:
    CELERY_QUEUES = (
        Queue('server', routing_key='server'),
    )
else:
    CELERY_QUEUES = (
        Queue('worker', routing_key='worker'),
    )
class ServerTasksRouter(object):
    """Celery router that pins server-side tasks to the 'server' queue."""

    def route_for_task(self, task, args=None, kwargs=None):
        # Anything outside distributed.server falls through to default routing.
        if not task.startswith('distributed.server.'):
            return None
        return {'queue': 'server'}
# Route distributed.server.* tasks through the custom router so only the
# coordinator node executes them.
CELERY_ROUTES = (
    ServerTasksRouter(),
)

# Broker Settings
BROKER_URL = "amqp://rabbitmq"
CELERY_ACCEPT_CONTENT = ['pickle', 'json']

# Task execution settings
CELERY_MESSAGE_COMPRESSION = 'bzip2'
CELERY_TASK_RESULT_EXPIRES = None
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TRACK_STARTED = True

# This ensures that the worker acks the task *after* it's completed.
# If the worker crashes or gets killed mid-execution, the task will be returned
# to the broker and restarted on another worker.
# References:
# * https://wiredcraft.com/blog/3-gotchas-for-celery/
# * http://celery.readthedocs.org/en/latest/configuration.html#celery-acks-late
# * http://celery.readthedocs.org/en/latest/faq.html#faq-acks-late-vs-retry
CELERY_ACKS_LATE = True

# Worker settings
# Each node imports only the task module matching its role.
if AM_I_SERVER:
    CELERY_IMPORTS = ['distributed.server']
else:
    CELERY_IMPORTS = ['distributed.worker']

# HACK: Prevents weird SymPy related memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100

22
distributed/common.py Normal file
View File

@ -0,0 +1,22 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2019 <pavle.portic@tilda.center>
#
# Distributed under terms of the BSD 3-Clause license.
import numpy as np
def iterate_both_theta(theta_resolution):
    """Yield every (theta1, theta2) pair on an evenly spaced grid over [0, 2*pi].

    Produces theta_resolution ** 2 pairs in row-major order.
    """
    grid = np.linspace(0, 2 * np.pi, theta_resolution)
    yield from ((t1, t2) for t1 in grid for t2 in grid)
def iterate_theta(theta_resolution):
    """Yield theta_resolution evenly spaced angles covering [0, 2*pi] inclusive."""
    yield from np.linspace(0, 2 * np.pi, theta_resolution)

59
distributed/server.py Normal file
View File

@ -0,0 +1,59 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2019 <pavle.portic@tilda.center>
#
# Distributed under terms of the BSD 3-Clause license.
import csv
from celery import chord
from .app import app
from .worker import compute_single, compute_chunk
from .common import iterate_theta, iterate_both_theta
@app.task
def distributed_single(L1, L2, m1, m2, tmax, dt, theta_resolution):
    """Fan out one compute_single task per (theta1, theta2) grid point.

    A chord gathers every result and hands the full list to write_csv.
    """
    signatures = [
        compute_single.s(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init)
        for theta1_init, theta2_init in iterate_both_theta(theta_resolution)
    ]
    return chord(signatures, write_csv.s()).delay()
@app.task
def distributed_chunk(L1, L2, m1, m2, tmax, dt, theta_resolution):
    """Fan out one compute_chunk task per theta1 value (one grid row each).

    A chord gathers every chunk and hands the full list to write_csv.
    """
    signatures = [
        compute_chunk.s(L1, L2, m1, m2, tmax, dt, theta1_init, theta_resolution)
        for theta1_init in iterate_theta(theta_resolution)
    ]
    return chord(signatures, write_csv.s()).delay()
def result_to_dict(theta1_init, result):
    """Flatten one compute() result tuple into a CSV row dict."""
    keys = ('theta1_init', 'theta2_init', 'theta1', 'theta2')
    values = (theta1_init, result[0], result[1], result[2])
    return dict(zip(keys, values))
@app.task
def write_csv(results):
    """Write every gathered chunk to the configured CSV output file.

    ``results`` holds (theta1_init, row_results) pairs from compute_chunk;
    chunks are written in ascending theta1_init order.  Returns the number
    of chunks written.
    """
    fieldnames = ['theta1_init', 'theta2_init', 'theta1', 'theta2']
    with open(app.conf.PENDULUM_OUTPUT_FILE, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter=',')
        writer.writeheader()
        for chunk in sorted(results):
            theta1_init, row_results = chunk[0], chunk[1]
            for result in row_results:
                writer.writerow(result_to_dict(theta1_init, result))
    return len(results)

57
distributed/worker.py Normal file
View File

@ -0,0 +1,57 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2019 <pavle.portic@tilda.center>
#
# Distributed under terms of the BSD 3-Clause license.
import numpy as np
from scipy.integrate import odeint
from .app import app
from .common import iterate_theta
g = 9.81
def deriv(y, t, L1, L2, m1, m2):
theta1, z1, theta2, z2 = y
c, s = np.cos(theta1 - theta2), np.sin(theta1 - theta2)
theta1dot = z1
z1dot = (m2 * g * np.sin(theta2) * c - m2 * s * (L1 * z1**2 * c + L2 * z2**2) - (m1 + m2) * g * np.sin(theta1)) / L1 / (m1 + m2 * s**2)
theta2dot = z2
z2dot = ((m1 + m2) * (L1 * z1**2 * s - g * np.sin(theta2) + g * np.sin(theta1) * c) + m2 * L2 * z2**2 * s * c) / L2 / (m1 + m2 * s**2)
return theta1dot, z1dot, theta2dot, z2dot
def solve(L1, L2, m1, m2, tmax, dt, y0):
    """Integrate the pendulum from t=0 to tmax (inclusive) with step dt.

    Returns the theta1 and theta2 time series as two arrays.
    """
    times = np.arange(0, tmax + dt, dt)
    trajectory = odeint(deriv, y0, times, args=(L1, L2, m1, m2))
    # Columns 0 and 2 of the state are theta1 and theta2.
    return trajectory[:, 0], trajectory[:, 2]
def compute(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init):
    """Simulate one configuration starting at rest.

    Returns (theta2_init, final theta1, final theta2).
    """
    initial_state = np.array([theta1_init, 0, theta2_init, 0])
    theta1, theta2 = solve(L1, L2, m1, m2, tmax, dt, initial_state)
    return theta2_init, theta1[-1], theta2[-1]
@app.task
def compute_single(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init):
    # Celery task wrapper so a single grid point can run on any worker.
    return compute(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init)
@app.task
def compute_chunk(L1, L2, m1, m2, tmax, dt, theta1_init, theta_resolution):
    """Compute one full grid row: every theta2 value for a fixed theta1.

    Returns (theta1_init, [compute(...) results]) so the collector can tag
    each row with its theta1 value.
    """
    row = []
    for theta2_init in iterate_theta(theta_resolution):
        row.append(compute(L1, L2, m1, m2, tmax, dt, theta1_init, theta2_init))
    return (theta1_init, row)

35
docker-compose.yml Normal file
View File

@ -0,0 +1,35 @@
version: '3'
services:
  # Compute nodes: consume tasks from the 'worker' queue.
  worker:
    build: .
    environment:
      MAX_CPU_CORES: 8
      COMPUTER_TYPE: worker
    depends_on:
      - redis
      - rabbitmq
  # Coordinator node: seeds the computation and writes the CSV output.
  server:
    build: .
    environment:
      MAX_CPU_CORES: 1
      COMPUTER_TYPE: server
      # Simulation parameters consumed by distributed/celeryconfig.py.
      PENDULUM_L1: 1
      PENDULUM_L2: 1
      PENDULUM_M1: 1
      PENDULUM_M2: 1
      PENDULUM_TMAX: 30
      PENDULUM_DT: 0.01
      PENDULUM_THETA_RESOLUTION: 6
    depends_on:
      - redis
      - rabbitmq
    volumes:
      # Results CSV lands here (PENDULUM_OUTPUT_FILE defaults to /output/dis.csv).
      - ./output:/output
  # Celery result backend.
  redis:
    image: redis:alpine
  # Message broker.
  rabbitmq:
    image: rabbitmq:alpine

1
seq.py
View File

@ -116,4 +116,3 @@ def main():
if __name__ == '__main__':
main()