From b194e06bb58c6a840fd080466bb62b14cf64e201 Mon Sep 17 00:00:00 2001 From: James Larisch Date: Fri, 16 Dec 2016 15:46:17 -0500 Subject: Bloom, first pass --- chapter/7/langs-consistency.md | 295 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 292 insertions(+), 3 deletions(-) diff --git a/chapter/7/langs-consistency.md b/chapter/7/langs-consistency.md index 78162bc..cd0a7e5 100644 --- a/chapter/7/langs-consistency.md +++ b/chapter/7/langs-consistency.md @@ -244,10 +244,299 @@ Why not push the consistency guarantees in between the IO-level and the applicat Wouldn't it be great if tools like this existed? ### Bloom -[ Introduce Bloom ] +Before talking about such tools, I'd like you to forget almost everything you know about programming for a second (unless of course you've never programmed in a Von Neumann-based language in which you sequentially update pieces of memory; which, by the way, you have). -#### Restriction & Danger -[Bloom restricts you, it's different, and it's dangerous] +Imagine the following scenario: you are "programming" a node in a cluster of computers. All of the other computers work as expected. When you receive a message (all messages will include an integer), your task is to save the message, increment the integer, and resend the message back to its originator. You must also send messages you've received from `stdin`. Unfortunately, the programming environment isn't like anything you've encountered before. +You have access to five buffers: +* Messages you have received in the last 5 seconds +* Inputs you've received from `stdin` in the last 5 seconds +* An outgoing messages buffer: flushed & sent every 5 seconds +* A bucket of saved messages: *never* flushed + +However, you only have access to these buffers *every 5 seconds*. If messages are formatted as such: `(SOURCE, INTEGER, T)`, your buffers might look like when `t = 0`. (`t` is the number of seconds elapsed) + +``` + +RECV-BUFFER: [(A, 1, 0), (B, 2, 0)] +RSTDIN-INPUTS: [(A, 5, 0), (C, 10, 0)] +SEND-BUFFER: [] +SAVED: [(D, -1, 0), (E, -100, 0)] +``` + +If you don't write any code to manipulate these buffers, when `t = 5`, your buffers might look like: + +``` + +RECV-BUFFER: [(C, 10, 5)] +STDIN-INPUTS: [(X, 1, 5)] +SEND-BUFFER: [] +SAVED: [(D, -1, 0), (E, -100, 0)] +``` + +You can see that from `t = 0` to `t = 5`, you received one message from `C` and someone typed a message to `X` via `stdin`. + +Remember our goals? +* Save received messages from the network +* Send out messages received from `stdin` +* For all received network messages, increment the integer and resend it back to the originator + +In Javascript, perhaps you code up something like this: + +```javascript +onFiveSecondInterval(function() { + recvBuffer.forEach(function(msg) { + savedBuffer.push(msg); // save message + let newMsg = msg.clone() + newMsg.integer++; // increment recv'd message + sendBuffer.push(newMsg); // send it out + }); + + stdinInputBuffer.forEach(function(msg) { + sendBuffer.push(msg); // send stdin message + }); +}); +``` + +or Ruby: + +```ruby +on_five_second_interval do + recv_buffer.each do |msg| + saved_buffer << msg + new_msg = msg.clone + new_msg.integer += 1 + send_buffer << new_msg + end + + stdin_input_buffer.each do |msg| + send_buffer << msg + end +end +``` + +We have expressed this model using an event-driven programming style: the main event is `t % 5 = 0`: when the buffers populate & flush. + +Notice we perform a few "copies". We read something from one buffer and place it into another one, perhaps after applying some modification. Perhaps we place a message from a given buffer into two buffers (`recv_buffer` to `saved_buffer` & `send_buffer`). + +This situation screams for a more functional approach: +```ruby +on_five_second_interval do + saved_buffer += recv_buffer # add everything in recv_buffer to saved_buffer + + send_buffer += recv_buffer.map do |msg| # map over the recv_buffer, increment integers, add to send_buffer + new_msg = msg.clone + new_msg.integer += 1 + new_msg # this block returns new_msg + end + + send_buffer += stdin_input_buffer # add stdin messages to the send buffer +end +``` + +After this block/callback is called, the system automatically flushes & routes messages as described above. + +Bloom, a research language developed at UC Berkeley, has a similar programming model to the one described above. Execution is broken up into a series of "timesteps". In the above example, one "timestemp" would be the execution of one `on_five_second_interval` function. Bloom, like the theoretical system above, automatically flushes and populates the buffers before and after each timestep. In the above example, 5 seconds was an arbitrary amount of time. In Bloom, timesteps (rounds of evaluation) are logical tools - they may happen every second, 10 seconds, etc. Logically, it shouldn't affect how your program executes. In reality, Bud's timesteps correspond to evaluation iterations. Your code is evaluated, executed, and the process repeats. + +So what does a Bloom program look like? Bloom's prototypal implementation is called Bud and is implemented in Ruby. There are two main parts to a Bloom program: +1. User defined buffers: rather than the four buffers I gave you above, Bloom users can define their own buffers. There are different types of buffers depending on the behavior you desire: + * `channel`: Above, `recv_buffer` and `send_buffer` would be considered channels. They facilitate sending network messages to and from other nodes. Like the messages above, messages sent into these channels contain a "location-specifier", which tells Bloom where the message should be sent. If you wanted to send a message to `A`, you could push the message `(@A, 10)` into your send buffer (in Ruby, `["@A", 10]`). The `@` denotes the location-specifier. + * `table`: Above, `saved_buffer` would be considered a table. The contents of tables persist across timesteps. +2. Code to be executed at each timestep. A Bloom (Bud) program can be seen as the inside of the block passed to `on_five_second_interval`. In fact, it looks very similar, as we'll see. + +For the purposes of this chapter, let's assume `stdin_input_buffer` is a special kind of channel in which are sent in via `stdin`. Let's also assume this channel exists in all Bloom programs. + +Let's take a look at an example Bud program. + +First, let's declare our state. + +```ruby +module Incrementer + def state + channel :network_channel ['@dst', 'src', 'integer'] + table :saved_buffer ['dst', 'src', 'integer'] + # implied channel :stdin_input_buffer ['@dst', 'src', 'integer'] + end +end +``` + +The first line of `state` means: declare a channel called `network_channel` in which messages are 3-tuples. The first field of the message is called `dst`, the second `src`, and the third is called `integer`. `@` is our location-specifier, so if a program wants to send a message to a node at a given identifier, they will place it in the first `dst` field. + +The second line means: declare a table (persists) called `saved_buffer` in which messages follow the same format as `network_channel`. There's no location specifier since this collection is not network-connected. + +You can think of the Ruby array after the channel name as the "schema" of that collection. + +Notice how we only have one network channel for both receiving and sending. Before, we had two buffers, one for sending and one for receiving. When we place items *into* `network_channel`, Bud will automatically send messages to the appropriate `@dst`. + +Next, let's write our code. This code will be executed at every timestamp. In fact, you can think of a Bud program as the code inside of a timestamp callback. Let's model the raw Ruby code we saw above. + +```ruby +module Incrementer + def state + channel :network_channel ['@dst', 'src', 'integer'] + table :saved_buffer ['dst', 'src', 'integer'] + # implied channel :stdin_input_buffer ['@dst', 'src', 'integer'] + end + + declare + def increment_messages + network_channel <~ network_channel.map { |x| [x.src, x.dst, x.integer + 1] } + end + + declare + def save_messages + saved_buffer <= network_channel + end + + declare + def send_messages + network_channel <~ stdin_input_buffer + end +end +``` + +Don't panic. Remember - the output of this program is identical to our Ruby callback program from earlier. Let's walk through it step by step. +```ruby +declare +def increment_messages + network_channel <~ network_channel.map { |x| [x.src, x.dst, x.integer] } +end +``` +Here, we take messages we've received from the network channel and send them back into the network channel. The `<~` operator says "copy all of the elements in the right-hand-side and eventually send them off onto the network in the channel on the left-hand-side". So, we map over the contents of network channel *in the current timestep*: switching the `src` and `dst` fields, and incrementing the integer. This mapped collection is passed back into the network channel. Bud will ensure those messages sent off at some point. + +``` +declare +def save_messages + saved_buffer <= network_channel +end +``` +In `save_messages`, we use the `<=` operator. `<=` says "copy all of the elements in the right-hand-side and add them to the table on the left-hand-side." It's important to note that this movement occurs *within the current timestep*. This means if `saved_buffer` is referenced elsewhere in the code, it will include the contents of `network_channel`. If we had used the `<+` operator instead, the contents of `network_channel` would show up in `saved_buffer` in the *next* timestep. The latter is useful if you'd like to operate on the current contents of `saved_buffer` in the current timestep but want to specify how `saved_buffer` should be updated for the next timestep. + +Remember, all of this code is executed in each timestep - the separation of code into separate methods is merely for readability. + +``` +declare +def send_messages + network_channel <~ stdin_input_buffer +end +``` + +`send_messages` operates very much like `increment_messages`, except it reads the contents of `stdin_input_buffer` and places them into the network channel to be sent off at an indeterminite time. + +#### Details + +Examine Bloom's "style". Compare it to your (probably) standard way of programming. Compare it to the Javascript & Ruby examples within this strange "timestep" model. Bloom has a more "declarative" style: what does this mean? Look at our Javascript: + +```javascript +onFiveSecondInterval(function() { + recvBuffer.forEach(function(msg) { + savedBuffer.push(msg); // save message + let newMsg = msg.clone() + newMsg.integer++; // increment recv'd message + sendBuffer.push(newMsg); // send it out + }); + + stdinInputBuffer.forEach(function(msg) { + sendBuffer.push(msg); // send stdin message + }); +}); +``` + +"Every five seconds, loop over the received messages. For each one, do this, then that, then that." We are telling the computer each step we'd like it to perform. In Bud, however, we describe the state of tables and channels at either the current or next timestep using operators and other tables and channels. We describe what we'd like our collections to include and look like, rather than what to do. You declare what you'd like the state of the world to be at the current instant and at following instants. + +#### Isn't this chapter about consistency? + +It's time to implement our shopping cart in Bloom. We are going to introduce one more collection: a `periodic`. For example, `periodic :timer 10` instantiates a new periodic collection. This collection is "not empty" every 10 seconds. Alone, it's not all that useful. However, when `join`'d with another table, it can be used to perform actions every `x` seconds. + +```ruby +module ShoppingCart + include MulticastProtocol + + def state + table :cart ['item'] + channel :recv_channel ['@src', 'dst', 'item'] + # implied channel :stdin_input_buffer ['item'] + periodic :timer 10 + end + + declare + def add_items + cart <= stdin_input_buffer + end + + declare + def send_items + send_mcast <= join([cart, timer]).map { |item, timer| item } + end + + declare + def receive_items + cart <+ recv_channel.map { |x| x.item } + end +end +``` + +`send_mcast` is a special type of channel we receive from the `MulticastProtocol` mixin. It sends all items in the right-hand-side to every known peer. +* `add_items`: receive items from stdin, add them to the cart +* `send_items`: join our cart with the 10-second timer. Since the timer only "appears" every 10 seconds, this `join` will produce a result every 10 seconds. When it does, send all cart items to all peers via `send_mcast`. +* `receive_items`: when we receive a message from a peer, add the item to our cart. + +Functionally, this code is equivalent to our working Javascript shopping cart implementation. A few important things to note: +* In our Javascript example, we broadcasted our entire cart to all peers. When a peer received a message, they unioned their current cart with the received one. Here, each node broadcasts each element in the cart. When a node receives an item, it adds it to the current cart. Since tables are represented as sets, repeated or unordered additions do not matter. You can think of `{A, B, C}.add(D)` as equivalent to `{A, B, C}.union({D})`. +* You cannot add items twice. Since tables are represented as sets and we simply add items to our set, an item can only ever exist once. This was true of our Javascript example as well. +* You still cannot remove items! + +Bloom has leveraged the montononic, add-only set and constructed a declarative programming model based around these sets. When you treat everything as sets (not unlike SQL) and you introduce the notion of "timestemps", you can express programs as descriptions of state rather than an order of operations. Besides being a rather unique model, Bloom presents an accessible and (perhaps...) safe model for programming eventually consistent programs. + +#### Sets only? +Bloom's programming model is built around the set. As Aviral discussed in the previous chapter, however, sets are not the only monotonic data structures. Other CRDTs are incredibly useful for programming eventually consistent distributed programs. + +Recall that a *bounded join semilattice* (CRDT) can be represented as a 3-tuple: `(S, U, ⊥)`. `S` is the set of all elements within the semilattice. `U` is the `least-upper bound` operation. `⊥` is the "least" element within the set. For example, for add-only sets, `S = the set of all sets`, `U = union` and `⊥ = {}`. Elements of these semilattices, when `U` is applied, can only "stay the same or get larger". Sets can only stay the same size or get larger - they can never rollback. For some element `e` in `S`, `e U ⊥` must equal `e`. +For a semilattice we'll call `integerMax`, `S = the set of all integers`, `U = max(x, y)`, and `⊥ = -Infinity`. + +These semilattices (and many more!) can be used to program other types of distributed, eventually consistent programs. Although sets are powerful, there might be more expressive ways to describe your program. It's not difficult to imagine using `integerMax` to keep a global counter across multiple machines. + +Unfortunately, Bloom does not provide support for other CRDTs. In fact, you cannot define your own datatypes at all. You are bound by the collections described. + +BloomL, an addendum to the Bloom language, provides support for these types of data structures. Specifically, BloomL does two things: +* Adds a number of built-in lattices such as `lmax` (`integerMax`), `lmin`, etc. +* Adds an "interface" for lattices: the user can define lattices that "implement" this interface. + +This interface, if in an OO language like Java, would look something like: + +```java +interface Lattice { + static Lattice leastElement(); + Lattice merge(Lattice a, Lattice b); +} +``` + +[I am purposely leaving out morphisms & monotones for the sake of simplicity.] + +This provides the user with much more freedom in terms of the types of Bloom programs she can write. + +#### Review + +Bloom aims to provide a new model for writing distributed programs. And since bloom only allows for monotonic data structures with monotonicity-preserving operations, we're safe from Jerry the intern, right? + +Wrong. Unfortunately, I left out an operator from Bloom's set of collection operators. `<-` removes all elements in the right-hand-size from the table in the left-hand-side. As we've seen from Jerry's work on our original Javascript shopping cart implementation, naively attempting to remove elements from a distributed set is not a safe operation. Rollbacks can potentially destroy the properties we worked so hard to achieve. So what gives? Why would the Bloom developers add this operation? + +Despite putting so much emphasis on consistency via logical monotonicity, the Bloom programmers recognize that your program might need *some* coordination. + +In our example, we don't require coordination. We accept the fact that a user may ask a given node for the current state of her shopping cart and may not receive the most up-to-date response. There's no need for coordination, because we've used our domain knowledge to accept a compromise. + +For our shopping cart examples: when a client asks a given node what's in her cart, that node will respond with the information it's received so far. We know this information won't be *incorrect*, but this data could be *stale*. That client might be missing information. + +The Bloom team calls these points in your program *points of order*. They are points in your program where coordination may be required. In fact, the Bloom developers provide analysis tools for identifying points of order within your program. There's no reason why you couldn't implement a non-monotonic shopping cart in which all nodes must synchronize before giving a response to the user. The Bloom analysis tool would tell you where the points of order lie in your program and you would need to add coordination. + +So what does Bloom really give us? First off, it demonstrates an unusual and possibly more expressive way to program distributed systems. Consistency-wise, it uses sets under the hood for its collections. As long as you shy away from `<-` operator, you can be confident that your collections will only monotonically grow. Since the order of packets is not guaranteed, structuring these eventually consistent applications is reasonably easy within Bloom. BloomL also gives us the power to define our own monotonic data structures by "implementing" the lattice interface. + +However, Bloom makes it easy to program non-monotonic distributed programs as well. Applications may require coordination and the `<-` operator in particular can cause serious harm to our desired formal properties. Luckily, Bloom attempts to let the programmer know exactly when coordination may be required within their programs. Whenever an operation may return a stale or non-up-to-date value, Bloom's analysis tools let the programmer know. + +Another thing to consider: BloomL's user-defined lattices are just that - user-defined. It's up to the programmer to ensure that the data structures that implement the lattice interface are actually valid lattice structures. If not, Bloom can't help you. + +Currently Bloom exists as a Ruby prototype: Bud. Hypothetically speaking, there's nothing stopping the programmer from writing normal, sequentially evaluated Ruby code within Bud. This can also cause harm to our formal properties. + +All in all, Bloom provides programmers with a new model for writing distributed programs. If the user desires monotonic data structures and operations, it's relatively easy to use and reason about. Rather than blindly destroying the properties of your system, you will know exactly when you introduce a possible point of order into your program. It's up to you to decide whether or not you need to introduce coordination. ### Lasp [ Introduce Lasp ] -- cgit v1.2.3