Autumn Coffee


balancer

Use any balancer of your choice!

What you want to have here is something that will terminate SSL, handle slow clients, do basic request routing, etc.

Consider configuring your balancer in such a way that meaningful requests are routed to routerd.


routerd

This is the heart of runtime infrastructure.

Just look at README.md:

routerd

Microservice organizer.

Building

$ nix-build

Running

$ ./result/bin/routerd /path/to/config.json

Configuring

The simplest configuration file could look like this:

{
    "bind4": "127.0.0.1",
    "port": 1490,
    "hosts": {
        "output": [
            "127.0.0.1:14999"
        ]
    },
    "graphs": {
        "main": {
            "services": [
                "output"
            ]
        }
    },
    "routes": [
        {"r": "^/", "g": "main"}
    ]
}

bind4 and port tell routerd to listen for incoming requests on 127.0.0.1:1490. There is also bind6, which tells routerd to listen on the specified IPv6 address.

hosts maps each microservice name to the list of its instance addresses. In this example, the service output is accessible at 127.0.0.1:14999. If output had more than one instance ready to serve requests, the address of the second instance would be added to the same list.

graphs contains the microservice chains used to process requests. In this example, the graph main lists only one service (output), to which the original request is forwarded and which generates the response forwarded to the client. It is important to note that output is a special service name: routerd only forwards the response of the service called output to the client, and will not do that for any other service.

routes contains the mapping between URI paths and the graph names that should be used for them. In this example, the graph main is used for all paths starting with /, effectively making main the default graph for all requests.
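The route lookup can be modeled in a few lines of Python. This is a simplified sketch of the matching described above, not routerd's actual code; the /api entry and its graph name are made up for illustration, while the catch-all "^/" entry mirrors the example config:

```python
import re

# Hypothetical route table; first regex that matches the path wins.
routes = [
    {"r": "^/api", "g": "api"},   # invented for illustration
    {"r": "^/", "g": "main"},     # mirrors the example config above
]

def pick_graph(path):
    for route in routes:
        if re.search(route["r"], path):
            return route["g"]
    return None  # no matching route

graph = pick_graph("/index.html")  # -> "main"
```

With a catch-all "^/" route last, every well-formed path falls through to the default graph.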

Services inside a graph can also depend on each other. Consider this:

{
    "bind4": "127.0.0.1",
    "port": 1490,
    "hosts": {
        "t1": [
            "127.0.0.1:10001"
        ],
        "t2": [
            "127.0.0.1:10002"
        ],
        "t3": [
            "127.0.0.1:10003"
        ],
        "t4": [
            "127.0.0.1:10004"
        ],
        "output": [
            "127.0.0.1:10005"
        ]
    },
    "graphs": {
        "main": {
            "services": [
                "t1",
                "t2",
                "t3",
                "t4",
                "output"
            ],
            "deps": [
                {"a": "output", "b": "t1"},
                {"a": "t4", "b": "output"},
                {"a": "t4", "b": "t2"},
                {"a": "t2", "b": "t1"}
            ]
        }
    },
    "routes": [
        {"r": "^/", "g": "main"}
    ]
}

In this example, the graph main not only lists the services it uses, but also provides dependency constraints between them. a and b in a dependency specification come from the phrase service A depends on service B. So:

  1. service output depends on service t1;
  2. service t2 depends on service t1 too;
  3. service t4 depends on services output and t2;
  4. services t1 and t3 do not depend on any other service.

The word "depends" can also be read as "will be called after". So:

  1. t1 and t3 will receive the original client's request in parallel, immediately after routerd receives it;
  2. after t1 has responded to routerd, output and t2 will receive the original request plus the response of t1, all in a single HTTP request;
  3. after the service output has responded to routerd, its response will immediately be forwarded to the client;
  4. after both output and t2 have responded to routerd, t4 will receive the original request plus the responses of output and t2, all in a single HTTP request;
  5. the response of t3 will be ignored because no other service depends on it.
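The ordering rules above amount to a topological evaluation of the dependency graph: a service is called as soon as everything it depends on has responded. A sketch of how the waves of parallel calls could be derived (illustrative only, not routerd's scheduler):

```python
def execution_waves(services, deps):
    """Group services into waves: a service joins a wave once every
    service it depends on has already responded."""
    depends_on = {s: set() for s in services}
    for d in deps:
        depends_on[d["a"]].add(d["b"])  # a depends on b

    done, waves = set(), []
    while len(done) < len(services):
        wave = [s for s in services
                if s not in done and depends_on[s] <= done]
        if not wave:
            raise ValueError("dependency cycle")
        waves.append(sorted(wave))  # sorted only for deterministic output
        done.update(wave)
    return waves

services = ["t1", "t2", "t3", "t4", "output"]
deps = [
    {"a": "output", "b": "t1"},
    {"a": "t4", "b": "output"},
    {"a": "t4", "b": "t2"},
    {"a": "t2", "b": "t1"},
]
print(execution_waves(services, deps))
# [['t1', 't3'], ['output', 't2'], ['t4']]
```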

Using

routerd is meant to forward HTTP requests and responses between the client and microservices.

Each request that routerd receives is transformed into a multipart/form-data POST that looks exactly like a file-upload request: the part containing the body of the original request is named after the value of the X-AC-RouterD header.

All headers of the original request are preserved except the following:

  1. Content-Length: it is renamed to X-AC-RouterD-Content-Length and contains the value of the original Content-Length header; it is also present, under its original name, in the body part containing the original request body;
  2. Content-Type: it is renamed to X-AC-RouterD-CType and contains the value of the original Content-Type header.

Since the request seen by microservices is now always an HTTP POST, the original request method can be found in the X-AC-RouterD-Method header. The method stored there is lowercased.

Note: if the original request is itself a multipart/form-data POST, no exception is made: it is wrapped in another multipart/form-data POST as if it were any other request. So it becomes multipart/form-data inside multipart/form-data.
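The header rewriting described above can be summarized with a small sketch (illustrative Python, not routerd's implementation; wrap_headers is a made-up name, and the outer request's own multipart Content-Type with its boundary is omitted):

```python
def wrap_headers(method, headers):
    # Headers of the wrapping multipart/form-data POST.
    wrapped = {"X-AC-RouterD-Method": method.lower()}
    # Headers placed inside the body part holding the original body.
    part_headers = {}
    for name, value in headers.items():
        if name == "Content-Length":
            # Renamed on the outer request...
            wrapped["X-AC-RouterD-Content-Length"] = value
            # ...and kept under its original name inside the body part.
            part_headers["Content-Length"] = value
        elif name == "Content-Type":
            wrapped["X-AC-RouterD-CType"] = value
        else:
            # All other headers are preserved as-is.
            wrapped[name] = value
    return wrapped, part_headers
```

For example, a GET request with Content-Type: application/json would arrive at a microservice as a POST with X-AC-RouterD-Method: get and X-AC-RouterD-CType: application/json.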

back

Generic backend.

You can find a link to a C++ backend boilerplate built with the Autumn Coffee library below.

As we use a separate frontend service, our backend's only purpose is to communicate with database servers, perform dynamic calculations, and respond in a machine-readable format such as JSON or protobuf. Essentially, the backend focuses on providing API endpoints and does not touch data visualization at all.

What is really important to understand here is that backend and/or server-side frontend services should not make outgoing requests themselves: consider using routerd for that, configuring your graph accordingly.

As the backend now communicates with the server-side frontend via the TCP stack, please keep backend response sizes in mind. If some data appears to be static, it is best to dump it to persistent files and use those files directly in both the server-side frontend and backend services.

Each request is now a multipart/form-data POST (and original multipart/form-data POSTs are wrapped in another multipart/form-data body), and each request part may contain arbitrary headers that you may want to use (for example, for propagating new cookies from the backend service to the user through the server-side frontend service), so you may need some additional tools.

An example of parsing a multipart/form-data POST using the Autumn Coffee bodyparser follows.

You can find library here.
from ac_bodyparser import MultipartBody

boundary = ... # Multipart boundary
raw_body = ... # Raw body
part_name = 'some_part' # Request part name

parser = MultipartBody(boundary, raw_body)

part = parser.chunk(f'"{part_name}"')
part_content = part.content() # Returns `bytes`
part_header_value = part.get('X-Custom-Header') # Returns `str`

parser.delete() # Free memory
You can find library here.
const ACBodyParser = require('ac-bodyparser');

const boundary = ...; // Multipart boundary
const rawBody = ...; // Raw body
const partName = 'some_part'; // Request part name

const parser = new ACBodyParser(boundary, rawBody);

const part = parser.chunk(`"${partName}"`);
const partContent = part.content(); // Returns `Buffer`
const partHeaderValue = part.get('X-Custom-Header'); // Returns `String`

parser.delete(); // Free memory

front

Generic server-side frontend.

You can find a link to a React + Material-UI + Next.js + Express boilerplate below.

As we use a stand-alone server-side frontend service, and almost all required data is already provided to the frontend via various infrastructure means, our frontend should focus on fast and efficient data visualization.

What is really important to understand here is that backend and/or server-side frontend services should not make outgoing requests themselves: consider using routerd for that, configuring your graph accordingly.

As the backend now communicates with the server-side frontend via the TCP stack, please keep backend response sizes in mind. If some data appears to be static, it is best to dump it to persistent files and use those files directly in both the server-side frontend and backend services.

Each request is now a multipart/form-data POST (and original multipart/form-data POSTs are wrapped in another multipart/form-data body), and each request part may contain arbitrary headers that you may want to use (for example, for propagating new cookies from the backend service to the user through the server-side frontend service), so you may need some additional tools.

If you are using Express or a compatible framework, you don't have to use the low-level bodyparser yourself. Instead, you can use a wrapper (already conveniently used in the provided boilerplate):

const Express = require('express');
const {ACExpressRouterDRequest} = require('ac-express-routerd');

const expressApp = Express();

expressApp.use('*', function(req, res, next) {
  ACExpressRouterDRequest(req).then(next).catch(function() {
    console.log('Failed to parse request for ' + req.url);
    res.sendStatus(500);
  });
});

That wrapper will imbue the request object with a few additional methods and properties:

const body = req.body; // Original request's body as a `Buffer`
const parsedBody = req.json(); // Essentially `JSON.parse(req.body)`

const partName = 'some_part'; // Request part name
const part = req.getServiceResponse(partName);

const partContent = part.content(); // Returns `Buffer`
const partHeaderValue = part.get('X-Custom-Header'); // Returns `String`

req.delete(); // Free memory

An example of parsing a multipart/form-data POST using the Autumn Coffee bodyparser follows.

You can find library here.
from ac_bodyparser import MultipartBody

boundary = ... # Multipart boundary
raw_body = ... # Raw body
part_name = 'some_part' # Request part name

parser = MultipartBody(boundary, raw_body)

part = parser.chunk(f'"{part_name}"')
part_content = part.content() # Returns `bytes`
part_header_value = part.get('X-Custom-Header') # Returns `str`

parser.delete() # Free memory
You can find library here.
const ACBodyParser = require('ac-bodyparser');

const boundary = ...; // Multipart boundary
const rawBody = ...; // Raw body
const partName = 'some_part'; // Request part name

const parser = new ACBodyParser(boundary, rawBody);

const part = parser.chunk(`"${partName}"`);
const partContent = part.content(); // Returns `Buffer`
const partHeaderValue = part.get('X-Custom-Header'); // Returns `String`

parser.delete(); // Free memory

data

Persistent data files.

See also: how to generate data files.

The following structures are supported:
  • HashMap - O(1) key-value storage, static bucket count, limited support for updates
  • Heap - O(log(N)) reads, supports prefix search, fixed-size records; essentially a sorted array (sorted automatically); does not support updates
  • RBTree - red-black tree, O(log(N)) key-value storage, supports prefix search

Examples of how to access data files:

Pay attention:
  • in Python and JavaScript, depending on the desired key type, you may have to specify the key type explicitly and work carefully with types in your code
  • bucket count is fixed

Supported key types:

Unsigned 64-bit integer (default key type)

from ac_diskstructs import HashMap

hm = HashMap('/path/to/datafile')

value = hm[1] # Returns `bytes`

hm.delete() # Free memory
const {HashMap} = require('ac-diskstructs');

const hm = new HashMap('/path/to/datafile');

const value = hm.get(1); // Returns `Buffer`

hm.delete(); // Free memory
#include <ac-library/containers/persistent/immutable_hashmap/hashmap.hpp>

NAC::TPersistentImmutableHashMap hm(std::string("/path/to/datafile"), NAC::TPersistentImmutableHashMap::DefaultSeed);

NAC::TBlob value(hm[(uint64_t) 1]);

Variable-length byte array

from ac_diskstructs import HashMap

hm = HashMap('/path/to/datafile', key_type=bytes)

value = hm['one'] # Returns `bytes`

hm.delete() # Free memory
const {HashMap} = require('ac-diskstructs');

const hm = new HashMap('/path/to/datafile', {keyType: Buffer});

const value = hm.get('one'); // Returns `Buffer`

hm.delete(); // Free memory
#include <ac-library/containers/persistent/immutable_hashmap/hashmap.hpp>

NAC::TPersistentImmutableHashMap hm(std::string("/path/to/datafile"), NAC::TPersistentImmutableHashMap::DefaultSeed);

NAC::TBlob value(hm["one"]);
Pay attention:
  • records are not unique
  • record size is fixed
  • cannot be updated
from ac_diskstructs import Heap

heap = Heap('/path/to/datafile')

single_record_matching_prefix = heap["word"] # Returns `bytes`

all_records_matching_prefix = heap.get_all("wo") # Returns iterable over `bytes` objects

for record in all_records_matching_prefix:
    record # `bytes`

all_records_matching_prefix.delete() # Free memory

heap.delete() # Free memory
const {Heap} = require('ac-diskstructs');

const heap = new Heap('/path/to/datafile');

const singleRecordMatchingPrefix = heap.get("word"); // Returns `Buffer`

const allRecordsMatchingPrefix = heap.getAll("wo"); // Returns iterable over `Buffer` objects

for (let record of allRecordsMatchingPrefix) {
  record; // `Buffer`
}

allRecordsMatchingPrefix.delete(); // Free memory

heap.delete(); // Free memory
#include <ac-library/containers/persistent/binary_heap/binary_heap.hpp>

NAC::TPersistentBinaryHeap heap(std::string("/path/to/datafile"));

{
    NAC::TBlob prefix;
    prefix.Append(/* size = */4, "word");

    NAC::TBlob singleRecordMatchingPrefix(heap.Get(prefix));
}

{
    NAC::TBlob shortPrefix;
    shortPrefix.Append(/* size = */2, "wo");

    NAC::TPersistentBinaryHeap::TIterator allRecordsMatchingPrefix(heap.GetAll(shortPrefix));
    NAC::TBlob value;

    while (allRecordsMatchingPrefix.Next(value)) {
        value; // Contains current record
    }
}
from ac_diskstructs import RBTree

rbt = RBTree('/path/to/datafile')

value = rbt['one'] # Returns `bytes`

all_items_matching_prefix = rbt.get_all("wo") # Returns iterable over pairs of `bytes` objects

for (key, value) in all_items_matching_prefix:
    key # `bytes`
    value # `bytes`

all_items_matching_prefix.delete() # Free memory

rbt.delete() # Free memory
const {RBTree} = require('ac-diskstructs');

const rbt = new RBTree('/path/to/datafile');

const value = rbt.get('one'); // Returns `Buffer`

const allItemsMatchingPrefix = rbt.getAll("wo"); // Returns iterable over pairs of `Buffer` objects

for (let [key, value] of allItemsMatchingPrefix) {
  key; // `Buffer`
  value; // `Buffer`
}

allItemsMatchingPrefix.delete(); // Free memory

rbt.delete(); // Free memory
#include <ac-library/containers/rbtree/persistent.hpp>

NAC::TPersistentRBTree rbt(std::string("/path/to/datafile"));
rbt.FindRoot(); // This call is required when opening an existing tree!

{
    NAC::TBlob value(rbt["one"]);
}

{
    NAC::TRBTreeBase::TIterator allItemsMatchingPrefix(rbt.GetAll("on"));
    NAC::TBlob key;
    NAC::TBlob value;

    while (allItemsMatchingPrefix.Next(key, value)) {
        key; // Contains current key
        value; // Contains current value
    }
}

generator

Generates persistent data files.

See also: how to access data.

The following structures are supported:
  • HashMap - O(1) key-value storage, static bucket count, limited support for updates
  • Heap - O(log(N)) reads, supports prefix search, fixed-size records; essentially a sorted array (sorted automatically); does not support updates
  • RBTree - red-black tree, O(log(N)) key-value storage, supports prefix search

Examples of how to generate data files:

Pay attention:
  • in Python and JavaScript, depending on the desired key type, you may have to specify the key type explicitly and work carefully with types in your code
  • bucket count is fixed

Supported key types:

Unsigned 64-bit integer (default key type)

from ac_diskstructs import HashMap

bucket_count = 16
hm = HashMap('/path/to/datafile', bucket_count=bucket_count)

hm[1] = 'hello'
hm[2] = 'hi'
hm[3] = 'hey'
hm[4] = 'hola'

hm.close() # Finish write operations
hm.delete() # Free memory
const {HashMap} = require('ac-diskstructs');

const bucketCount = 16;
const hm = new HashMap('/path/to/datafile', {bucketCount});

hm.set(1, 'hello');
hm.set(2, 'hi');
hm.set(3, 'hey');
hm.set(4, 'hola');

hm.close(); // Finish write operations
hm.delete(); // Free memory
#include <ac-library/containers/persistent/immutable_hashmap/hashmap.hpp>

const uint64_t bucketCount(16);
NAC::TPersistentImmutableHashMap hm(std::string("/path/to/datafile"), bucketCount, NAC::TPersistentImmutableHashMap::DefaultSeed);

hm[(uint64_t) 1] = "hello";
hm[(uint64_t) 2] = "hi";
hm[(uint64_t) 3] = "hey";
hm[(uint64_t) 4] = "hola";

hm.Close(); // Finish write operations

Variable-length byte array

from ac_diskstructs import HashMap

bucket_count = 16
hm = HashMap('/path/to/datafile', bucket_count=bucket_count, key_type=bytes)

hm['one'] = 'hello'
hm['two'] = 'hi'
hm['three'] = 'hey'
hm['four'] = 'hola'

hm.close() # Finish write operations
hm.delete() # Free memory
const {HashMap} = require('ac-diskstructs');

const bucketCount = 16;
const hm = new HashMap('/path/to/datafile', {bucketCount, keyType: Buffer});

hm.set('one', 'hello');
hm.set('two', 'hi');
hm.set('three', 'hey');
hm.set('four', 'hola');

hm.close(); // Finish write operations
hm.delete(); // Free memory
#include <ac-library/containers/persistent/immutable_hashmap/hashmap.hpp>

const uint64_t bucketCount(16);
NAC::TPersistentImmutableHashMap hm(std::string("/path/to/datafile"), bucketCount, NAC::TPersistentImmutableHashMap::DefaultSeed);

hm["one"] = "hello";
hm["two"] = "hi";
hm["three"] = "hey";
hm["four"] = "hola";

hm.Close(); // Finish write operations
Pay attention:
  • records are not unique
  • record size is fixed
  • cannot be updated
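Both longer and shorter records corrupt the file (see the commented-out inserts in the examples below), so a small guard before every insert can help. check_record is a hypothetical helper, not part of the library:

```python
def check_record(record, record_size):
    # Refuse any record whose encoded length differs from record_size:
    # the heap stores fixed-size records, so a mismatch corrupts the file.
    data = record.encode() if isinstance(record, str) else bytes(record)
    if len(data) != record_size:
        raise ValueError(f"record is {len(data)} bytes, expected {record_size}")
    return data
```

Calling check_record("hello", 4) fails loudly with a ValueError instead of silently corrupting the data file.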
from ac_diskstructs import Heap

record_size = 4 # Size in BYTES!
heap = Heap('/path/to/datafile', record_size=record_size)

heap.insert("word")
heap.insert("size")
heap.insert("four")

# heap.insert("hello") # Record is longer than record_size == data corruption
# heap.insert("hi") # Record is shorter than record_size == data corruption too

heap.close() # Finish write operations
heap.delete() # Free memory
const {Heap} = require('ac-diskstructs');

const recordSize = 4; // Size in BYTES!
const heap = new Heap('/path/to/datafile', {recordSize});

heap.insert("word");
heap.insert("size");
heap.insert("four");

// heap.insert("hello"); // Record is longer than recordSize == data corruption
// heap.insert("hi"); // Record is shorter than recordSize == data corruption too

heap.close(); // Finish write operations
heap.delete(); // Free memory
#include <ac-library/containers/persistent/binary_heap/binary_heap.hpp>

const uint64_t recordSize(4); // Size in BYTES!
NAC::TPersistentBinaryHeap heap(std::string("/path/to/datafile"), recordSize);

heap.Insert("word");
heap.Insert("size");
heap.Insert("four");

// heap.Insert("hello"); // Record is longer than recordSize == data corruption
// heap.Insert("hi"); // Record is shorter than recordSize == data corruption too

heap.Close(); // Finish write operations
from ac_diskstructs import RBTree

rbt = RBTree('/path/to/datafile', create=True) # use `rw=True` instead of `create=True` to update existing tree

rbt['one'] = 'hello'
rbt['two'] = 'hi'
rbt['three'] = 'hey'
rbt['four'] = 'hola'

rbt.delete() # Free memory
const {RBTree} = require('ac-diskstructs');

const rbt = new RBTree('/path/to/datafile', {create: true}); // use `rw: true` instead of `create: true` to update existing tree

rbt.insert('one', 'hello');
rbt.insert('two', 'hi');
rbt.insert('three', 'hey');
rbt.insert('four', 'hola');

rbt.delete(); // Free memory
#include <ac-library/containers/rbtree/persistent.hpp>

NAC::TPersistentRBTree rbt(std::string("/path/to/datafile"), NAC::TFile::ACCESS_CREATE);
// Or, to update existing tree:
// NAC::TPersistentRBTree rbt(std::string("/path/to/datafile"), NAC::TFile::ACCESS_RDWR);
// rbt.FindRoot(); // This call is required when opening an existing tree!
//                 // Absence of the `FindRoot()` call before updating an
//                 // existing tree will lead to data corruption.

rbt["one"] = "hello";
rbt["two"] = "hi";
rbt["three"] = "hey";
rbt["four"] = "hola";

Makefile

Check out how everything builds and runs.

default:
	@echo 'Usage:'
	@echo '    make build - builds all'
	@echo '    make start - starts all'
	@echo '    make test - runs tests'
	@echo '    make stop - stops all'
	@echo ''
	@echo 'Further info - read Makefile yourself'
	@echo ''


MAKEFILE_DIR := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))

RUN_DIR := $(MAKEFILE_DIR)/run
ROUTERD_DIR := $(MAKEFILE_DIR)/routerd
BACK_DIR := $(MAKEFILE_DIR)/back
TOOLS_DIR := $(MAKEFILE_DIR)/tools
TESTS_DIR := $(MAKEFILE_DIR)/tests
FRONT_DIR := $(MAKEFILE_DIR)/front
DATA_DIR := $(MAKEFILE_DIR)/data

ROUTERD_PID := $(RUN_DIR)/routerd
BACK_PID := $(RUN_DIR)/back
FRONT_PID := $(RUN_DIR)/front


install_virtualenv:
	python3 -m pip install virtualenv ||:

define setup_virtualenv
	cd $(1) && python3 -m virtualenv venv
	cd $(1) && bash -c 'source ./venv/bin/activate && exec pip install -r ./requirements.txt'
endef

build_routerd:
	cd $(ROUTERD_DIR) && nix-build

build_back:
	cd $(BACK_DIR) && nix-build

build_tools: install_virtualenv
	$(call setup_virtualenv,$(TOOLS_DIR))

build_tests: install_virtualenv
	$(call setup_virtualenv,$(TESTS_DIR))
	$(TESTS_DIR)/install.sh

build_front:
	cd $(FRONT_DIR) && npm install
	cd $(FRONT_DIR) && npm run build-runner
	cd $(FRONT_DIR) && npm run build

build: build_routerd build_back build_tools build_tests build_front

create_run_dir:
	mkdir -p $(RUN_DIR)

create_data_dir:
	mkdir -p $(DATA_DIR)

start_routerd: create_run_dir
	nohup bash -c 'echo $$$$ > $(ROUTERD_PID) && exec $(ROUTERD_DIR)/result/bin/routerd $(MAKEFILE_DIR)/config/routerd.json' &

start_back: create_run_dir
	cd $(BACK_DIR) && nohup bash -c 'echo $$$$ > $(BACK_PID) && ROOT=$(MAKEFILE_DIR) BIND_V4=127.0.0.1 BIND_PORT=1495 exec ./result/bin/back' &

generate_data: create_data_dir
	cd $(TOOLS_DIR) && bash -c 'source ./venv/bin/activate && ROOT=$(MAKEFILE_DIR) exec ./generate_data.py'

start_front: generate_data create_run_dir
	cd $(FRONT_DIR) && nohup bash -c 'echo $$$$ > $(FRONT_PID) && ROOT=$(MAKEFILE_DIR) PORT=3000 NUM_WORKERS=4 exec npm run start' &

start: start_routerd start_back start_front
	@sleep 1
	@echo ''
	@echo ''
	@echo '*******************************'
	@echo 'Now go to http://127.0.0.1:1490'
	@echo '*******************************'
	@echo ''
	@echo ''

test:
	cd $(TESTS_DIR) && bash -c 'source ./venv/bin/activate && exec ./main.py'

stop_routerd:
	kill `cat $(ROUTERD_PID)` ||:

stop_back:
	kill `cat $(BACK_PID)` ||:

stop_front:
	kill `cat $(FRONT_PID)` ||:

stop: stop_routerd stop_back stop_front

tail:
	exec tail -f nohup.out back/nohup.out front/nohup.out

© 2024 Gennadiy Filatov