author    Muzammil <muzammil.abdul.rehman@gmail.com>  2016-12-16 17:04:19 -0500
committer Muzammil <muzammil.abdul.rehman@gmail.com>  2016-12-16 17:04:19 -0500
commit    921ce09aa3d0fc06d084e293047544c3bb97fc89 (patch)
tree      818dd50774f6b1331d798c824839f523e029bbb2
parent    33f3c34e429b7002bc84297ce9ee4d8abc703c85 (diff)
parent    7ecfbfe698bc2f62cbabe3966ca81b8da480c753 (diff)
Merge branch 'master' of https://github.com/heathermiller/dist-prog-book
-rw-r--r--  _bibliography/big-data.bib  234
-rw-r--r--  _bibliography/dist-langs.bib  22
-rw-r--r--  chapter/4/dist-langs.md  2
-rw-r--r--  chapter/6/acidic-to-basic-how-the-database-ph-has-changed.md  84
-rw-r--r--  chapter/6/resources/partitioned-network.jpg  bin 23772 -> 24303 bytes
-rw-r--r--  chapter/8/Hive-architecture.png  bin 0 -> 33250 bytes
-rw-r--r--  chapter/8/Hive-transformation.png  bin 0 -> 43008 bytes
-rw-r--r--  chapter/8/big-data.md  469
-rw-r--r--  chapter/8/cluster-overview.png  bin 0 -> 22912 bytes
-rw-r--r--  chapter/8/ecosystem.png  bin 0 -> 190654 bytes
-rw-r--r--  chapter/8/edge-cuts.png  bin 0 -> 11492 bytes
-rw-r--r--  chapter/8/hadoop-ecosystem.jpg  bin 0 -> 76009 bytes
-rw-r--r--  chapter/8/spark-ecosystem.png  bin 0 -> 49070 bytes
-rw-r--r--  chapter/8/spark_pipeline.png  bin 0 -> 17570 bytes
-rw-r--r--  chapter/8/sparksql-data-flow.jpg  bin 0 -> 128479 bytes
-rw-r--r--  chapter/8/sql-vs-dataframes-vs-datasets.png  bin 0 -> 48229 bytes
-rw-r--r--  chapter/8/trash.md  53
-rw-r--r--  chapter/8/vertex-cuts.png  bin 0 -> 14919 bytes
-rw-r--r--  resources/img/data-parallelism.png  bin 0 -> 63389 bytes
-rw-r--r--  resources/img/mapreduce-execution.png  bin 0 -> 230476 bytes
20 files changed, 799 insertions, 65 deletions
diff --git a/_bibliography/big-data.bib b/_bibliography/big-data.bib
index 416b697..599b3c9 100644
--- a/_bibliography/big-data.bib
+++ b/_bibliography/big-data.bib
@@ -1,26 +1,208 @@
-@inproceedings{Uniqueness,
- author = {Philipp Haller and
- Martin Odersky},
- title = {Capabilities for Uniqueness and Borrowing},
- booktitle = {ECOOP 2010, Maribor, Slovenia, June 21-25, 2010.},
- pages = {354--378},
- year = {2010},
-}
-
-@inproceedings{Elsman2005,
- author = {Martin Elsman},
- title = {Type-specialized serialization with sharing},
- booktitle = {Trends in Functional Programming},
- year = {2005},
- pages = {47-62},
-}
-
-@article{Kennedy2004,
- author = {Andrew Kennedy},
- title = {Pickler combinators},
- journal = {J. Funct. Program.},
- volume = {14},
- number = {6},
- year = {2004},
- pages = {727-739},
-} \ No newline at end of file
+@inproceedings{armbrust2015spark,
+ title={Spark sql: Relational data processing in spark},
+ author={Armbrust, Michael and Xin, Reynold S and Lian, Cheng and Huai, Yin and Liu, Davies and Bradley, Joseph K and Meng, Xiangrui and Kaftan, Tomer and Franklin, Michael J and Ghodsi, Ali and others},
+ booktitle={Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data},
+ pages={1383--1394},
+ year={2015},
+ organization={ACM}
+}
+
+@article{bu2010haloop,
+ title={HaLoop: efficient iterative data processing on large clusters},
+ author={Bu, Yingyi and Howe, Bill and Balazinska, Magdalena and Ernst, Michael D},
+ journal={Proceedings of the VLDB Endowment},
+ volume={3},
+ number={1-2},
+ pages={285--296},
+ year={2010},
+ publisher={VLDB Endowment}
+}
+
+@inproceedings{chambers2010flumejava,
+ title={FlumeJava: easy, efficient data-parallel pipelines},
+ author={Chambers, Craig and Raniwala, Ashish and Perry, Frances and Adams, Stephen and Henry, Robert R and Bradshaw, Robert and Weizenbaum, Nathan},
+ booktitle={ACM Sigplan Notices},
+ volume={45},
+ number={6},
+ pages={363--375},
+ year={2010},
+ organization={ACM}
+}
+
+
+@article{ching2015one,
+ title={One trillion edges: graph processing at Facebook-scale},
+ author={Ching, Avery and Edunov, Sergey and Kabiljo, Maja and Logothetis, Dionysios and Muthukrishnan, Sambavi},
+ journal={Proceedings of the VLDB Endowment},
+ volume={8},
+ number={12},
+ pages={1804--1815},
+ year={2015},
+ publisher={VLDB Endowment}
+}
+
+@article{dean2008mapreduce,
+ title={MapReduce: simplified data processing on large clusters},
+ author={Dean, Jeffrey and Ghemawat, Sanjay},
+ journal={Communications of the ACM},
+ volume={51},
+ number={1},
+ pages={107--113},
+ year={2008},
+ publisher={ACM}
+}
+
+
+@inproceedings{ekanayake2010twister,
+ title={Twister: a runtime for iterative mapreduce},
+ author={Ekanayake, Jaliya and Li, Hui and Zhang, Bingjing and Gunarathne, Thilina and Bae, Seung-Hee and Qiu, Judy and Fox, Geoffrey},
+ booktitle={Proceedings of the 19th ACM International Symposium on High Performance Distributed Computing},
+ pages={810--818},
+ year={2010},
+ organization={ACM}
+}
+
+@inproceedings{ghemawat2003google,
+ title={The Google file system},
+ author={Ghemawat, Sanjay and Gobioff, Howard and Leung, Shun-Tak},
+ booktitle={ACM SIGOPS operating systems review},
+ volume={37},
+ number={5},
+ pages={29--43},
+ year={2003},
+ organization={ACM}
+}
+
+@inproceedings{hindman2011mesos,
+ title={Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center.},
+ author={Hindman, Benjamin and Konwinski, Andy and Zaharia, Matei and Ghodsi, Ali and Joseph, Anthony D and Katz, Randy H and Shenker, Scott and Stoica, Ion},
+ booktitle={NSDI},
+ volume={11},
+ pages={22--22},
+ year={2011}
+}
+
+@inproceedings{isard2007dryad,
+ title={Dryad: distributed data-parallel programs from sequential building blocks},
+ author={Isard, Michael and Budiu, Mihai and Yu, Yuan and Birrell, Andrew and Fetterly, Dennis},
+ booktitle={ACM SIGOPS Operating Systems Review},
+ volume={41},
+ number={3},
+ pages={59--72},
+ year={2007},
+ organization={ACM}
+}
+
+
+@inproceedings{malewicz2010pregel,
+ title={Pregel: a system for large-scale graph processing},
+ author={Malewicz, Grzegorz and Austern, Matthew H and Bik, Aart JC and Dehnert, James C and Horn, Ilan and Leiser, Naty and Czajkowski, Grzegorz},
+ booktitle={Proceedings of the 2010 ACM SIGMOD International Conference on Management of data},
+ pages={135--146},
+ year={2010},
+ organization={ACM}
+}
+
+
+@inproceedings{olston2008pig,
+ title={Pig latin: a not-so-foreign language for data processing},
+ author={Olston, Christopher and Reed, Benjamin and Srivastava, Utkarsh and Kumar, Ravi and Tomkins, Andrew},
+ booktitle={Proceedings of the 2008 ACM SIGMOD international conference on Management of data},
+ pages={1099--1110},
+ year={2008},
+ organization={ACM}
+}
+
+@article{pike2005interpreting,
+ title={Interpreting the data: Parallel analysis with Sawzall},
+ author={Pike, Rob and Dorward, Sean and Griesemer, Robert and Quinlan, Sean},
+ journal={Scientific Programming},
+ volume={13},
+ number={4},
+ pages={277--298},
+ year={2005},
+ publisher={Hindawi Publishing Corporation}
+}
+
+@inproceedings{shvachko2010hadoop,
+ title={The hadoop distributed file system},
+ author={Shvachko, Konstantin and Kuang, Hairong and Radia, Sanjay and Chansler, Robert},
+ booktitle={2010 IEEE 26th symposium on mass storage systems and technologies (MSST)},
+ pages={1--10},
+ year={2010},
+ organization={IEEE}
+}
+
+@online{WinNT,
+ author = {Tarau, Paul},
+ title = {Bulk synchronous model},
+ year = 2014,
+ url = {http://www.cse.unt.edu/~tarau/teaching/parpro/papers/Bulk%20synchronous%20parallel.pdf},
+ urldate = {2016-11-24}
+}
+
+@article{thusoo2009hive,
+ title={Hive: a warehousing solution over a map-reduce framework},
+ author={Thusoo, Ashish and Sarma, Joydeep Sen and Jain, Namit and Shao, Zheng and Chakka, Prasad and Anthony, Suresh and Liu, Hao and Wyckoff, Pete and Murthy, Raghotham},
+ journal={Proceedings of the VLDB Endowment},
+ volume={2},
+ number={2},
+ pages={1626--1629},
+ year={2009},
+ publisher={VLDB Endowment}
+}
+
+@inproceedings{thusoo2010hive,
+ title={Hive-a petabyte scale data warehouse using hadoop},
+ author={Thusoo, Ashish and Sarma, Joydeep Sen and Jain, Namit and Shao, Zheng and Chakka, Prasad and Zhang, Ning and Antony, Suresh and Liu, Hao and Murthy, Raghotham},
+ booktitle={2010 IEEE 26th International Conference on Data Engineering (ICDE 2010)},
+ pages={996--1005},
+ year={2010},
+ organization={IEEE}
+}
+@inproceedings{vavilapalli2013apache,
+ title={Apache hadoop yarn: Yet another resource negotiator},
+ author={Vavilapalli, Vinod Kumar and Murthy, Arun C and Douglas, Chris and Agarwal, Sharad and Konar, Mahadev and Evans, Robert and Graves, Thomas and Lowe, Jason and Shah, Hitesh and Seth, Siddharth and others},
+ booktitle={Proceedings of the 4th annual Symposium on Cloud Computing},
+ pages={5},
+ year={2013},
+ organization={ACM}
+}
+@inproceedings{xin2013graphx,
+ title={Graphx: A resilient distributed graph system on spark},
+ author={Xin, Reynold S and Gonzalez, Joseph E and Franklin, Michael J and Stoica, Ion},
+ booktitle={First International Workshop on Graph Data Management Experiences and Systems},
+ pages={2},
+ year={2013},
+ organization={ACM}
+}
+
+@inproceedings{yu2008dryadlinq,
+ title={DryadLINQ: A System for General-Purpose Distributed Data-Parallel Computing Using a High-Level Language.},
+ author={Yu, Yuan and Isard, Michael and Fetterly, Dennis and Budiu, Mihai and Erlingsson, {\'U}lfar and Gunda, Pradeep Kumar and Currey, Jon},
+ booktitle={OSDI},
+ volume={8},
+ pages={1--14},
+ year={2008}
+}
+
+@article{zaharia2010spark,
+ title={Spark: cluster computing with working sets.},
+ author={Zaharia, Matei and Chowdhury, Mosharaf and Franklin, Michael J and Shenker, Scott and Stoica, Ion},
+ journal={HotCloud},
+ volume={10},
+ pages={10--10},
+ year={2010}
+}
+
+
+@article{zhang2012imapreduce,
+ title={imapreduce: A distributed computing framework for iterative computation},
+ author={Zhang, Yanfeng and Gao, Qixin and Gao, Lixin and Wang, Cuirong},
+ journal={Journal of Grid Computing},
+ volume={10},
+ number={1},
+ pages={47--68},
+ year={2012},
+ publisher={Springer}
+}
diff --git a/_bibliography/dist-langs.bib b/_bibliography/dist-langs.bib
index 416b697..657ca63 100644
--- a/_bibliography/dist-langs.bib
+++ b/_bibliography/dist-langs.bib
@@ -23,4 +23,24 @@
number = {6},
year = {2004},
pages = {727-739},
-} \ No newline at end of file
+}
+
+@article{Mernik2005,
+ author = {Marjan Mernik},
+ title = {When and how to develop domain-specific languages},
+ journal = {ACM computing surveys},
+ volume = {37},
+ number = {4},
+ year = {2005},
+ pages = {316-344},
+}
+
+@article{Armstrong2010,
+ author = {Joe Armstrong},
+ title = {Erlang},
+ journal = {Communications of the ACM},
+ volume = {53},
+ number = {9},
+ year = {2010},
+ pages = {68-75},
+}
diff --git a/chapter/4/dist-langs.md b/chapter/4/dist-langs.md
index 9c8a8c9..1736064 100644
--- a/chapter/4/dist-langs.md
+++ b/chapter/4/dist-langs.md
@@ -8,4 +8,4 @@ Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor i
## References
-{% bibliography --file dist-langs %} \ No newline at end of file
+{% bibliography --file dist-langs %}
diff --git a/chapter/6/acidic-to-basic-how-the-database-ph-has-changed.md b/chapter/6/acidic-to-basic-how-the-database-ph-has-changed.md
index 540a2a9..ffc94c0 100644
--- a/chapter/6/acidic-to-basic-how-the-database-ph-has-changed.md
+++ b/chapter/6/acidic-to-basic-how-the-database-ph-has-changed.md
@@ -10,7 +10,7 @@ Relational Database Management Systems are the most ubiquitous database systems
* **Atomicity** guarantees that any transaction will either complete or leave the database unchanged. If any operation of the transaction fails, the entire transaction fails. Thus, a transaction is perceived as an atomic operation on the database. This property is guaranteed even during power failures, system crashes and other erroneous situations.
-* **Consistency** guarantees that any transaction will always result in a valid database state, i.e., the transaction preserves all database rules, such as unique keys, etc.
+* **Consistency** guarantees that any transaction will always result in a valid database state, i.e., the transaction preserves all database rules, such as unique keys.
* **Isolation** guarantees that concurrent transactions do not interfere with each other. No transaction views the effects of other transactions prematurely. In other words, they execute on the database as if they were invoked serially (though a read and write can still be executed in parallel).
@@ -20,17 +20,17 @@ Relational Database Management Systems are the most ubiquitous database systems
Because of these strong guarantees, this model simplifies the life of the developer and has traditionally been the go-to approach in application development. It is instructive to examine how these properties are enforced.
-Single node databases can simply rely upon locking to ensure *ACID*ity. Each transaction marks the data it operates upon, thus enabling the database to block other concurrent transactions from modifying the same data. The lock has to be acquired both while reading and writing data. The locking mechanism enforces a strict linearizable consistency. An alternative, *multiversioning* allows a read and write operation to execute in parallel. Each transaction which reads data from the database is provided the earlier unmodified version of the data that is being modified by a write operation. This means that read operations don't have to acquire locks on the database. This enables read operations to execute without blocking write operations and write operations to execute without blocking read operations.
+Single node databases can simply rely upon locking to ensure *ACID*ity. Each transaction marks the data it operates upon, thus enabling the database to block other concurrent transactions from modifying the same data. The lock has to be acquired both while reading and writing data. The locking mechanism enforces strict linearizable consistency, i.e., all transactions appear to execute in a single sequence and the invariants are always maintained. An alternative, *multiversioning*, allows read and write operations to execute in parallel. Each transaction which reads data from the database is provided the earlier unmodified version of the data that is being modified by a write operation. This means that read operations don't have to acquire locks on the database. This enables read operations to execute without blocking write operations and write operations to execute without blocking read operations.
This model works well on a single node. But it exposes a serious limitation when too many concurrent transactions are performed. A single node database server will only be able to process so many concurrent read operations. The situation worsens when many concurrent write operations are performed. To guarantee *ACID*ity, the write operations will be performed in sequence. The last write request will have to wait for an arbitrary amount of time, a totally unacceptable situation for many real-time systems. This requires the application developer to decide on a **Scaling** strategy.
-### Transaction Volume
+### 1.2. Scaling transaction volume
To increase the volume of transactions against a database, two scaling strategies can be considered:
-**Vertical Scaling** is the easiest approach to scale a relational database. The database is simply moved to a larger computer which provides more transactional capacity. Unfortunately, its far too easy to outgrow the capacity of the largest system available and it is costly to purchase a bigger system each time that happens. Since its not commodity hardware, vendor lock-in will add to further costs.
+* **Vertical Scaling** is the easiest approach to scale a relational database. The database is simply moved to a larger computer which provides more transactional capacity. Unfortunately, it is far too easy to outgrow the capacity of the largest system available, and it is costly to purchase a bigger system each time that happens. Since this is specialized hardware, vendor lock-in adds further costs.
-**Horizontal Scaling** is a more viable option and can be implemented in two ways. Data can be segregated into functional groups spread across databases. This is called *Functional Scaling*. Data within a functional group can be further split across multiple databases, enabling functional areas to be scaled independently of one another for even more transactional capacity. This is called *sharding*.
+* **Horizontal Scaling** is a more viable option and can be implemented in two ways. Data can be segregated into functional groups spread across databases. This is called *Functional Scaling*. Data within a functional group can be further split across multiple databases, enabling functional areas to be scaled independently of one another for even more transactional capacity. This is called *sharding*.
Horizontal Scaling through functional partitioning enables a high degree of scalability. However, the functionally separate tables employ constraints such as foreign keys. For these constraints to be enforced by the database itself, all tables have to reside on a single database server. This limits horizontal scaling. To work around this limitation, the tables in a functional group have to be stored on different database servers. But now, a single database server can no longer enforce constraints between the tables. In order to ensure *ACID*ity of distributed transactions, distributed databases employ a two-phase commit (2PC) protocol.
@@ -38,13 +38,22 @@ Horizontal Scaling through functional partitioning enables high degree of scalab
* In the second phase, the coordinator asks each database to commit the data.
-2PC is a blocking protocol and is usually employed for updates which can take from a few milliseconds up to a few minutes to commit. This means that while a transaction is being processed, other transactions will be blocked. So the application that initiated the transaction will be blocked. Another option is to handle the consistency across databases at the application level. This only complicates the situation for the application developer who is likely to implement a similar strategy if *ACID*ity is to be maintained.
+2PC is a blocking protocol, and updates can take from a few milliseconds up to a few minutes to commit. This means that while a transaction is being processed, other transactions will be blocked, and so will the application that initiated the transaction. Another option is to handle the consistency across databases at the application level. This only complicates the situation for the application developer, who is likely to implement a similar strategy if *ACID*ity is to be maintained.
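+
+To make the protocol concrete, the following is a minimal, single-threaded sketch of a 2PC coordinator (assuming a hypothetical `Participant` interface with `prepare`, `commit` and `abort` operations); real implementations add write-ahead logging, timeouts and recovery:
+
+```
+object TwoPhaseCommit {
+  trait Participant {
+    def prepare(txId: Long): Boolean   // phase 1 vote: true means "ready to commit"
+    def commit(txId: Long): Unit
+    def abort(txId: Long): Unit
+  }
+
+  def run(txId: Long, participants: Seq[Participant]): Boolean = {
+    // Phase 1: the coordinator asks every database whether it can commit.
+    val allPrepared = participants.forall(_.prepare(txId))
+    // Phase 2: commit only if every participant voted yes; otherwise roll back.
+    if (allPrepared) participants.foreach(_.commit(txId))
+    else participants.foreach(_.abort(txId))
+    allPrepared
+  }
+}
+```
+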
+## 2. The Distributed Concoction
-## The **ACID*ic side effect
-Traditional distributed databases provide strong consistency guarantees. While processing a transaction, they block other client requests. Imagine a large scale internet based shopping service with consistency enforced across functional partitions. This means that any tra
+A distributed application is expected to have the following three desirable properties:
-## 3. A Distributed Concoction
+1. **Consistency** - This is the guarantee of total ordering of all operations on a data object such that each operation appears indivisible. This means that any read operation must return the most recently written value. This provides a very convenient invariant to the client application. This definition of consistency is the same as the **Atomic**ity guarantee provided by relational database transactions.
+
+2. **Availability** - Every request to a distributed system must result in a response. However, this is too vague a definition. Whether a node failed in the process of responding, or ran a really long computation to generate a response, or whether the request or the response got lost due to network issues is generally impossible for the client to determine. Hence, for all practical purposes, availability can be defined as the service responding to a request in a timely fashion; the amount of delay an application can bear depends on the application domain.
+
+3. **Partition Tolerance** - Partitioning is the loss of messages between the nodes of a distributed system. During a network partition, the system can lose an arbitrary number of messages between nodes. A partition tolerant system will always respond correctly unless a total network failure happens.
+
+The consistency requirement implies that every request will be treated atomically by the system even if the nodes lose messages due to network partitions.
+The availability requirement implies that every request should receive a response even if a partition causes messages to be lost arbitrarily.
+
+## 3. The CAP Theorem
![Partitioned Network](resources/partitioned-network.jpg)
@@ -54,34 +63,43 @@ In the network above, all messages between the node set M and N are lost due to
2. **Consistency first** - The system does not allow any application to write to data objects as it cannot ensure **consistency** of replica states. This means that the system is perceived to be **unavailable** by the applications.
-If there are no partitions, clearly both consistency and availability can be guaranteed by the system.
+If there are no partitions, clearly both consistency and availability can be guaranteed by the system. This observation led Eric Brewer to conjecture in an invited talk at PODC 2000:
+
+<blockquote>It is impossible for a web service to provide the following three guarantees:
+Consistency
+Availability
+Partition Tolerance</blockquote>
+
+This is called the CAP theorem.
-This simple observation shows a tension between three issues concerning distributed systems -
+It is clear that the prime culprit here is network partition. If there are no network partitions, any distributed service will be both highly available and provide strong consistency of shared data objects. Unfortunately, network partitions cannot be remedied in a distributed system.
-**Consistency** is the guarantee of total ordering of all operations on a data object such that each operation appears indivisible. This means that any read operation must return the most recently written value. This provides a very convenient invariant to the client application that uses the distributed data store. This definition of consistency is the same as the **Atomic**ity guarantee provided by relational database transactions.
+## 4. Two of Three - Exploring the CAP Theorem
-**Availability** is the guarantee that every request to a distributed system must result in a response. However, this is too vague a definition. Whether a node failed in the process of responding or it ran a really long computation to generate a response or whether the request or the response got lost due to network issues is generally impossible to determine by the client and willHence, for all practical purposes, availability can be defined as the service responding to a request in a timely fashion, the amount of delay an application can bear depends on the application domain.
+The CAP theorem dictates that the three desirable properties of consistency, availability, and partition tolerance cannot all be offered simultaneously. Let's study whether it is possible to achieve two of these three properties.
-**Partitioning** is the loss of messages between the nodes of a distributed system.
+### 4.1. Consistency and Availability
+If there are no network partitions, then there is no loss of messages and all requests receive a response within the stipulated time. It is clearly possible to achieve both consistency and availability. Distributed systems over an intranet are an example of such systems.
-This observation led Eric Brewer to conjecture in an invited talk at PODC 2000 -
+### 4.2. Consistency and Partition Tolerance
+Without availability, both of these properties can be achieved easily. A centralized system can provide these guarantees. The state of the application is maintained on a single designated node. All updates from the client are forwarded by the nodes to this designated node. It updates the state and sends the response. When a failure happens, the system does not respond and is perceived as unavailable by the client. Distributed locking algorithms in databases also provide these guarantees.
-<blockquote>It is impossible for a web service to provide the following three guarantees:
-Consistency
-Availability
-Partition Tolerance</blockquote>
+### 4.3. Availability and Partition Tolerance
+Without atomic consistency, it is very easy to achieve availability even in the face of partitions. Even if nodes fail to communicate with each other, they can individually handle query and update requests issued by the client. The same data object will have different states on different nodes as the nodes progress independently. This weak consistency model is exhibited by web caches.
+
+It is clear that two of these three properties are easy to achieve in any distributed system. Since large scale distributed systems have to take partitions into account, will they have to sacrifice availability for consistency or consistency for availability? Clearly giving up either consistency or availability is too big a sacrifice.
+
+## 5. The **BASE**ic distributed state
-This is called the CAP theorem. It is clear that the prime culprit here is network partition. If there are no network partitions, any distributed service will be both highly available and provide strong consistency of shared data objects. Unfortunately, network partitions cannot be remedied in a distributed system.
+When viewed through the lens of the CAP theorem and its consequences on distributed application design, we realize that we cannot commit to both perfect availability and strong consistency. But surely we can explore the middle ground. We can guarantee availability most of the time with an occasionally inconsistent view of the data. Consistency is eventually achieved when the communication between the nodes resumes. This leads to the following properties of current distributed applications, referred to by the acronym BASE.
-## The **BASE**ic distributed state
+* **Basically Available** services are those which are partially available when partitions happen. Thus, they appear to work most of the time. Partial failures result in the system being unavailable only for a section of the users.
-When viewed through the lens of CAP theorem and its consequences on distributed application design, we realize that we cannot commit to perfect availability and strong consistency. But surely we can explore the middle ground. We can guarantee availability most of the time with sometimes inconsistent view of the data. The consistency is eventually achieved when the communication between the nodes resumes. This leads to the following properties of the current distributed applications, referred to by the acronym BASE.
+* **Soft State** services provide no strong consistency guarantees. They are not write consistent. Since replicas may not be mutually consistent, applications have to accept stale data.
-**Basically Available** services are those which are partially available when partitions happen. Thus, they appear to work most of the time. Partial failures result in the system being unavailable only for a section of the users.
-**Soft State** services provide no strong consistency guarantees. They are not write consistent. Since replicas may not be mutually consistent, applications have to accept stale data.
-**Eventually Consistent** services try to make application state consistent whenever possible.
+* **Eventually Consistent** services try to make application state consistent whenever possible.
-## Partitions and latency
+## 6. Partitions and latency
Any large scale distributed system has to deal with latency issues. In fact, network partitions and latency are fundamentally related. Once a request is made and no response is received within some duration, the sender node has to assume that a partition has happened. The sender node can take one of the following steps:
1) Cancel the operation as a whole. In doing so, the system is choosing consistency over availability.
@@ -90,32 +108,32 @@ Any large scale distributed system has to deal with latency issue. In fact, netw
Essentially, a partition is an upper bound on the time spent waiting for a response. Whenever this upper bound is exceeded, the system chooses C over A or A over C. Also, the partition may be perceived only by two nodes of a system as opposed to all of them. This means that partitions are a local occurrence.
-## Handling Partitions
+## 7. Handling Partitions
Once a partition has happened, it has to be handled explicitly. The designer has to decide which operations will be functional during partitions. The partitioned nodes will continue their attempts at communication. When the nodes are able to establish communication, the system has to take steps to recover from the partitions.
-### Partition mode functionality
+### 7.1. Partition mode functionality
When at least one side of the system has entered partition mode, the system has to decide which functionality to support. Deciding this depends on the invariants that the system must maintain. Depending on the nature of the problem, the designer may choose to compromise on certain invariants by allowing the partitioned system to provide functionality which might violate them. This means the designer is choosing availability over consistency. Certain invariants may have to be maintained, and operations that would violate them will either have to be modified or prohibited. This means the designer is choosing consistency over availability.
Deciding which operations to prohibit, modify or delay also depends on other factors, such as which node stores the data: operations on data stored on a node can typically proceed on that node but not on other nodes.
In any event, the bottom line is that if the designer wishes for the system to be available, certain operations have to be allowed. The node has to maintain a history of these operations so that it can be merged with the rest of the system when it is able to reconnect.
Since the operations can happen simultaneously on multiple disconnected nodes, all sides will maintain this history. One way to maintain this information is through version vectors.
Another interesting problem is to communicate the progress of these operations to the user. Until the system gets out of partition mode, the operations cannot be committed completely. Till then, the user interface has to faithfully represent their incomplete or in-progress status to the user.
-### Partition Recovery
+### 7.2. Partition Recovery
When the partitioned nodes are able to communicate, they have to exchange information to maintain consistency. Both sides continued in their independent direction but now the delayed operations on either side have to be performed and violated invariants have to be fixed. Given the state and history of both sides, the system has to accomplish the following tasks.
-#### Consistency
+#### 7.2.1. Consistency
During recovery, the system has to reconcile the inconsistency in the state of both nodes. This is relatively straightforward to accomplish. One approach is to start from the state at the time of partition and apply the operations of both sides in an appropriate manner, ensuring that the invariants are maintained. Depending on the operations allowed during the partition phase, this process may or may not be possible. The general problem of conflict resolution is not solvable, but a restricted set of operations may ensure that the system can always merge conflicts. For example, Google Docs limits operations to style and text editing. But source-code control systems such as the Concurrent Versions System (CVS) may encounter conflicts which require manual resolution. Research has been done on techniques for automatic state convergence. Using commutative operations allows the system to sort the operations in a consistent global order and execute them. Not all operations can be made commutative, though; for example, addition with bounds checking is not commutative. Marc Shapiro and his colleagues at INRIA have developed *commutative replicated data types (CRDTs)* that provably converge as operations are performed. By implementing state through CRDTs, we can ensure availability and automatic state convergence after partitions.
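+
+As an illustrative sketch (not taken from that work), a state-based grow-only counter is one of the simplest such data types: each node increments only its own slot, and merging takes the per-node maximum, so replicas converge regardless of the order in which their states are exchanged:
+
+```
+final case class GCounter(counts: Map[String, Long] = Map.empty) {
+  // Each node increments only its own entry.
+  def increment(nodeId: String, by: Long = 1): GCounter =
+    GCounter(counts.updated(nodeId, counts.getOrElse(nodeId, 0L) + by))
+
+  // The observed value is the sum over all nodes.
+  def value: Long = counts.values.sum
+
+  // Merge is commutative, associative and idempotent: take the per-node maximum.
+  def merge(other: GCounter): GCounter =
+    GCounter((counts.keySet ++ other.counts.keySet).map { id =>
+      id -> math.max(counts.getOrElse(id, 0L), other.counts.getOrElse(id, 0L))
+    }.toMap)
+}
+```
+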
-#### Compensation
+#### 7.2.2. Compensation
During a partition, it is possible for both sides to perform a series of actions which are externalized, i.e., their effects are visible outside the system. To compensate for these actions, the partitioned nodes have to maintain a history.
For example, consider a system in which both sides have executed the same order during a partition. During the recovery phase, the system has to detect this and distinguish it from two intentional orders. Once detected, the duplicate order has to be rolled back. If the order has been committed successfully then the problem has been externalized. The user will see twice the amount deducted from his account for a single purchase. Now, the system has to credit the appropriate amount to the user's account and possibly send an email explaining the entire debacle. All this depends on the system maintaining the history during partition. If the history is not present, then duplicate orders cannot be detected and the user will have to catch the mistake and ask for compensation.
It would have been great if the duplicate order was not issued by the system in the first place. But the requirement to maintain system availability trumps consistency. Mistakes in such cases cannot always be corrected internally. But by admitting them and compensating for them, the system arguably exhibits equivalent behavior.
-### What's the right pH for my distributed solution?
+## 8. What's the right pH for my distributed solution?
Whether an application chooses to be an *ACID*ic or *BASE*ic service depends on the domain. An application developer has to consider the consistency-availability tradeoff on a case by case basis. *ACID*ic databases provide a very simple and strong consistency model making application development easy for domains where data inconsistency cannot be tolerated. *BASE*ic systems provide a very loose consistency model, placing more burden on the application developer to understand the invariants and manage them carefully during partitions by appropriately limiting or modifying the operations.
-## References
+## 9. References
https://neo4j.com/blog/acid-vs-base-consistency-models-explained/
https://en.wikipedia.org/wiki/Eventual_consistency/
diff --git a/chapter/6/resources/partitioned-network.jpg b/chapter/6/resources/partitioned-network.jpg
index 2c91607..513fc13 100644
--- a/chapter/6/resources/partitioned-network.jpg
+++ b/chapter/6/resources/partitioned-network.jpg
Binary files differ
diff --git a/chapter/8/Hive-architecture.png b/chapter/8/Hive-architecture.png
new file mode 100644
index 0000000..9f61454
--- /dev/null
+++ b/chapter/8/Hive-architecture.png
Binary files differ
diff --git a/chapter/8/Hive-transformation.png b/chapter/8/Hive-transformation.png
new file mode 100644
index 0000000..7383188
--- /dev/null
+++ b/chapter/8/Hive-transformation.png
Binary files differ
diff --git a/chapter/8/big-data.md b/chapter/8/big-data.md
index bfd3e7b..6c0781d 100644
--- a/chapter/8/big-data.md
+++ b/chapter/8/big-data.md
@@ -1,11 +1,472 @@
---
layout: page
title: "Large Scale Parallel Data Processing"
-by: "Joe Schmoe and Mary Jane"
+by: "Jingjing and Abhilash"
---
+## Introduction
+The growth of the Internet has generated so-called big data: datasets of terabytes or petabytes that cannot fit on a single machine or be processed by a single program. Often the computation also has to finish fast enough to provide practical services. A common approach taken by tech giants like Google, Yahoo, and Facebook is to process big data across clusters of commodity machines. Many of the computations are conceptually straightforward, and Google proposed the MapReduce model to abstract this logic; it proved to be simple and powerful. The idea has since inspired many other programming models. In this chapter, we present how these programming models evolved over time, why their execution engines are designed in certain ways, and the underlying ecosystem that supports each line of development.
+## 1 Programming Models
+### 1.1 Data parallelism
+*Data parallelism* means running a single operation on different pieces of the data on different machines in parallel. By comparison, a sequential computation looks like *"for all elements in the dataset, do operation A"*, where the dataset could be on the order of terabytes or petabytes (i.e., big data) and one wants to scale up the processing. The challenges in parallelizing this sequential computation include how to abstract the different types of computations in a simple and correct way, how to distribute the data to hundreds or thousands of machines, how to handle failures, and so on.
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. {% cite Uniqueness --file big-data %}
+<figure class="main-container">
+ <img src="{{ site.baseurl }}/resources/img/data-parallelism.png" alt="Data Parallelism" />
+</figure>
-## References
+**MapReduce** {% cite dean2008mapreduce --file big-data %} is a programming model proposed by Google, initially to satisfy their demand for large-scale indexing for the web search service. It provides a simple user program interface, the *map* and *reduce* functions, and automatically handles the parallelization and distribution.
+
+The MapReduce model is simple and powerful, and it quickly became very popular among developers. However, when developers write real-world applications, they often end up chaining together MapReduce stages. A pipeline of MapReduce jobs forces programmers to write additional coordination code, i.e., the development style goes backward from a simple logical computation abstraction to lower-level coordination management. In MapReduce, programmers need to reason about data representation on disk or in storage services such as a database. Besides, developers need to clearly understand the MapReduce execution model to do manual optimizations[ref]. The **FlumeJava** {%cite chambers2010flumejava --file big-data%} library intends to provide support for developing data-parallel pipelines by abstracting away the complexity involved in data representation and implicitly handling the optimizations. It defers evaluation, constructs an execution plan from parallel collections, optimizes the plan, and then executes the underlying MR primitives. The optimized execution is comparable with hand-optimized pipelines, so there is no need to write raw MR programs directly.
+
+An alternative approach to data parallelism is to construct complex, multi-step directed acyclic graphs (DAGs) of work from the user's instructions and execute those DAGs all at once. This eliminates the costly synchronization required by MapReduce and makes applications much easier to build and reason about. Dryad, a Microsoft Research project used internally at Microsoft, was one such project that leveraged this model of computation.
+
+Microsoft **Dryad** {% cite isard2007dryad --file big-data %} abstracts individual computational tasks as vertices and constructs a communication graph between those vertices. Programmers describe this DAG and let the Dryad execution engine construct the execution plan and manage scheduling and optimization. One of the advantages of Dryad over MapReduce is that Dryad vertices can process an arbitrary number of inputs and outputs, while MR only supports a single input and a single output for each vertex. Besides the flexibility of computations, Dryad also supports different types of communication channel: files, TCP pipes, and shared-memory FIFOs.
+
+
+Dryad expresses computation as acyclic data flows, which might be too expensive for some complex applications, e.g., iterative machine learning algorithms. **Spark** {% cite zaharia2010spark --file big-data%} is a framework that uses functional programming and pipelining to provide such support. It is largely inspired by MapReduce's model and builds upon the ideas behind the DAGs and lazy evaluation of DryadLINQ. Instead of writing data to disk for each job as MapReduce does, Spark can cache results across jobs. Spark explicitly caches computational data in memory through specialized immutable datasets named Resilient Distributed Datasets (RDDs) and reuses the same dataset across multiple parallel operations. Spark builds upon RDDs to achieve fault tolerance by using the lineage information of a lost RDD to recompute it. This results in less overhead than the checkpoint-based fault tolerance of Distributed Shared Memory systems. Moreover, Spark powers a stack of other libraries, e.g., SQL & DataFrames and GraphX, and can easily combine those libraries in one single application. These features make Spark the best fit for iterative jobs and interactive analytics and also help it provide better performance. Above all, other systems can be expressed on top of Spark, letting other models leverage the specific advantages of Spark while retaining their model of computation without any changes to the Spark system[ref].
+
+
+The following four sections discuss the programming models of MapReduce, FlumeJava, Dryad, and Spark.
+
+
+### 1.1.1 MapReduce
+In this model, parallelizable computations are abstracted into map and reduce functions. The computation accepts a set of key/value pairs as input and produces a set of key/value pairs as output. The process involves two phases:
+- *Map*, written by the user, accepts a set of key/value pairs ("records") as input, applies the *map* operation to each record, and computes a set of intermediate key/value pairs as output.
+- *Reduce*, also written by the user, accepts an intermediate key and a set of values associated with that key, operates on them, and produces zero or one output value.
+ Note: there is a *Shuffle* phase between *map* and *reduce*, provided by the MapReduce library, which groups all the intermediate values of the same key together and passes them to the *Reduce* function. We will discuss this more in Section 2, Execution Models.
+
+Conceptually, the map and reduction functions have associated **types**:
+
+\\[map (k1,v1) \rightarrow list(k2,v2)\\]
+
+\\[reduce (k2,list(v2)) \rightarrow list(v2)\\]
+
+
+The input keys and values are drawn from a different domain than the output keys and values. The intermediate keys and values are from the same domain as the output keys and values.
+
+
+Concretely, consider the problem of counting the number of occurrences of each word in a large collection of documents: a `map` function emits each word plus an associated count of 1; a `reduce` function sums together all counts emitted for the same word:
+
+```
+map(String key, String value):
+ // key: document name
+ // value: document contents
+ for each word w in value:
+ EmitIntermediate(w, "1");
+
+reduce(String key, Iterator values):
+ // key: a word
+ // values: a list of counts
+ int result = 0;
+ for each v in values:
+ result += ParseInt(v);
+ Emit(AsString(result));
+```
+
+During execution, the MapReduce library assigns a master node to manage data partitioning and scheduling; other nodes serve as workers that run either *map* or *reduce* operations on demand. More details of the execution model are discussed later. Here, it is worth mentioning that the intermediate results are written to disk and the reduce operation reads them from disk. This is crucial for fault tolerance.
+
+*Fault Tolerance*
+MapReduce runs on hundreds or thousands of unreliable commodity machines, so the library must provide fault tolerance. The library assumes that the master node does not fail, and it monitors worker failures. If no status update is received from a worker before a timeout, the master marks it as failed. The master may then reschedule the associated task on other workers, depending on the task type and status. The commits of *map* and *reduce* task outputs are atomic: an in-progress task writes data into private temporary files, and once the task succeeds, it negotiates with the master and renames the files to complete the task. In the case of failure, the worker discards those temporary files. This guarantees that if the computation is deterministic, the distributed implementation produces the same output as a non-faulting sequential execution.
+
+*Limitations*
+Many analytics workloads, like K-means and logistic regression, and graph processing applications, like PageRank and shortest paths using parallel breadth-first search, require multiple stages of MapReduce jobs. In a regular MapReduce framework like Hadoop, this requires the developer to manually handle the iterations in the driver code. At every iteration, the result of stage T is written to HDFS and loaded back again at stage T+1, causing a performance bottleneck: network bandwidth and CPU resources are wasted, and above all the disk I/O operations are inherently slow. In order to address such challenges in iterative workloads on MapReduce, frameworks like HaLoop {% cite bu2010haloop --file big-data %}, Twister {% cite ekanayake2010twister --file big-data %} and iMapReduce {% cite zhang2012imapreduce --file big-data %} adopt special techniques like caching the data between iterations and keeping the mapper and reducer alive across the iterations.
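+
+As a hypothetical sketch of this pattern (not taken from any of the cited papers), a Hadoop driver that implements an iterative algorithm must submit one job per iteration and feed each iteration's HDFS output back in as the next iteration's input; `MyMapper`, `MyReducer` and the paths are placeholder names:
+
+```
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+
+object IterativeDriver {
+  def main(args: Array[String]): Unit = {
+    val conf = new Configuration()
+    val numIterations = args(0).toInt
+    var input = new Path("/data/iteration-0")        // placeholder initial input
+    for (i <- 1 to numIterations) {
+      val output = new Path(s"/data/iteration-$i")
+      val job = Job.getInstance(conf, s"iteration-$i")
+      job.setMapperClass(classOf[MyMapper])           // placeholder user-defined mapper
+      job.setReducerClass(classOf[MyReducer])         // placeholder user-defined reducer
+      FileInputFormat.addInputPath(job, input)
+      FileOutputFormat.setOutputPath(job, output)
+      job.waitForCompletion(true)                     // every iteration is written to HDFS...
+      input = output                                  // ...and read back from HDFS next time
+    }
+  }
+}
+```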
+
+### 1.1.2 FlumeJava
+FlumeJava {%cite chambers2010flumejava --file big-data %} was introduced to make it easy to develop, test, and run efficient data-parallel pipelines. FlumeJava represents each dataset as an object, and transformations are invoked by applying methods on these objects. It constructs an efficient internal execution plan from a pipeline of MapReduce jobs, using deferred evaluation and optimizing based on the plan structure. Its debugging support allows programmers to run a pipeline on the local machine first and then deploy it to large clusters.
+
+*Core Abstraction*
+- `PCollection<T>`, an immutable bag of elements of type `T`
+- `recordOf(...)`, specifies the encoding of the instance
+- `PTable<K, V>`, a subclass of `PCollection<Pair<K,V>>`, an immutable multi-map with keys of type `K` and values of type `V`
+- `parallelDo()`, can express both the map and reduce parts of MapReduce
+- `groupByKey()`, same as the shuffle step of MapReduce
+- `combineValues()`, semantically a special case of `parallelDo()`, a combination of a MapReduce combiner and a MapReduce reducer, which is more efficient than doing all the combining in the reducer
+
+*Deferred Evaluation & Optimizer*
+The state of each `PCollection` object is either *deferred* (not yet computed) or *materialized* (computed). When the program invokes a parallel operation, it does not actually run the operation. Instead, it performs the operation only when needed. FlumeJava also applies some optimizations: 1) parallelDo fusion, which rewrites f(g(x)) as (f o g)(x) to reduce the number of steps; 2) the MapShuffleCombineReduce (MSCR) operation, which generalizes MapReduce jobs to accept multiple inputs and multiple outputs, and for which FlumeJava performs another round of MSCR fusion.
+
+
+### 1.1.3 Dryad
+Dryad is a more general and flexible execution engine that executes subroutines at specified graph vertices. Developers can specify an arbitrary directed acyclic graph to combine computational "vertices" with communication channels (file, TCP pipe, shared-memory FIFO) and build a dataflow graph. Compared with MapReduce, Dryad can specify an arbitrary DAG whose vertices have any number of inputs and outputs, and it supports multiple stages. It also offers more channel types and can boost performance when using TCP pipes and shared memory. But like writing a pipeline of MapReduce jobs, raw Dryad is a low-level programming model that is hard for users to program: programmers need to understand system resources and other lower-level details. This motivated a more declarative model, DryadLINQ {%cite yu2008dryadlinq --file big-data %}, which was created to fill the gap. It exploits LINQ, a query language in .NET, automatically translates the data-parallel parts into an execution plan, and passes it to the Dryad execution engine.
+
+### 1.1.4 Spark
+
+Spark {%cite zaharia2010spark --file big-data %} is a fast, in-memory data processing engine with an elegant and expressive development interface which enables developers to efficiently execute machine learning, SQL, or streaming workloads that require fast iterative access to datasets. It is a functional-style programming model (similar to DryadLINQ) where a developer creates acyclic data flow graphs and transforms a set of input data through map- and reduce-like operators. Spark provides two main abstractions, distributed in-memory storage (RDDs) and parallel operations on datasets (based on Scala's collection API), to deliver high-performance processing, scalability, and fault tolerance.
+
+*Distributed in-memory storage: Resilient Distributed Datasets*
+
+An RDD is a partitioned, read-only collection of objects which can be created from data in stable storage or by transforming another RDD. It can be distributed (parallelized) across multiple nodes in a cluster and is fault tolerant (resilient). If a node fails, an RDD can always be recovered using its lineage graph (information on how it was derived from other datasets). An RDD is stored in memory (as much as fits, with the rest spilled to disk) and is immutable: it can only be transformed into a new RDD. Transformations are lazy and are applied only when an action is performed on the RDD. Hence, an RDD need not be materialized at all times.
+
+The properties that power RDDs with the above-mentioned features:
+- A list of dependencies on other RDDs.
+- An array of partitions that a dataset is divided into.
+- A compute function to do a computation on partitions.
+- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
+- Optional preferred locations (aka locality info), (e.g. block locations for an HDFS file)
+
+
+<figure class="main-container">
+ <img src="./spark_pipeline.png" alt="Spark pipeline" />
+</figure>
+
+
+The Spark API provides two kinds of operations on an RDD:
+
+- Transformations - lazy operations that return another RDD.
+ - `map(f: T ⇒ U) : RDD[T] ⇒ RDD[U]` : Returns a MappedRDD[U] by applying function f to each element
+ - `flatMap(f: T ⇒ Seq[U]) : RDD[T] ⇒ RDD[U]` : Returns a new FlatMappedRDD[U] by first applying a function to all elements and then flattening the results
+ - `filter(f: T ⇒ Bool) : RDD[T] ⇒ RDD[T]` : Returns a FilteredRDD[T] containing the elements for which f returns true
+ - `groupByKey()` : When called on a (K, V) RDD, returns a new RDD[(K, Iterable[V])]
+ - `reduceByKey(f: (V, V) ⇒ V)` : When called on a (K, V) RDD, returns a new RDD[(K, V)] by aggregating values, e.g. reduceByKey(_ + _)
+ - `join : (RDD[(K, V)], RDD[(K, W)]) ⇒ RDD[(K, (V, W))]` : When called on a (K, V) RDD and a (K, W) RDD, returns a new RDD[(K, (V, W))] by joining them on key K
+
+
+- Actions - operations that trigger computation on an RDD and return values.
+
+ - `reduce(f: (T, T) ⇒ T) : RDD[T] ⇒ T` : Returns a T by reducing the elements using the specified commutative and associative binary operator
+ - `collect()` : Return an Array[T] containing all elements
+ - `count()` : Return the number of elements
+
+RDDs are by default discarded after use. However, Spark provides two explicit operations, persist() and cache(), to ensure an RDD is kept in memory once it has been computed for the first time.
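+
+As a minimal sketch of how these operations fit together (assuming a `SparkContext` named `sc`, as provided by the Spark shell, and an illustrative input path), a word count builds a chain of lazy transformations that only runs when an action is invoked:
+
+```
+val lines = sc.textFile("hdfs:///data/input.txt")   // RDD[String]; illustrative path
+
+val counts = lines
+  .flatMap(line => line.split("\\s+"))              // RDD[String]: one record per word
+  .map(word => (word, 1))                           // RDD[(String, Int)]
+  .reduceByKey(_ + _)                               // RDD[(String, Int)]: per-word totals
+
+counts.cache()                                      // keep the computed RDD in memory for reuse
+
+val numDistinctWords = counts.count()               // action: triggers the whole pipeline
+val allCounts = counts.collect()                    // action: returns Array[(String, Int)]
+```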
+
+*Why RDDs over Distributed Shared Memory (DSM)?*
+RDDs are immutable and can only be created through coarse-grained transformations, while DSM allows fine-grained read and write operations to each memory location. Hence RDDs do not incur the checkpointing overhead present in DSM and can be recovered using their lineage.
+Because RDDs are immutable, a straggler (slow node) can be mitigated by running a backup copy of its task, as in MapReduce. This is hard to implement in DSM, as two copies would point to the same memory locations and interfere with each other's updates.
+Other benefits include the scheduling of tasks based on data locality to improve performance and the ability of RDDs to degrade gracefully in case of memory shortage. Partitions that do not fit in RAM get spilled to disk (performance will then be comparable to that of other data-parallel systems).
+
+***Challenges in Spark***
+
+- `Functional API semantics` : The groupByKey operator is costly in terms of performance: it collects all values of a key into a (key, list of values) pair on a single machine and only then performs the per-key aggregation there, resulting in heavy shuffle and computation overhead. Spark does provide the reduceByKey operator, which performs a partial aggregation on individual worker nodes before shuffling the distributed collection. However, developers who are not aware of such functionality can unintentionally choose groupByKey (see the sketch after this list).
+
+- `Debugging and profiling` : There is a lack of debugging and profiling tools, and developers find it hard to tell whether a computation is concentrated on a single machine or whether the data structures they used are inefficient.
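+
+A small sketch of the first point (assuming a `SparkContext` named `sc`): both snippets below compute per-key sums, but `reduceByKey` pre-aggregates within each partition before the shuffle, while `groupByKey` ships every individual (key, value) pair across the network first.
+
+```
+val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("b", 1)))
+
+// Shuffles every pair, then sums the grouped values per key.
+val viaGroup  = pairs.groupByKey().mapValues(_.sum)
+
+// Sums within each partition first, so far less data crosses the network.
+val viaReduce = pairs.reduceByKey(_ + _)
+```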
+
+### 1.2 Querying: declarative interfaces
+MapReduce provides only two high-level primitives, map and reduce, that programmers have to worry about. MapReduce takes care of all the processing over a cluster: failure and recovery, data partitioning, etc. However, the framework suffers from rigidity with respect to its one-input data format (key/value pairs) and two-stage data flow.
+Several important patterns like joins (which can be highly complex depending on the data) are extremely hard to implement and reason about for a programmer. The code can also become repetitive when the programmer wants to implement common operations like projection and filtering.
+Non-programmers like data scientists would highly prefer a SQL-like interface over a cumbersome and rigid framework{% cite scaling-spark-in-real-world --file big-data%}. Such a high-level declarative language can easily express their task while leaving all of the execution optimization details to the backend engine. Hence, these kinds of abstractions provide ample opportunity for query optimization.
+
+Sawzall {% cite pike2005interpreting --file big-data%} is a programming language built on top of MapReduce. It consists of a *filter* phase (map) and an *aggregation* phase (reduce). The user program specifies the filter function and emits the intermediate pairs to external pre-built aggregators.
+
+Apart from Sawzall, Pig {%cite olston2008pig --file big-data %} and Hive {%cite thusoo2009hive --file big-data %} are the other major components that sit on top of the Hadoop framework for processing large data sets without the users having to write Java-based MapReduce code.
+
+Hive was built by Facebook to organize datasets in structured formats and still utilize the benefits of the MapReduce framework. It has its own SQL-like language, HiveQL {%cite thusoo2010hive --file big-data %}, which is easy for anyone who understands SQL. Hive reduces code complexity and eliminates lots of boilerplate that would otherwise be an overhead with the Java-based MapReduce approach. It has a component called the *metastore* that holds table metadata, created once and reused each time a table is referenced by HiveQL, the way traditional warehousing solutions do. The drawback of using Hive is that programmers have to be familiar with basic techniques and best practices for running their Hive queries at maximum speed, since performance depends on the Hive optimizer; in effect, developers have to guide the Hive optimizer toward an efficient plan for their queries.
+
+A relational interface to big data is good; however, it does not cater to users who want to perform
+
+- ETL to and from various semi or unstructured data sources.
+- advanced analytics like machine learning or graph processing.
+
+These use cases require the best of both worlds: relational queries and procedural algorithms. Pig Latin {% cite olston2008pig --file big-data%} and Spark SQL {% cite armbrust2015spark --file big-data%} bridge this gap by letting users seamlessly intermix the relational and procedural APIs. Both frameworks free the programmer from worrying about the internal execution model by implicitly optimizing the user's DAG of transformations.
+
+Pig Latin aims at a sweet spot between declarative and procedural programming. For experienced programmers, SQL is an unnatural way to express program logic, so Pig Latin decomposes a set of data transformations into an explicit sequence of steps. This makes Pig more verbose than Hive.
+
+Spark SQL, though it shares Pig's goals, benefits from the Spark execution engine, Spark's efficient fault-tolerance mechanism, and a specialized data structure called the Dataset.
+
+The following subsections discuss Hive, Pig Latin, and Spark SQL in detail.
+
+
+### 1.2.1 Hive/HiveQL
+
+Hive is a data-warehousing infrastructure built on top of the MapReduce framework, Hadoop. Its primary responsibility is to provide data summarization, query, and analysis over large datasets stored in Hadoop’s HDFS {% cite shvachko2010hadoop --file big-data%}. It offers SQL-like access to structured data through HiveQL (or HQL) as well as big data analysis with the help of MapReduce. HiveQL queries are compiled into MapReduce jobs that execute on Hadoop, which drastically brings down the time needed to write and maintain Hadoop jobs.
+
+Data in Hive is organized into three units :
+
+`Tables`: Like RDBMS tables, Hive tables contain rows and columns, and every table maps to an HDFS directory. All the data in a table is serialized and stored in files under the corresponding directory. Hive is extensible to user-defined data formats and custom serialization and deserialization methods. It also supports external tables stored on other file systems such as HDFS, NFS, or local directories.
+
+`Partitions`: The distribution of data into subdirectories of the table directory is determined by one or more partitions; a table is partitioned on the values of one or more columns.
+
+`Buckets`: Data in each partition can be further divided into buckets based on the hash of a table column. Each bucket is stored as a file in the partition directory.
+
+***HiveQL***: The Hive query language consists of a subset of SQL along with some extensions. It is very SQL-like and supports features such as subqueries, joins, Cartesian products, group by, aggregation, describe, and more. Custom MapReduce programs can also be embedded in Hive queries. A sample query using MapReduce looks like this:
+```
+FROM (
+ MAP inputdata USING 'python mapper.py' AS (word, count)
+ FROM inputtable
+ CLUSTER BY word
+ )
+ REDUCE word, count USING 'python reduce.py';
+```
+This query uses mapper.py to transform inputdata into (word, count) pairs, distributes the data to reducers by hashing on the word column (given by CLUSTER BY), and then aggregates with reduce.py.
+INSERT INTO, UPDATE, and DELETE are not supported, which makes it easier to handle reader and writer concurrency.
+
+
+***Serialization/Deserialization***
+Hive implements LazySerDe as the default SerDe. A SerDe combines a serializer and a deserializer and lets developers instruct Hive on how their records should be processed. The Deserializer translates rows into internal objects lazily, so the cost of deserializing a column is incurred only when it is needed. The Serializer converts a Java object into a format that Hive can write to HDFS or another supported system. Hive also provides a RegexSerDe, which uses regular expressions to parse columns out of a row.
+
+### 1.2.2 Pig Latin
+The goal of Pig Latin {% cite olston2008pig --file big-data%} is to attract experienced programmers to perform ad-hoc analysis on big data. Parallel database products provide a simple SQL query interface, which is good for non-programmers and simple tasks, but it is not the style in which experienced programmers approach a problem; such programmers prefer to specify individual steps and compose them as a sequence.
+
+For example, suppose we have a table urls: `(url, category, pagerank)`. The following is a simple SQL query that finds, for each sufficiently large category, the average pagerank of high-pagerank urls in that category.
+
+```
+SELECT category, AVG(pagerank)
+FROM urls WHERE pagerank > 0.2
+GROUP BY category HAVING COUNT(*) > 1000000
+```
+
+Pig Latin would express the same computation in the following way:
+
+```
+good_urls = FILTER urls BY pagerank > 0.2;
+groups = GROUP good_urls BY category;
+big_groups = FILTER groups BY COUNT(good_urls) > 1000000;
+output = FOREACH big_groups GENERATE
+ category, AVG(good_urls.pagerank);
+```
+
+*Interoperability*: Pig Latin is designed to support ad-hoc data analysis, so the input only requires a function that parses the contents of files into tuples, which saves a time-consuming import step. For output, Pig gives users the freedom to convert tuples into a byte sequence whose format they define.
+
+*Nested Data Model*: Pig Latin has a flexible, fully nested data model and allows complex, non-atomic data types such as set, map, and tuple to occur as fields of a table. The benefits include: it is closer to how programmers think; data can be stored in the same nested fashion, saving recombination time; it admits an algebraic language; and it allows rich user-defined functions.
+
+*UDFs as First-Class Citizens*: Pig Latin supports user-defined functions (UDFs) for customized grouping, filtering, and per-tuple processing.
+
+*Debugging Environment*: Pig Latin has a novel interactive debugging environment that generates a concise example data table to illustrate the output of each step.
+
+### 1.2.3 SparkSQL
+
+The major contributions of Spark SQL {% cite armbrust2015spark --file big-data%} are the DataFrame API and the Catalyst optimizer. Spark SQL aims to provide relational processing over native RDDs and several external data sources through a programmer-friendly API, high performance through DBMS techniques, support for semi-structured data and external databases, and support for advanced analytics such as machine learning and graph processing.
+
+***Programming API***
+
+Spark SQL runs on top of Spark and provides SQL interfaces. A user can interact with these interfaces through JDBC/ODBC, the command line, or the DataFrame API.
+The DataFrame API lets users intermix relational and procedural code with ease. A DataFrame is a collection of schema-based rows with named columns, on which relational operations can be performed with optimized execution. Unlike an RDD, a DataFrame lets developers define structure for the data; it is comparable to a table in a relational database or to an R/Python data frame. DataFrames can be constructed from tables of external data sources or from existing native RDDs. A DataFrame is lazy: each one represents a logical plan that is not executed until an output operation such as save or count is performed.
+Spark SQL supports all the major SQL data types, including complex types such as arrays, maps, and unions.
+DataFrame operations include projection (select), filtering (where), joins, and aggregations (groupBy).
+Illustrated below is an example of relational operations on an employees DataFrame that computes the number of female employees in each department.
+
+```
+employees.join(dept, employees("deptId") === dept("id"))
+  .where(employees("gender") === "female")
+  .groupBy(dept("id"), dept("name"))
+  .agg(count("name"))
+```
+Operators such as === for equality tests, > for greater-than, the arithmetic operators (+, -, etc.), and the aggregators build an abstract syntax tree of the expression, which can be passed to Catalyst for optimization.
+A cache() call on a DataFrame tells Spark SQL to keep the data in memory so it can be reused by iterative algorithms and interactive queries. The memory footprint is considerably smaller than caching raw data because Spark SQL applies columnar compression schemes such as dictionary encoding and run-length encoding.
+
+The DataFrame API also supports inline UDF definitions without complicated packaging and registration. Because UDFs and queries are both expressed in the same general purpose language (Python or Scala), users can use standard debugging tools.
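+
+As a minimal sketch (the `sqlContext`, the UDF name, and the registered `employees` temporary table are illustrative assumptions, not part of the original text):
+
+```
+// Define and register a UDF inline; no separate packaging step is needed.
+sqlContext.udf.register("initialOf", (name: String) => name.substring(0, 1))
+
+// The UDF is immediately usable from SQL over a registered temporary table.
+val initials = sqlContext.sql(
+  "SELECT initialOf(name) AS initial, COUNT(*) AS cnt FROM employees GROUP BY initialOf(name)")
+```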
+
+However, a DataFrame lacks compile-time type safety. In the example above, attributes are referred to by string names, so the compiler cannot catch mistakes; an incorrect attribute name is detected only at runtime, when the query plan is created.
+Spark introduced an extension of the DataFrame called the ***Dataset*** to provide this compile-time type safety. It embraces an object-oriented programming style and adds a feature called Encoders, which translate between JVM representations (objects) and Spark’s internal binary format. Spark’s built-in encoders are quite advanced: they generate bytecode to interact with off-heap data and provide on-demand access to individual attributes without deserializing an entire object.
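+
+A minimal sketch of the typed API (the `Employee` case class is hypothetical, `employees` is assumed to be the DataFrame from the earlier example, and the conversion relies on implicit encoders being in scope):
+
+```
+import org.apache.spark.sql.Dataset
+
+case class Employee(name: String, deptId: Long, gender: String)
+
+// A Dataset carries the element type, so field access is checked at compile time.
+val employeesDS: Dataset[Employee] = employees.as[Employee]
+
+// Typed filter: a misspelled field (e.g. e.gendr) is a compile-time error,
+// whereas employees("gendr") in the DataFrame API only fails at runtime.
+val women = employeesDS.filter(e => e.gender == "female")
+```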
+
+
+Winding up, we can compare SQL, DataFrames, and Datasets as below:
+
+<figure class="main-container">
+ <img src="./sql-vs-dataframes-vs-datasets.png" alt="SQL vs Dataframe vs Dataset" />
+</figure>
+*Figure from the website :* https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
-{% bibliography --file big-data %} \ No newline at end of file
+### 1.3 Large-scale Parallelism on Graphs
+MapReduce does not extend easily to, and is highly inefficient for, iterative and graph algorithms such as PageRank and many machine learning algorithms. Iterative algorithms force the programmer to explicitly handle intermediate results by writing them to disk, so every iteration reads its input from disk and writes its results back; the resulting disk I/O is a performance bottleneck for any batch processing system.
+
+Graph algorithms also require exchanging messages between vertices: in PageRank, every vertex needs the contributions of all its adjacent nodes to compute its score. MapReduce lacks this message-passing model, which makes graph algorithms complex to reason about. One model commonly employed for distributed graph processing is the graph-parallel model.
+
+In the graph-parallel abstraction, a user-defined vertex program is instantiated concurrently for each vertex and interacts with adjacent vertex programs through messages or shared state. Each vertex program can read and modify its vertex property and, in some cases, the properties of adjacent vertices. When all vertex programs vote to halt, the program terminates. Most systems adopt the bulk synchronous parallel (BSP) model {% cite bulk-synchronous-model --file big-data%}.
+
+The BSP model was introduced in the 1980s as a bridging model that reflects the design of parallel hardware. It gained popularity as an alternative to MapReduce because it addresses the issues mentioned above.<br />
+BSP is a synchronous message-passing model in which:
+
+ - Computation consists of a sequence of steps called supersteps.
+ - Each processor has its own local memory, and every processor is connected to the others via point-to-point communication.
+ - In every superstep, a processor receives its input at the beginning, performs computation, and produces output at the end.
+ - A processor in superstep S can send messages to another processor for superstep S+1, and it receives the messages sent to it in superstep S-1.
+ - Barrier synchronization brings all processors together at the end of every superstep.
+
+A notable feature of the model is its explicit control over data movement through the communication between processors at every superstep. Though similar in spirit to MapReduce, BSP preserves data in memory across supersteps, which makes iterative graph algorithms easier to express and reason about.
+
+Graph-parallel abstractions allow users to succinctly describe graph algorithms and provide a runtime engine to execute them in a distributed fashion. They simplify the design, implementation, and application of sophisticated graph algorithms to large-scale real-world problems. Each of these frameworks presents a different view of graph computation, tailored to an originating domain or family of graph algorithms. However, these frameworks do not address data preprocessing and graph construction, favor snapshot-based recovery over finer-grained fault tolerance, and lack integration with distributed dataflow frameworks. Data-parallel systems, in contrast, are well suited to graph construction and are highly scalable, but they suffer from the very problems for which graph-parallel systems were created.
+GraphX {%cite xin2013graphx --file big-data%} is a computation system that builds upon Spark’s Resilient Distributed Dataset (RDD) to form a new abstraction, the Resilient Distributed Graph (RDG), which represents records and their relations as vertices and edges respectively. RDGs inherit the RDD’s fault-tolerance mechanism and expressivity.
+
+How does GraphX improve over the existing graph-parallel and dataflow models?
+The RDGs in GraphX provide a set of elegant and expressive computational primitives through which graph-parallel systems such as Pregel and PowerGraph can be expressed in a few lines of code, as sketched below. GraphX also simplifies graph ETL and analysis through new operations such as filters, views, and graph transformations, and it minimizes communication and storage overhead.
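+
+As a minimal sketch (using the Pregel operator of the released GraphX API; `linkGraph` is a hypothetical `Graph[Double, Double]` whose edge attributes hold out-degree-normalized weights and whose vertex attributes hold the current ranks):
+
+```
+import org.apache.spark.graphx._
+
+// A BSP-style PageRank: each superstep runs the vertex program on vertices
+// that received messages, then sends new messages along the edges.
+val ranks = Pregel(linkGraph, initialMsg = 0.0, maxIterations = 20)(
+  (id, rank, msgSum) => 0.15 + 0.85 * msgSum,                           // vertex program
+  triplet => Iterator((triplet.dstId, triplet.srcAttr * triplet.attr)), // send messages
+  (a, b) => a + b                                                       // merge messages
+).vertices
+```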
+
+Similar to the dataflow model, GraphX moves away from the vertex-centric view and adopts transformations on graphs, each of which yields a new graph.
+
+***Why is partitioning important in graph computation systems?***
+Graph-parallel computation requires every vertex or edge to be processed in the context of its neighborhood, and each transformation depends on the results of distributed joins between vertices and edges. Graph computation systems therefore rely on graph partitioning (edge-cuts in most systems) and efficient storage to minimize communication and storage overhead and to ensure balanced computation.
+
+<figure class="main-container">
+ <img src="./edge-cuts.png" alt="edge cuts" />
+</figure>
+*Figure from {%cite xin2013graphx --file big-data%}*
+
+***Why are edge-cuts expensive?***
+Edge-cut partitioning randomly assigns vertices across machines and cuts the edges that span machines. Thus the communication and storage overhead is proportional to the number of edges cut, which makes minimizing the cut a priority. For most real-world graphs, constructing an optimal edge-cut is prohibitively expensive, so most systems use random placement, which achieves reasonable work balance but nearly worst-case communication overhead.
+
+<figure class="main-container">
+ <img src="./vertex-cuts.png" alt="Vertex cuts" />
+</figure>
+*Figure from {%cite xin2013graphx --file big-data%}*
+
+***Vertex-cuts - GraphX’s solution to effective partitioning*** : An alternative approach does the opposite of an edge-cut: evenly assign edges to machines, but allow vertices to span multiple machines. The communication and storage overhead of a vertex-cut is proportional to the sum, over all vertices, of the number of machines each vertex spans. Therefore, we can reduce communication overhead and ensure balanced computation by evenly assigning edges to machines in a way that minimizes the number of machines spanned by each vertex.
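+
+In the released GraphX API the vertex-cut strategy can be chosen explicitly; a small sketch (the `graph` value is assumed from context, and EdgePartition2D is one of several built-in heuristics):
+
+```
+import org.apache.spark.graphx._
+
+// Evenly reassign edges using a 2D hash of (src, dst); vertices are then
+// replicated to every partition that holds one of their edges.
+val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)
+```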
+
+The GraphX RDG structure implements a vertex-cut representation of a graph using three unordered horizontally partitioned RDD tables. These three tables are as follows:
+
+- `EdgeTable(pid, src, dst, data)`: Stores adjacency structure and edge data.
+- `VertexDataTable(id, data)`: Stores vertex data, i.e., the state associated with each vertex, which changes over the course of the graph computation.
+- `VertexMap(id, pid)`: Maps from vertex ids to the partitions that contain their adjacent edges. Remains static as long as the graph structure doesn’t change.
+
+A three-way relational join is used to bring together source vertex data, edge data, and target vertex data. The join is straightforward, and takes advantage of a partitioner to ensure the join site is local to the edge table. This means GraphX only has to shuffle vertex data.
+
+***Operators in GraphX***
+Other than standard data-parallel operators like filter, map, leftJoin, and reduceByKey, GraphX supports the following graph-parallel operators (a short sketch follows the list):
+
+- graph - constructs a property graph from a collection of vertices and edges.
+- vertices, edges - decompose the graph into a collection of vertices or edges by extracting the vertex or edge RDD.
+- mapV, mapE - transform the vertex or edge collection.
+- triplets - returns a collection of the form ((i, j), (PV(i), PE(i, j), PV(j))). The operator requires a multiway join between the vertex and edge RDDs; it is optimized by shifting the join site to the edges, using the routing table, so that only vertex data needs to be shuffled.
+- leftJoin - given a collection of vertices and a graph, returns a new graph that incorporates the properties of matching vertices from the collection into the graph without changing the underlying graph structure.
+- subgraph - applies vertex and edge predicates and returns the subgraph containing only the vertices and edges that satisfy them.
+- mrTriplets (MapReduce triplets) - the logical composition of triplets followed by map and reduceByKey; it is the building block of graph-parallel algorithms.
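+
+As a minimal sketch of the mrTriplets pattern (the operator is named aggregateMessages in the released GraphX API; the `graph` value is assumed from context):
+
+```
+import org.apache.spark.graphx._
+
+// Compute each vertex's degree by sending a 1 along every edge in both
+// directions and summing the messages received per vertex.
+val degrees: VertexRDD[Int] = graph.aggregateMessages[Int](
+  ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) },  // map over triplets
+  _ + _                                           // reduce per vertex
+)
+```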
+
+## 2 Execution Models
+There are many possible implementations of these programming models. In this section we discuss a few different execution models, how the programming interfaces above exploit them, and the benefits and limitations of each design. MapReduce, its variants, and Spark all use the master/workers model (section 2.1), where the master is responsible for managing data and dynamically scheduling tasks to workers. The master monitors worker status, and when a failure happens, it reschedules the task to another idle worker. Fault tolerance is guaranteed by persisting data in MapReduce versus recomputation from lineage in Spark.
+
+
+
+### 2.1 Master/Worker model
+The original MapReduce model is implemented and deployed on Google’s infrastructure. As described in section 1.1.1, the user program defines the map and reduce functions, and the underlying system manages data partitioning and schedules jobs across nodes. Figure 2.1.1 shows the overall flow when the user program calls the MapReduce function (a small sketch of the user-supplied functions follows the figure):
+1. Split data. The input files are split into *M* pieces;
+2. Copy processes. The user program creates a master process and the worker processes. The master picks idle workers and assigns each a map or a reduce task;
+3. Map. A map worker reads its split and passes the records to the map function. The generated intermediate key/value pairs are buffered in memory;
+4. Partition. The buffered pairs are periodically written to local disk, partitioned into *R* regions, and their locations are passed back to the master;
+5. Shuffle. A reduce worker reads the intermediate data from the map workers' local disks and groups together all occurrences of the same key;
+6. Reduce. The reduce worker iterates over the grouped intermediate data, calls the reduce function on each key and its set of values, and appends the output to a final output file;
+7. Wake up. When all tasks finish, the master wakes up the user program.
+
+<figure class="fullwidth">
+ <img src="{{ site.baseurl }}/resources/img/mapreduce-execution.png" alt="MapReduce Execution Overview" />
+</figure>
+<p>Figure 2.1.1 Execution overview<label for="sn-proprietary-monotype-bembo" class="margin-toggle sidenote-number"></label><input type="checkbox" id="sn-proprietary-monotype-bembo" class="margin-toggle"/><span class="sidenote">from original MapReduce paper {%cite dean2008mapreduce --file big-data%}</span></p>
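+
+To make the division of labor concrete, the following is a small sketch of what the user supplies for steps 3 and 6 (a word-count example with hypothetical signatures; the real interfaces are framework-specific):
+
+```
+// Map: called once per input record, emits intermediate (key, value) pairs.
+def map(docId: String, contents: String): Seq[(String, Int)] =
+  contents.split("\\s+").map(word => (word, 1)).toSeq
+
+// Reduce: called once per key with all of that key's values.
+def reduce(word: String, counts: Iterable[Int]): (String, Int) =
+  (word, counts.sum)
+```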
+
+At steps 4 and 5, the intermediate dataset is written to disk by the map worker and then read from disk by the reduce worker. Transferring big data chunks over the network is expensive, so input data is stored on the local disks of the cluster and the master tries to schedule each map task on the machine that contains its split, or on a nearby machine, to minimize network traffic.
+
+There are some practices described in the paper that make the model work very well at Google. One of them is **backup tasks**: when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks ("stragglers"), and a task is marked as completed whenever either the primary or the backup execution completes.
+The authors measure the performance of MapReduce on two computations running on a large cluster of machines: one *greps* through approximately 1 TB of data, the other *sorts* approximately 1 TB of data, and both take on the order of a hundred seconds. The backup tasks do help to largely reduce execution time, and in an experiment where 200 out of 1746 tasks were intentionally killed, the scheduler recovered quickly and finished the whole computation with only a 5% increase in time.
+Overall, the performance is very good for these two conceptually quite different computations.
+
+
+### 2.2 Spark execution model
+
+<figure class="main-container">
+ <img src="./cluster-overview.png" alt="MapReduce Execution Overview" />
+</figure>
+
+The Spark driver creates a SparkContext, the entry point for any job, which defines the environment/configuration and the dependencies of the submitted job. The driver connects to the cluster manager and requests resources for executing the job.
+The cluster manager manages and allocates the required system resources to Spark jobs. Furthermore, it coordinates and keeps track of the live and dead nodes in the cluster, enables the execution of jobs submitted by the driver on the worker nodes (also called Spark workers), and tracks and reports the status of the jobs running on those nodes.
+A Spark worker executes the business logic submitted by the Spark driver. Workers are abstracted away from the driver and are allocated to it dynamically by the cluster manager for the execution of submitted jobs. The driver listens for and accepts incoming connections from its executors throughout its lifetime.
+
+***Job scheduler optimization :*** Spark’s job scheduler tracks the persistent RDDs saved in memory. When an action (such as count or collect) is performed on an RDD, the scheduler analyzes the lineage graph to build a DAG of stages to execute. Each stage contains only transformations with narrow dependencies; stage boundaries correspond to wide dependencies, at which the scheduler must fetch missing partitions from other workers in order to build the target RDD. The scheduler assigns tasks to machines based on data locality, or to the preferred locations recorded for the RDD. If a task fails, the scheduler re-runs it on another node, and it also recomputes a stage’s parent if that parent is missing.
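+
+A small sketch of where a stage boundary falls (the input path is a placeholder):
+
+```
+// flatMap and map have narrow dependencies and are pipelined into one stage;
+// reduceByKey introduces a wide (shuffle) dependency and starts a new stage.
+val counts = sc.textFile("hdfs://namenode/logs.txt")
+  .flatMap(_.split("\\s+"))
+  .map(word => (word, 1))
+  .reduceByKey(_ + _)
+
+counts.count()   // the action triggers the scheduler to build and run the DAG
+```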
+
+***How is the memory of persistent RDDs managed ?***
+
+Persistent RDDs are stored in memory as Java objects (for performance), in memory as serialized data (for a smaller footprint at some performance cost), or on disk. If a worker runs out of memory while materializing a new RDD, an LRU policy evicts a partition of the least recently accessed RDD, unless that RDD is the same as the new one; in that case the old partition is excluded from eviction, since it is likely to be reused. Long lineage chains involving wide dependencies are checkpointed to reduce the time needed to recover an RDD. Because RDDs are read-only, checkpointing is unproblematic: consistency is not a concern, so there is none of the consistency-management overhead seen in distributed shared memory.
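+
+A small sketch of the choices described above (storage level names are from the Spark API; the checkpoint directory is a placeholder, and `counts` is the RDD from the previous sketch):
+
+```
+import org.apache.spark.storage.StorageLevel
+
+// An RDD accepts a single storage level, so pick one of the alternatives:
+counts.persist(StorageLevel.MEMORY_ONLY)        // deserialized Java objects
+// counts.persist(StorageLevel.MEMORY_ONLY_SER) // serialized: smaller, slower
+// counts.persist(StorageLevel.DISK_ONLY)       // spill to disk
+
+// Checkpointing truncates a long lineage chain.
+sc.setCheckpointDir("hdfs://namenode/checkpoints")
+counts.checkpoint()
+```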
+
+
+### 2.3 Hive execution model
+
+The Hive execution model is composed of the following important components (shown in the architecture diagram below):
+
+- Driver : Similar to the driver of a Spark or MapReduce application, the Hive driver handles query submission and its flow across the system. It also manages the session and its statistics.
+
+- Metastore : The Hive metastore stores all information about tables, their partitions, schemas, columns and their types, and so on, shielding users from the details of data formats and storage. This in turn helps with data exploration, query compilation, and optimization. Because the metastore is critical for tracking the structure of Hadoop files, it must be kept up to date.
+
+- Query Compiler : The Hive query compiler is similar to traditional database compilers. It processes the query in three steps:
+ - Parse : In this phase Hive uses Antlr (a parser generator tool) to generate the abstract syntax tree (AST) of the query.
+ - Transformation of the AST into a DAG (directed acyclic graph) : In this phase the compiler generates a logical plan and performs compile-time type checking. The logical plan is built from the metadata of the required tables (stored in the metastore), and errors are flagged if issues are found during type checking.
+
+ - Optimization : Optimization forms the core of any declarative interface. In Hive, optimization happens through chains of transformations of the operator DAG. A transformation can even include a user-defined optimization, and it acts on the DAG only if its rule is satisfied. Every node in the DAG implements a Node interface, which makes it easy to manipulate the operator DAG through companion interfaces such as GraphWalker, Dispatcher, Rule, and Processor. By a transformation, then, we mean walking the DAG and, for every node encountered, checking whether a rule is satisfied; if it is, the corresponding processor is invoked. A Dispatcher maintains the mapping from rules to processors.
+
+<figure class="main-container">
+ <img src="./Hive-transformation.png" alt="Hive transformation" />
+</figure>
+*Figure depicting the transformation flow during optimization, from:* {% cite thusoo2010hive --file big-data %}
+
+ Some of the important transformations are :
+
+ - Column Pruning - Only the columns needed for query processing are considered for projection.
+ - Predicate Pushdown - Rows are filtered as early as possible by pushing predicates down, so that unnecessary records are discarded before further transformations are applied.
+ - Partition Pruning - Predicates on partitioned columns are used to prune out files of partitions that do not satisfy the predicate.
+ - Map-Side Joins - The smaller tables in a join can be replicated to all the mappers so the join is performed map-side.
+ - Join Reordering - Only the smaller tables are kept in memory on the reducer side, while larger tables are streamed, reducing the memory needed for reduce-side joins.
+ - Repartitioning to handle skew in GROUP BY processing, by performing the GROUP BY in two MapReduce stages: in the first stage, data is distributed randomly to the reducers and partially aggregated; in the second stage, the partial aggregates are distributed to reducers on the GROUP BY columns.
+ - Hash-based partial aggregation in the mappers, similar to combiners in MapReduce, to reduce the data sent from mappers to reducers and thus the time spent sorting and merging it.
+
+
+- Execution Engine : The execution engine finally executes the tasks in the order of their dependencies. A MapReduce task first serializes its part of the plan into a plan.xml file. This file is added to the job cache, and mappers and reducers are spawned to execute the relevant sections of the operator DAG. The final results are written to a temporary location and then moved to the final destination (for example, for an INSERT INTO query).
+
+
+<figure class="main-container">
+ <img src="./Hive-architecture.png" alt="Hive architecture" />
+</figure>
+*Hive architecture diagram*
+
+Summarizing the flow: a query is first submitted via the CLI, web UI, or another interface; it then passes through the compiler phases explained above to form an optimized DAG of MapReduce and HDFS tasks, which the execution engine executes in the correct order on Hadoop.
+
+### 2.4 SparkSQL execution model
+
+The Spark SQL execution model leverages the Catalyst framework to optimize a query before submitting it to the Spark core engine for scheduling.
+Catalyst is a query optimizer. Query optimizers for MapReduce-style frameworks can greatly improve the performance of the queries developers write and significantly reduce development time. A good query optimizer should optimize user queries, be extensible so users can provide information about their data, and even allow developer-defined rules to be added dynamically.
+
+Catalyst leverages Scala’s functional language features, such as pattern matching and runtime metaprogramming, to let developers concisely specify complex relational optimizations.
+
+Catalyst includes both rule-based and cost-based optimization. It is extensible, so new optimization techniques and features can be added to Spark SQL and developers can provide data-source-specific rules.
+Catalyst executes its rules on trees: a tree is a composition of node objects, where each node has a node type (a subclass of the Scala TreeNode class) and zero or more children. Node objects are immutable and are manipulated with functional transformations. The transform method of a tree applies pattern matching to the subset of all possible input trees to which an optimization rule applies.
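+
+For instance, a constant-folding rule can be written as a partial function passed to transform (a sketch in the spirit of the rules described in the Spark SQL paper, with `Add` and `Literal` standing for Catalyst expression nodes):
+
+```
+// Fold additions of literals and drop additions of zero; subtrees that match
+// no case are left untouched by transform.
+val optimized = plan.transform {
+  case Add(Literal(c1), Literal(c2)) => Literal(c1 + c2)
+  case Add(left, Literal(0))         => left
+  case Add(Literal(0), right)        => right
+}
+```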
+
+In Spark SQL, the transformation of a user query happens in four phases:
+
+<figure class="main-container">
+ <img src="./sparksql-data-flow.jpg" alt="SparkSQL optimization plan Overview" />
+</figure>
+
+***Analyzing a logical plan to resolve references :*** In the analysis phase, a relation coming either from the abstract syntax tree (AST) returned by the SQL parser or from a DataFrame is turned into a logical plan, which is initially unresolved (the referenced columns may not exist or may have the wrong data type). The plan is resolved using Catalyst’s Catalog object (which tracks the tables of all data sources) by mapping named attributes to the provided input, looking up relations by name in the catalog, and propagating and coercing types through expressions.
+
+***Logical plan optimization :*** In this phase, rules such as constant folding, predicate pushdown, projection pruning, null propagation, and Boolean expression simplification are applied to the logical plan.
+
+***Physical planning :*** In this phase, Spark generates multiple physical plans from the logical plan and chooses one based on a cost model. The physical planner also performs rule-based physical optimizations, such as pipelining projections or filters into a single Spark map operation. In addition, it can push operations from the logical plan into data sources that support predicate or projection pushdown.
+
+
+***Code Generation :*** The final phase generates the Java bytecode that runs on each machine. Catalyst transforms a tree representing a SQL expression into an AST for Scala code that evaluates that expression, then compiles and runs the generated code. A special Scala feature, quasiquotes, aids in constructing the Scala AST.
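+
+These phases can be observed from user code: asking Spark SQL to explain a query prints the analyzed and optimized logical plans along with the chosen physical plan (a small sketch reusing the employees DataFrame from section 1.2.3):
+
+```
+// extended = true prints the logical plans of each phase and the physical plan.
+employees.where(employees("gender") === "female").explain(true)
+```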
+
+
+## 3. Big Data Ecosystem
+*Hadoop Ecosystem*
+
+Apache Hadoop is an open-source framework that supports distributed processing of large datasets. It encompasses a long list of projects, catalogued at https://hadoopecosystemtable.github.io/. For this section, the key players are two: the Hadoop Distributed File System (HDFS) and Hadoop MapReduce, the open-source implementation of the MapReduce model.
+
+<figure class="main-container">
+ <img src="./hadoop-ecosystem.jpg" alt="Hadoop Ecosystem" />
+</figure>
+*Figure is from http://thebigdatablog.weebly.com/blog/the-hadoop-ecosystem-overview*
+
+
+HDFS forms the data management layer: a distributed file system designed to provide reliable, scalable storage across large clusters of unreliable commodity machines. The design was inspired by GFS {%cite ghemawat2003google --file big-data%}. Unlike the closed-source GFS, HDFS is open source, and it provides libraries and interfaces for other file systems as well, such as S3 and KFS.
+
+To satisfy different needs, big companies like Facebook and Yahoo developed additional tools. Facebook's Hive, a warehousing system, provides a more declarative programming interface and translates queries into Hadoop jobs. Yahoo's Pig platform is an ad-hoc analysis tool that imposes structure on HDFS objects and supports operations like grouping, joining, and filtering.
+
+
+***Spark Ecosystem***
+
+Apache Spark's rich ecosystem consists of third-party systems like Mesos {%cite hindman2011mesos --file big-data%} and YARN {%cite vavilapalli2013apache --file big-data%} together with several major components already discussed in this article, such as Spark core, Spark SQL, and GraphX.
+In this section we discuss the remaining, yet very important, components and libraries that help Spark deliver high performance.
+
+<figure class="main-container">
+ <img src="./spark-ecosystem.png" alt="Spark ecosystem" />
+</figure>
+
+*Spark Streaming - A Spark component for streaming workloads*
+
+Spark handles fault-tolerant, high-throughput streaming workloads in near real time through the lightweight Spark Streaming API. Spark Streaming is based on the discretized streams model {% cite d-streams --file big-data%}: it processes a streaming workload as a series of small batch jobs, leveraging the fast scheduling of the Spark core and the fault-tolerance of RDDs. Each batch of streaming data is represented as an RDD, on which the usual transformations are applied. Data sources can be live streams such as Twitter, Apache Kafka, Akka actors, IoT sensors, Amazon Kinesis, Apache Flume, and so on. Spark Streaming also unifies batch and streaming workloads, so developers can reuse the same code for both, and it supports combining streaming data with historical data.
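+
+A minimal sketch of the micro-batch style (the socket host and port are placeholders, and `sc` is an existing SparkContext):
+
+```
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+// Discretize the stream into one-second micro-batches.
+val ssc   = new StreamingContext(sc, Seconds(1))
+val lines = ssc.socketTextStream("localhost", 9999)
+
+// Each micro-batch is an RDD, so the usual transformations apply.
+val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
+counts.print()
+
+ssc.start()
+ssc.awaitTermination()
+```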
+
+
+*Apache Mesos*
+
+Apache Mesos {%cite hindman2011mesos --file big-data%} is an open-source, heterogeneous cluster and resource manager developed at the University of California, Berkeley, and used by companies such as Twitter, Airbnb, and Netflix for handling workloads in a distributed environment through dynamic resource sharing and isolation. It aids in the deployment and management of applications in large-scale clustered environments. Mesos abstracts node allocation by combining the resources of the machines in a cluster into a single pool, enabling fault-tolerant, elastic distributed systems. A variety of workloads can draw nodes from this single pool, avoiding the need to allocate specific machines to different workloads. Mesos is highly scalable, achieves fault tolerance through Apache ZooKeeper {%cite hunt2010zookeeper --file big-data%}, and is an efficient CPU- and memory-aware resource scheduler.
+
+
+*Alluxio/Tachyon*
+
+Alluxio (formerly Tachyon) {% cite li2014tachyon --file big-data%} is an open-source, memory-centric distributed storage system that provides high-throughput reads and writes, enabling reliable data sharing at memory speed across cluster jobs. Tachyon integrates with different computation frameworks, such as Apache Spark and Hadoop MapReduce. In the big data ecosystem, Tachyon sits between computation frameworks such as Spark or MapReduce and storage systems such as Amazon S3, OpenStack Swift, GlusterFS, HDFS, or Ceph. It caches frequently read datasets in memory, avoiding a trip to disk for every load. Spark RDDs can automatically be stored inside Tachyon, making Spark more resilient and avoiding GC overheads.
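+
+As a small sketch (Spark 1.x, where the OFF_HEAP storage level delegated block storage to Tachyon; `counts` is an arbitrary RDD assumed from context):
+
+```
+import org.apache.spark.storage.StorageLevel
+
+// In Spark 1.x this kept RDD blocks in Tachyon instead of on the JVM heap.
+counts.persist(StorageLevel.OFF_HEAP)
+```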
+
+
+
+
+
+## References
+{% bibliography --file big-data %}
diff --git a/chapter/8/cluster-overview.png b/chapter/8/cluster-overview.png
new file mode 100644
index 0000000..b1b7c1a
--- /dev/null
+++ b/chapter/8/cluster-overview.png
Binary files differ
diff --git a/chapter/8/ecosystem.png b/chapter/8/ecosystem.png
new file mode 100644
index 0000000..c632ec2
--- /dev/null
+++ b/chapter/8/ecosystem.png
Binary files differ
diff --git a/chapter/8/edge-cuts.png b/chapter/8/edge-cuts.png
new file mode 100644
index 0000000..e9475a8
--- /dev/null
+++ b/chapter/8/edge-cuts.png
Binary files differ
diff --git a/chapter/8/hadoop-ecosystem.jpg b/chapter/8/hadoop-ecosystem.jpg
new file mode 100644
index 0000000..2ba7aa9
--- /dev/null
+++ b/chapter/8/hadoop-ecosystem.jpg
Binary files differ
diff --git a/chapter/8/spark-ecosystem.png b/chapter/8/spark-ecosystem.png
new file mode 100644
index 0000000..d3569fc
--- /dev/null
+++ b/chapter/8/spark-ecosystem.png
Binary files differ
diff --git a/chapter/8/spark_pipeline.png b/chapter/8/spark_pipeline.png
new file mode 100644
index 0000000..ac8c383
--- /dev/null
+++ b/chapter/8/spark_pipeline.png
Binary files differ
diff --git a/chapter/8/sparksql-data-flow.jpg b/chapter/8/sparksql-data-flow.jpg
new file mode 100644
index 0000000..1cf98f5
--- /dev/null
+++ b/chapter/8/sparksql-data-flow.jpg
Binary files differ
diff --git a/chapter/8/sql-vs-dataframes-vs-datasets.png b/chapter/8/sql-vs-dataframes-vs-datasets.png
new file mode 100644
index 0000000..600c68b
--- /dev/null
+++ b/chapter/8/sql-vs-dataframes-vs-datasets.png
Binary files differ
diff --git a/chapter/8/trash.md b/chapter/8/trash.md
new file mode 100644
index 0000000..c9b90fe
--- /dev/null
+++ b/chapter/8/trash.md
@@ -0,0 +1,53 @@
+## Trash
+
+
+## Performance
+`TODO: re-organize` There are some practices in this paper that make the model work very well in Google, one of them is **backup tasks**: when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks ("straggler"). The task is marked as completed whenever either the primary or the backup execution completes.
+In the paper, the authors measure the performance of MapReduce on two computations running on a large cluster of machines. One computation *grep* through approximately 1TB of data. The other computation *sort* approximately 1TB of data. Both computations take in the order of a hundred seconds. In addition, the backup tasks do help largely reduce execution time. In the experiment where 200 out of 1746 tasks were intentionally killed, the scheduler was able to recover quickly and finish the whole computation for just a 5% increased time.
+Overall, the performance is very good for conceptually unrelated computations.
+
+
+
+## Outline
+- 1. Programming Models
+ - 1.1. Data parallelism: what is data parallelism and how do the following models relate to each other?
+ - 1.1.1 MapReduce
+ - 1.1.2 FlumeJava
+ - 1.1.3 Dryad
+ - 1.1.4 Spark
+
+ - 1.2. Querying: we need more declarative interfaces, built on top MR models.
+ - Sawzall {%cite pike2005interpreting --file big-data %}: first one propose
+ - Pig {% cite olston2008pig --file big-data %}: on top of Hadoop, independent of execution platform, in theory can compiled into DryadLINQ too; what is the performance gain/lost? Easier to debug?
+ - Hive {%cite thusoo2009hive --file big-data %}
+ - DryadLINQ: SQL-like, uses Dryad as execution engine;
+ `Suggestion: Merge this with Dryad above?`
+ - Dremel, query natively w/o translating into MR jobs
+ - Spark SQL {%cite --file big-data %} - Limitations of Relational alone models? how SparkSQL model overcomes it? goals of SparkSQL? how it leverages the Spark programming model? what is a DataFrame and how is it different from a RDD? what are the operations a DataFrame provides? how is in-memory caching different from Spark?
+
+ - 1.3. Large-scale Parallelism on Graphs
+ - Why a separate graph processing model? what is a BSP? working of BSP? Do not stress more since its not a map reduce world exactly.
+ - GraphX programming model - discuss disadvantages graph-parallel model to data parallel model for large scale graph processing? how graphX combines the advantages of both the models? representation of a graph in GraphX? discuss the model, vertex cut partitioning and its importance? graph operations ?
+
+
+- 2. Execution Models
+ - 2.1 MapReduce (intermediate writes to disk): What is the sequence of actions when a MapReduce functions are called? How is write-to-disk good/bad (fault-tolerant/slow)? How does the data are transmitted across clusters efficiently (store locally)? To shorten the total time for MR operations, it uses backup tasks. When MR jobs are pipelined, what optimizations can be performed by FlumeJava? In spite of optimizations and pipelining, what is the inherent limitation (not support iterative algorithm?)
+ - 2.2 Spark (all in memory): introduce spark architecture, different layers, what happens when a spark job is executed? what is the role of a driver/master/worker, how does a scheduler schedule the tasks and what performance measures are considered while scheduling? how does a scheduler manage node failures and missing partitions? how are the user defined transformations passed to the workers? how are the RDDs stored and memory management measures on workers? do we need checkpointing at all given RDDs leverage lineage for recovery? if so why ?
+ - 2.3 Graphs :
+ - Pregel :Overview of Pregel. Its implementation and working. its limitations. Do not stress more since we have a better model GraphX to explain a lot.
+ - GraphX : Working on this.
+ - SparkSQL Catalyst & Spark execution model : Discuss Parser, LogicalPlan, Optimizer, PhysicalPlan, Execution Plan. Why catalyst? how catalyst helps in SparkSQL , data flow from sql-core-> catalyst->spark-core
+
+- 3. Evaluation: Given same algorithm, what is the performance differences between Hadoop, Spark, Dryad? There are no direct comparison for all those models, so we may want to compare separately:
+ - Hadoop vs. Spark
+ - Spark vs. SparkSQL from SparkSQL paper
+
+- 4. Big Data Ecosystem
+ Everything interoperates with GFS or HDFS, or makes use of stuff like protocol buffers so systems like Pregel and MapReduce and even MillWheel...
+ - GFS/HDFS for MapReduce/Hadoop: Machines are unreliable, how do they provide fault-tolerance? How does GFS deal with single point of failure (shadow masters)? How does the master manage partition, transmission of data chunks? Which
+ - Resource Management: Mesos. New frameworks keep emerging and users have to use multiple different frameworks(MR, Spark etc.) in the same clusters, so how should they share access to the large datasets instead of costly replicate across clusters?
+ - Introducing streaming: what happens when data cannot be complete? How does different programming model adapt? windowing `todo: more`
+
+ 2015 NSDI Ousterhout
+
+ latency numbers that every programmer should know
diff --git a/chapter/8/vertex-cuts.png b/chapter/8/vertex-cuts.png
new file mode 100644
index 0000000..b256630
--- /dev/null
+++ b/chapter/8/vertex-cuts.png
Binary files differ
diff --git a/resources/img/data-parallelism.png b/resources/img/data-parallelism.png
new file mode 100644
index 0000000..eea5bf8
--- /dev/null
+++ b/resources/img/data-parallelism.png
Binary files differ
diff --git a/resources/img/mapreduce-execution.png b/resources/img/mapreduce-execution.png
new file mode 100644
index 0000000..090878d
--- /dev/null
+++ b/resources/img/mapreduce-execution.png
Binary files differ