42 files changed, 2669 insertions, 95 deletions
@@ -1,3 +1,5 @@ _site .sass-cache .jekyll-metadata + +.DS_Store diff --git a/_bibliography/big-data.bib b/_bibliography/big-data.bib index 416b697..599b3c9 100644 --- a/_bibliography/big-data.bib +++ b/_bibliography/big-data.bib @@ -1,26 +1,208 @@ -@inproceedings{Uniqueness, - author = {Philipp Haller and - Martin Odersky}, - title = {Capabilities for Uniqueness and Borrowing}, - booktitle = {ECOOP 2010, Maribor, Slovenia, June 21-25, 2010.}, - pages = {354--378}, - year = {2010}, -} - -@inproceedings{Elsman2005, - author = {Martin Elsman}, - title = {Type-specialized serialization with sharing}, - booktitle = {Trends in Functional Programming}, - year = {2005}, - pages = {47-62}, -} - -@article{Kennedy2004, - author = {Andrew Kennedy}, - title = {Pickler combinators}, - journal = {J. Funct. Program.}, - volume = {14}, - number = {6}, - year = {2004}, - pages = {727-739}, -}
\ No newline at end of file +@inproceedings{armbrust2015spark, + title={Spark sql: Relational data processing in spark}, + author={Armbrust, Michael and Xin, Reynold S and Lian, Cheng and Huai, Yin and Liu, Davies and Bradley, Joseph K and Meng, Xiangrui and Kaftan, Tomer and Franklin, Michael J and Ghodsi, Ali and others}, + booktitle={Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data}, + pages={1383--1394}, + year={2015}, + organization={ACM} +} + +@article{bu2010haloop, + title={HaLoop: efficient iterative data processing on large clusters}, + author={Bu, Yingyi and Howe, Bill and Balazinska, Magdalena and Ernst, Michael D}, + journal={Proceedings of the VLDB Endowment}, + volume={3}, + number={1-2}, + pages={285--296}, + year={2010}, + publisher={VLDB Endowment} +} + +@inproceedings{chambers2010flumejava, + title={FlumeJava: easy, efficient data-parallel pipelines}, + author={Chambers, Craig and Raniwala, Ashish and Perry, Frances and Adams, Stephen and Henry, Robert R and Bradshaw, Robert and Weizenbaum, Nathan}, + booktitle={ACM Sigplan Notices}, + volume={45}, + number={6}, + pages={363--375}, + year={2010}, + organization={ACM} +} + + +@article{ching2015one, + title={One trillion edges: graph processing at Facebook-scale}, + author={Ching, Avery and Edunov, Sergey and Kabiljo, Maja and Logothetis, Dionysios and Muthukrishnan, Sambavi}, + journal={Proceedings of the VLDB Endowment}, + volume={8}, + number={12}, + pages={1804--1815}, + year={2015}, + publisher={VLDB Endowment} +} + +@article{dean2008mapreduce, + title={MapReduce: simplified data processing on large clusters}, + author={Dean, Jeffrey and Ghemawat, Sanjay}, + journal={Communications of the ACM}, + volume={51}, + number={1}, + pages={107--113}, + year={2008}, + publisher={ACM} +} + + +@inproceedings{ekanayake2010twister, + title={Twister: a runtime for iterative mapreduce}, + author={Ekanayake, Jaliya and Li, Hui and Zhang, Bingjing and Gunarathne, Thilina and Bae, Seung-Hee and Qiu, Judy and Fox, Geoffrey}, + booktitle={Proceedings of the 19th ACM International Symposium on High Performance Distributed Computing}, + pages={810--818}, + year={2010}, + organization={ACM} +} + +@inproceedings{ghemawat2003google, + title={The Google file system}, + author={Ghemawat, Sanjay and Gobioff, Howard and Leung, Shun-Tak}, + booktitle={ACM SIGOPS operating systems review}, + volume={37}, + number={5}, + pages={29--43}, + year={2003}, + organization={ACM} +} + +@inproceedings{hindman2011mesos, + title={Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center.}, + author={Hindman, Benjamin and Konwinski, Andy and Zaharia, Matei and Ghodsi, Ali and Joseph, Anthony D and Katz, Randy H and Shenker, Scott and Stoica, Ion}, + booktitle={NSDI}, + volume={11}, + pages={22--22}, + year={2011} +} + +@inproceedings{isard2007dryad, + title={Dryad: distributed data-parallel programs from sequential building blocks}, + author={Isard, Michael and Budiu, Mihai and Yu, Yuan and Birrell, Andrew and Fetterly, Dennis}, + booktitle={ACM SIGOPS Operating Systems Review}, + volume={41}, + number={3}, + pages={59--72}, + year={2007}, + organization={ACM} +} + + +@inproceedings{malewicz2010pregel, + title={Pregel: a system for large-scale graph processing}, + author={Malewicz, Grzegorz and Austern, Matthew H and Bik, Aart JC and Dehnert, James C and Horn, Ilan and Leiser, Naty and Czajkowski, Grzegorz}, + booktitle={Proceedings of the 2010 ACM SIGMOD International Conference on Management of data}, + 
pages={135--146}, + year={2010}, + organization={ACM} +} + + +@inproceedings{olston2008pig, + title={Pig latin: a not-so-foreign language for data processing}, + author={Olston, Christopher and Reed, Benjamin and Srivastava, Utkarsh and Kumar, Ravi and Tomkins, Andrew}, + booktitle={Proceedings of the 2008 ACM SIGMOD international conference on Management of data}, + pages={1099--1110}, + year={2008}, + organization={ACM} +} + +@article{pike2005interpreting, + title={Interpreting the data: Parallel analysis with Sawzall}, + author={Pike, Rob and Dorward, Sean and Griesemer, Robert and Quinlan, Sean}, + journal={Scientific Programming}, + volume={13}, + number={4}, + pages={277--298}, + year={2005}, + publisher={Hindawi Publishing Corporation} +} + +@inproceedings{shvachko2010hadoop, + title={The hadoop distributed file system}, + author={Shvachko, Konstantin and Kuang, Hairong and Radia, Sanjay and Chansler, Robert}, + booktitle={2010 IEEE 26th symposium on mass storage systems and technologies (MSST)}, + pages={1--10}, + year={2010}, + organization={IEEE} +} + +@online{WinNT, + author = {Tarau, Paul}, + title = {Bulk synchronous model}, + year = 2014, + url = {http://www.cse.unt.edu/~tarau/teaching/parpro/papers/Bulk%20synchronous%20parallel.pdf}, + urldate = {2016-11-24} +} + +@article{thusoo2009hive, + title={Hive: a warehousing solution over a map-reduce framework}, + author={Thusoo, Ashish and Sarma, Joydeep Sen and Jain, Namit and Shao, Zheng and Chakka, Prasad and Anthony, Suresh and Liu, Hao and Wyckoff, Pete and Murthy, Raghotham}, + journal={Proceedings of the VLDB Endowment}, + volume={2}, + number={2}, + pages={1626--1629}, + year={2009}, + publisher={VLDB Endowment} +} + +@inproceedings{thusoo2010hive, + title={Hive-a petabyte scale data warehouse using hadoop}, + author={Thusoo, Ashish and Sarma, Joydeep Sen and Jain, Namit and Shao, Zheng and Chakka, Prasad and Zhang, Ning and Antony, Suresh and Liu, Hao and Murthy, Raghotham}, + booktitle={2010 IEEE 26th International Conference on Data Engineering (ICDE 2010)}, + pages={996--1005}, + year={2010}, + organization={IEEE} +} +@inproceedings{vavilapalli2013apache, + title={Apache hadoop yarn: Yet another resource negotiator}, + author={Vavilapalli, Vinod Kumar and Murthy, Arun C and Douglas, Chris and Agarwal, Sharad and Konar, Mahadev and Evans, Robert and Graves, Thomas and Lowe, Jason and Shah, Hitesh and Seth, Siddharth and others}, + booktitle={Proceedings of the 4th annual Symposium on Cloud Computing}, + pages={5}, + year={2013}, + organization={ACM} +} +@inproceedings{xin2013graphx, + title={Graphx: A resilient distributed graph system on spark}, + author={Xin, Reynold S and Gonzalez, Joseph E and Franklin, Michael J and Stoica, Ion}, + booktitle={First International Workshop on Graph Data Management Experiences and Systems}, + pages={2}, + year={2013}, + organization={ACM} +} + +@inproceedings{yu2008dryadlinq, + title={DryadLINQ: A System for General-Purpose Distributed Data-Parallel Computing Using a High-Level Language.}, + author={Yu, Yuan and Isard, Michael and Fetterly, Dennis and Budiu, Mihai and Erlingsson, {\'U}lfar and Gunda, Pradeep Kumar and Currey, Jon}, + booktitle={OSDI}, + volume={8}, + pages={1--14}, + year={2008} +} + +@article{zaharia2010spark, + title={Spark: cluster computing with working sets.}, + author={Zaharia, Matei and Chowdhury, Mosharaf and Franklin, Michael J and Shenker, Scott and Stoica, Ion}, + journal={HotCloud}, + volume={10}, + pages={10--10}, + year={2010} +} + + 
+@article{zhang2012imapreduce, + title={imapreduce: A distributed computing framework for iterative computation}, + author={Zhang, Yanfeng and Gao, Qixin and Gao, Lixin and Wang, Cuirong}, + journal={Journal of Grid Computing}, + volume={10}, + number={1}, + pages={47--68}, + year={2012}, + publisher={Springer} +} diff --git a/_bibliography/dist-langs.bib b/_bibliography/dist-langs.bib index 416b697..657ca63 100644 --- a/_bibliography/dist-langs.bib +++ b/_bibliography/dist-langs.bib @@ -23,4 +23,24 @@ number = {6}, year = {2004}, pages = {727-739}, -}
\ No newline at end of file +} + +@article{Mernik2005, + author = {Marjan Mernik}, + title = {When and how to develop domain-specific languages}, + journal = {ACM computing surveys}, + volume = {37}, + number = {4}, + year = {2005}, + pages = {316-344}, +} + +@article{Armstrong2010, + author = {Joe Armstrong}, + title = {Erlang}, + journal = {Communications of the ACM}, + volume = {53}, + number = {9}, + year = {2010}, + pages = {68-75}, +} diff --git a/_bibliography/message-passing.bib b/_bibliography/message-passing.bib index 416b697..acb92a4 100644 --- a/_bibliography/message-passing.bib +++ b/_bibliography/message-passing.bib @@ -1,26 +1,253 @@ -@inproceedings{Uniqueness, - author = {Philipp Haller and - Martin Odersky}, - title = {Capabilities for Uniqueness and Borrowing}, - booktitle = {ECOOP 2010, Maribor, Slovenia, June 21-25, 2010.}, - pages = {354--378}, - year = {2010}, -} - -@inproceedings{Elsman2005, - author = {Martin Elsman}, - title = {Type-specialized serialization with sharing}, - booktitle = {Trends in Functional Programming}, - year = {2005}, - pages = {47-62}, -} - -@article{Kennedy2004, - author = {Andrew Kennedy}, - title = {Pickler combinators}, - journal = {J. Funct. Program.}, - volume = {14}, - number = {6}, - year = {2004}, - pages = {727-739}, -}
\ No newline at end of file +@inproceedings{DeKoster:2016:YAT:3001886.3001890, + author = {De Koster, Joeri and Van Cutsem, Tom and De Meuter, Wolfgang}, + title = {43 Years of Actors: A Taxonomy of Actor Models and Their Key Properties}, + booktitle = {Proceedings of the 6th International Workshop on Programming Based on Actors, Agents, and Decentralized Control}, + series = {AGERE 2016}, + year = {2016}, + isbn = {978-1-4503-4639-9}, + location = {Amsterdam, Netherlands}, + pages = {31--40}, + numpages = {10}, + url = {http://doi.acm.org/10.1145/3001886.3001890}, + doi = {10.1145/3001886.3001890}, + acmid = {3001890}, + publisher = {ACM}, + address = {New York, NY, USA}, + keywords = {Actor Model, Concurrency}, +} + +@article{Yonezawa:1986:OCP:960112.28722, + author = {Yonezawa, Akinori and Briot, Jean-Pierre and Shibayama, Etsuya}, + title = {Object-oriented Concurrent Programming in ABCL/1}, + journal = {SIGPLAN Not.}, + issue_date = {Nov. 1986}, + volume = {21}, + number = {11}, + month = jun, + year = {1986}, + issn = {0362-1340}, + pages = {258--268}, + numpages = {11}, + url = {http://doi.acm.org/10.1145/960112.28722}, + doi = {10.1145/960112.28722}, + acmid = {28722}, + publisher = {ACM}, + address = {New York, NY, USA}, +} + +@inproceedings{Dedecker:2006:APA:2171327.2171349, + author = {Dedecker, Jessie and Van Cutsem, Tom and Mostinckx, Stijn and D\&\#39;Hondt, Theo and De Meuter, Wolfgang}, + title = {Ambient-Oriented Programming in Ambienttalk}, + booktitle = {Proceedings of the 20th European Conference on Object-Oriented Programming}, + series = {ECOOP'06}, + year = {2006}, + isbn = {3-540-35726-2, 978-3-540-35726-1}, + location = {Nantes, France}, + pages = {230--254}, + numpages = {25}, + url = {http://dx.doi.org/10.1007/11785477_16}, + doi = {10.1007/11785477_16}, + acmid = {2171349}, + publisher = {Springer-Verlag}, + address = {Berlin, Heidelberg}, +} + +@inproceedings{Cutsem:2007:AOE:1338443.1338745, + author = {Cutsem, Tom Van and Mostinckx, Stijn and Boix, Elisa Gonzalez and Dedecker, Jessie and Meuter, Wolfgang De}, + title = {AmbientTalk: Object-oriented Event-driven Programming in Mobile Ad Hoc Networks}, + booktitle = {Proceedings of the XXVI International Conference of the Chilean Society of Computer Science}, + series = {SCCC '07}, + year = {2007}, + isbn = {0-7695-3017-6}, + pages = {3--12}, + numpages = {10}, + url = {http://dx.doi.org/10.1109/SCCC.2007.4}, + doi = {10.1109/SCCC.2007.4}, + acmid = {1338745}, + publisher = {IEEE Computer Society}, + address = {Washington, DC, USA}, +} + +@book{ReactiveSystems, + author = {Hugh McKee}, + title = {Designing Reactive Systems: The Role of Actors in Distributed Architecture}, + year = {2016}, +} + +@inproceedings{Miller:2005:CSP:1986262.1986274, + author = {Miller, Mark S. and Tribble, E. Dean and Shapiro, Jonathan}, + title = {Concurrency Among Strangers: Programming in E As Plan Coordination}, + booktitle = {Proceedings of the 1st International Conference on Trustworthy Global Computing}, + series = {TGC'05}, + year = {2005}, + isbn = {3-540-30007-4, 978-3-540-30007-6}, + location = {Edinburgh, UK}, + pages = {195--229}, + numpages = {35}, + url = {http://dl.acm.org/citation.cfm?id=1986262.1986274}, + acmid = {1986274}, + publisher = {Springer-Verlag}, + address = {Berlin, Heidelberg}, +} + +@article{Agha:1990:COP:83880.84528, + author = {Agha, Gul}, + title = {Concurrent Object-oriented Programming}, + journal = {Commun. ACM}, + issue_date = {Sept. 
1990}, + volume = {33}, + number = {9}, + month = sep, + year = {1990}, + issn = {0001-0782}, + pages = {125--141}, + numpages = {17}, + url = {http://doi.acm.org/10.1145/83880.84528}, + doi = {10.1145/83880.84528}, + acmid = {84528}, + publisher = {ACM}, + address = {New York, NY, USA}, +} + +@article{Armstrong:2010:ERL:1810891.1810910, + author = {Armstrong, Joe}, + title = {Erlang}, + journal = {Commun. ACM}, + issue_date = {September 2010}, + volume = {53}, + number = {9}, + month = sep, + year = {2010}, + issn = {0001-0782}, + pages = {68--75}, + numpages = {8}, + url = {http://doi.acm.org/10.1145/1810891.1810910}, + doi = {10.1145/1810891.1810910}, + acmid = {1810910}, + publisher = {ACM}, + address = {New York, NY, USA}, +} + +@inproceedings{Haller:2012:IAM:2414639.2414641, + author = {Haller, Philipp}, + title = {On the Integration of the Actor Model in Mainstream Technologies: The Scala Perspective}, + booktitle = {Proceedings of the 2Nd Edition on Programming Systems, Languages and Applications Based on Actors, Agents, and Decentralized Control Abstractions}, + series = {AGERE! 2012}, + year = {2012}, + isbn = {978-1-4503-1630-9}, + location = {Tucson, Arizona, USA}, + pages = {1--6}, + numpages = {6}, + url = {http://doi.acm.org/10.1145/2414639.2414641}, + doi = {10.1145/2414639.2414641}, + acmid = {2414641}, + publisher = {ACM}, + address = {New York, NY, USA}, + keywords = {actors, concurrent programming, distributed programming, scala, threads}, +} + +@inproceedings{Hewitt:1973:UMA:1624775.1624804, + author = {Hewitt, Carl and Bishop, Peter and Steiger, Richard}, + title = {A Universal Modular ACTOR Formalism for Artificial Intelligence}, + booktitle = {Proceedings of the 3rd International Joint Conference on Artificial Intelligence}, + series = {IJCAI'73}, + year = {1973}, + location = {Stanford, USA}, + pages = {235--245}, + numpages = {11}, + url = {http://dl.acm.org/citation.cfm?id=1624775.1624804}, + acmid = {1624804}, + publisher = {Morgan Kaufmann Publishers Inc.}, + address = {San Francisco, CA, USA}, +} + +@article {vantcutsem14ambienttalk, + title = {AmbientTalk: programming responsive mobile peer-to-peer applications with actors}, + journal = {Computer Languages, Systems and Structures, SCI Impact factor in 2013: 0.296, 5 year impact factor 0.329 (to appear)}, + year = {2014}, + publisher = {Elsevier}, + issn = {1477-8424}, + author = {Tom Van Cutsem and Elisa Gonzalez Boix and Christophe Scholliers and Andoni Lombide Carreton and Dries Harnie and Kevin Pinte and Wolfgang De Meuter}, + editor = {Nick Benton} +} + +@inproceedings{Bykov:2011:OCC:2038916.2038932, + author = {Bykov, Sergey and Geller, Alan and Kliot, Gabriel and Larus, James R. and Pandya, Ravi and Thelin, Jorgen}, + title = {Orleans: Cloud Computing for Everyone}, + booktitle = {Proceedings of the 2Nd ACM Symposium on Cloud Computing}, + series = {SOCC '11}, + year = {2011}, + isbn = {978-1-4503-0976-9}, + location = {Cascais, Portugal}, + pages = {16:1--16:14}, + articleno = {16}, + numpages = {14}, + url = {http://doi.acm.org/10.1145/2038916.2038932}, + doi = {10.1145/2038916.2038932}, + acmid = {2038932}, + publisher = {ACM}, + address = {New York, NY, USA}, + keywords = {cloud computing, distributed actors, programming models}, +} + +@article{Tomlinson:1988:ROC:67387.67410, + author = {Tomlinson, C. and Kim, W. and Scheevel, M. and Singh, V. and Will, B. 
and Agha, G.}, + title = {Rosette: An Object-oriented Concurrent Systems Architecture}, + journal = {SIGPLAN Not.}, + issue_date = {April 1989}, + volume = {24}, + number = {4}, + month = sep, + year = {1988}, + issn = {0362-1340}, + pages = {91--93}, + numpages = {3}, + url = {http://doi.acm.org/10.1145/67387.67410}, + doi = {10.1145/67387.67410}, + acmid = {67410}, + publisher = {ACM}, + address = {New York, NY, USA}, +} + +@article{Haller:2009:SAU:1496391.1496422, + author = {Haller, Philipp and Odersky, Martin}, + title = {Scala Actors: Unifying Thread-based and Event-based Programming}, + journal = {Theor. Comput. Sci.}, + issue_date = {February, 2009}, + volume = {410}, + number = {2-3}, + month = feb, + year = {2009}, + issn = {0304-3975}, + pages = {202--220}, + numpages = {19}, + url = {http://dx.doi.org/10.1016/j.tcs.2008.09.019}, + doi = {10.1016/j.tcs.2008.09.019}, + acmid = {1496422}, + publisher = {Elsevier Science Publishers Ltd.}, + address = {Essex, UK}, + keywords = {Actors, Concurrent programming, Events, Threads}, +} + +@inproceedings{epstein2011, + acmid = {2034690}, + added-at = {2014-12-10T16:12:02.000+0100}, + address = {New York, NY, USA}, + author = {Epstein, Jeff and Black, Andrew P. and Peyton-Jones, Simon}, + biburl = {http://www.bibsonomy.org/bibtex/24882f140b6bbca2806c57a22c57b5c5d/chesteve}, + booktitle = {Proceedings of the 4th ACM symposium on Haskell}, + doi = {10.1145/2034675.2034690}, + interhash = {48386ecc60d5772410dec97e267a570b}, + intrahash = {4882f140b6bbca2806c57a22c57b5c5d}, + isbn = {978-1-4503-0860-1}, + keywords = {imported}, + location = {Tokyo, Japan}, + numpages = {12}, + pages = {118--129}, + publisher = {ACM}, + series = {Haskell '11}, + timestamp = {2014-12-10T16:12:02.000+0100}, + title = {Towards Haskell in the cloud}, + xxxurl = {http://doi.acm.org/10.1145/2034675.2034690}, + year = 2011 +} diff --git a/_bibliography/rpc.bib b/_bibliography/rpc.bib index 416b697..638cf1b 100644 --- a/_bibliography/rpc.bib +++ b/_bibliography/rpc.bib @@ -7,20 +7,464 @@ year = {2010}, } -@inproceedings{Elsman2005, - author = {Martin Elsman}, - title = {Type-specialized serialization with sharing}, - booktitle = {Trends in Functional Programming}, - year = {2005}, - pages = {47-62}, -} - -@article{Kennedy2004, - author = {Andrew Kennedy}, - title = {Pickler combinators}, - journal = {J. Funct. Program.}, - volume = {14}, - number = {6}, - year = {2004}, - pages = {727-739}, -}
\ No newline at end of file +@article{implementingrpc, + title={Implementing remote procedure calls}, + author={Birrell, Andrew D and Nelson, Bruce Jay}, + journal={ACM Transactions on Computer Systems (TOCS)}, + volume={2}, + number={1}, + pages={39--59}, + year={1984}, + publisher={ACM} +} + +@article{rmipaper, + title={A Distributed Object Model for the Java\^{} T\^{} M System}, + author={Wollrath, Ann and Riggs, Roger and Waldo, Jim}, + year={1996} +} + +@book{rmibook, + title={Java. rmi: The Remote Method Invocation Guide}, + author={Pitt, Esmond and McNiff, Kathy}, + year={2001}, + publisher={Addison-Wesley Longman Publishing Co., Inc.} +} + +@book{critiqueofrpc, + title={A critique of the remote procedure call paradigm}, + author={Tanenbaum, Andrew Stuart and van Renesse, Robbert}, + year={1987} +} + +@inproceedings{rpcoverrdma, + title={FaSST: Fast, Scalable and Simple Distributed Transactions with Two-Sided (RDMA) Datagram RPCs}, + author={Kalia, Anuj and Kaminsky, Michael and Andersen, David G}, + booktitle={12th USENIX Symposium on Operating Systems Design and Implementation (OSDI 16)}, + pages={185--201}, + organization={USENIX Association} +} + +@inproceedings{sunnfs, + title={Design and implementation of the Sun network filesystem}, + author={Sandberg, Russel and Goldberg, David and Kleiman, Steve and Walsh, Dan and Lyon, Bob}, + booktitle={Proceedings of the Summer USENIX conference}, + pages={119--130}, + year={1985} +} + +@misc{thrift, + title={Apache Thrift}, + author={Prunicki, Andrew}, + year={2009} +} + +@book{corba, + title={CORBA 3 fundamentals and programming}, + author={Siegel, Jon and Ph. D.}, + volume={2}, + year={2000}, + publisher={John Wiley \& Sons New York, NY, USA:} +} + +@misc{grpc, + title = {gRPC}, + author={Google}, + url = {http://www.grpc.io/}, + note = {Accessed: 2016-11-11}, +} + +@misc{soaparticle1, + title = {Exclusive .NET Developer's Journal "Indigo" Interview with Microsoft's Don Box}, + author={Derek Ferguson}, + url = {http://dotnet.sys-con.com/node/45908}, + note = {Accessed: 2016-11-11}, +} + +@misc{corbasite, + title = {CORBA-OMG}, + author={CORBA}, + url = {http://www.corba.org/}, + note = {Accessed: 2016-11-11}, +} + + +@inproceedings{finagle, + title={Your server as a function}, + author={Eriksen, Marius}, + booktitle={Proceedings of the Seventh Workshop on Programming Languages and Operating Systems}, + pages={5}, + year={2013}, + organization={ACM} +} + +@inproceedings{anycastrpc, + title={Anycast-RPC for Wireless Sensor Networks}, + author={Bergstrom, Eric and Pandey, Raju}, + booktitle={2007 IEEE International Conference on Mobile Adhoc and Sensor Systems}, + pages={1--8}, + year={2007}, + organization={IEEE} +} + +@article{rpcrfc, + title={RFC 1831 - RPC: Remote procedure call protocol specification version 2}, + author={Srinivasan, Raj}, + year={1995} +} + +@misc{microservices1rpc, + title={Delving Into the Microservices Architecture}, + author={Mueller, John}, + year={2015}, + url = {http://blog.smartbear.com/microservices/delving-into-the-microservices-architecture/}, +} + +@article{rpcorigin, + title={High-level framework for network-based resource sharing}, + author={White, James E}, + year={1975} +} + +@inproceedings{interweave1, + title={Integrating remote invocation and distributed shared state}, + author={Tang, Chunqiang and Chen, DeQing and Dwarkadas, Sandhya and Scott, Michael L}, + booktitle={Parallel and Distributed Processing Symposium, 2004. Proceedings. 
18th International}, + pages={30}, + year={2004}, + organization={IEEE} +} + + +@inproceedings{interweave2, + title={Interweave: A middleware system for distributed shared state}, + author={Chen, DeQing and Dwarkadas, Sandhya and Parthasarathy, Srinivasan and Pinheiro, Eduardo and Scott, Michael L}, + booktitle={International Workshop on Languages, Compilers, and Run-Time Systems for Scalable Computers}, + pages={207--220}, + year={2000}, + organization={Springer} +} + +@inproceedings{interweave3, + title={Multi-level shared state for distributed systems}, + author={Chen, DeQing and Tang, Chunqiang and Chen, Xiangchuan and Dwarkadas, Sandhya and Scott, Michael L}, + booktitle={Parallel Processing, 2002. Proceedings. International Conference on}, + pages={131--140}, + year={2002}, + organization={IEEE} +} + +@article{offloading1, + title={A survey of computation offloading for mobile systems}, + author={Kumar, Karthik and Liu, Jibang and Lu, Yung-Hsiang and Bhargava, Bharat}, + journal={Mobile Networks and Applications}, + volume={18}, + number={1}, + pages={129--140}, + year={2013}, + publisher={Springer} +} + +@misc{ibis, + title={Ibis Communication middleware}, + author={Ibis}, + url = {https://www.cs.vu.nl/ibis/rmi.html}, +} + +@article{cuckoo, + title={Cuckoo: flexible compute-intensive task offloading in mobile cloud computing}, + author={Zhou, Zhigang and Zhang, Hongli and Ye, Lin and Du, Xiaojiang}, + journal={Wireless Communications and Mobile Computing}, + year={2016}, + publisher={Wiley Online Library} +} + +@inproceedings{maui, + title={MAUI: making smartphones last longer with code offload}, + author={Cuervo, Eduardo and Balasubramanian, Aruna and Cho, Dae-ki and Wolman, Alec and Saroiu, Stefan and Chandra, Ranveer and Bahl, Paramvir}, + booktitle={Proceedings of the 8th international conference on Mobile systems, applications, and services}, + pages={49--62}, + year={2010}, + organization={ACM} +} + +@article{docker, + title={Docker: lightweight linux containers for consistent development and deployment}, + author={Merkel, Dirk}, + journal={Linux Journal}, + volume={2014}, + number={239}, + pages={2}, + year={2014}, + publisher={Belltown Media} +} + +@inproceedings{selfdest, + title={RFID systems and security and privacy implications}, + author={Sarma, Sanjay E and Weis, Stephen A and Engels, Daniel W}, + booktitle={International Workshop on Cryptographic Hardware and Embedded Systems}, + pages={454--469}, + year={2002}, + organization={Springer} +} + +@misc{oraclenfs, + title={Overview of Secure RPC}, + author={Oracle}, + url = {https://docs.oracle.com/cd/E23823_01/html/816-4557/auth-2.html}, +} + +@misc{capnprotosecure, + title={Is Cap'n Proto Secure?}, + author={Kenton}, + url = {https://capnproto.org/faq.html#is-capn-proto-secure}, +} + + +@article{grid1, + title={High performance GridRPC middleware}, + author={Caniou, Yves and Caron, Eddy and Desprez, Fr{\'e}d{\'e}ric and Nakada, Hidemoto and Tanaka, Yoshio and Seymour, Keith}, + journal={Recent Developments in Grid Technology and Applications, Nova Science Publishers}, + pages={141--181}, + year={2008} +} + +@incollection{gridsolve1, + title={Gridsolve: The evolution of a network enabled solver}, + author={YarKhan, Asim and Dongarra, Jack and Seymour, Keith}, + booktitle={Grid-Based Problem Solving Environments}, + pages={215--224}, + year={2007}, + publisher={Springer} +} + +@article{gridsolve2, + title={Interactive Grid-access using Gridsolve and giggle}, + author={Hardt, M and Seymour, Keith and Dongarra, Jack and 
Zapf, M and Ruitter, NV}, + journal={Computing and Informatics}, + volume={27}, + number={2}, + pages={233--248}, + year={2012} +} + +@article{ninf, + title={Ninf-G: A reference implementation of RPC-based programming middleware for Grid computing}, + author={Tanaka, Yoshio and Nakada, Hidemoto and Sekiguchi, Satoshi and Suzumura, Toyotaro and Matsuoka, Satoshi}, + journal={Journal of Grid computing}, + volume={1}, + number={1}, + pages={41--51}, + year={2003}, + publisher={Springer} +} + +@article{erlang, + title={Concurrent programming in ERLANG}, + author={Armstrong, Joe and Virding, Robert and Wikstr{\"o}m, Claes and Williams, Mike}, + year={1993}, + publisher={Citeseer} +} + +@misc{Apigee, + title={gRPC: The Story of Microservices at Square}, + author={Surtani, Manick and Ho, Alan}, + url = {https://www.youtube.com/watch?v=-2sWDr3Z0Wo}, +} + +@misc{CoreSurfaceAPIs, + title={gRPC core APIs}, + author={Google}, + url = {https://github.com/grpc/grpc/tree/master/src/core}, +} + +@misc{gRPCCompanies, + title={About gRPC}, + author={Google}, + url = {http://www.grpc.io/about/}, +} + +@misc{gRPCLanguages, + title={gRPC Documentation}, + author={Google}, + url = {http://www.grpc.io/docs/}, +} + +@misc{gRPCProtos, + title={Google APIs}, + author={Google}, + url = {https://github.com/googleapis/googleapis/}, +} + +@misc{rpcimage, + title={Remote Procedure Call (RPC)}, + author={Taing, Nguonly}, + url = {http://lycog.com/distributed-systems/remote-procedure-call/}, + note = {Image URL: http://lycog.com/wp-content/uploads/2011/03/rpc-10-steps.png} +} + +@misc{trendrpcthrift, + title={Remote Procedure Call (RPC)}, + author={Google Trends}, + url = {https://www.google.com/trends/explore?cat=31&date=today%2012-m&q=apache%20thrift,grpc&hl=en-US} +} + +@misc{grpcauth, + title={GRPC Authentication}, + author={Google}, + url = {http://www.grpc.io/docs/guides/auth.html}, +} + +@misc{rfc707, + title={RFC 707: A high-level framework for network-based resource sharing}, + author={White, James E}, + year={1975}, + publisher={December} +} + +@techreport{rfc674, + title={RFC 674: Procedure call documents: Version 2}, + author={Postel, J and White, JE}, + year={1974} +} + +@article{rfc684, + title={RFC 684: Commentary on procedure calling as a network protocol}, + author={Schantz, Richard}, + year={1975} +} + +@misc{grpcbetter, + title={GRPC Authentication}, + author={Google}, + url = {https://www.quora.com/Is-GRPC-better-than-Thrift}, + publisher = {Quora} +} + +@misc{multiplexingthrift, + title={Added service multiplexing support}, + author={Yu, Lixin}, + url = {https://github.com/eleme/thriftpy/pull/88/commits/0877531f9246ca993c1d9af5d29cd009ee6ec7d4}, + publisher = {Github} +} + +@techreport{rfc5531, + title={RFC 5531: RPC: Remote Procedure Call Protocol Specification Version 2}, + author={Thurlow, R}, + year={2009} +} + +@techreport{rfc1831, + title={RFC 1831: RPC: Remote Procedure Call Protocol Specification Version 2}, + author={Srinivasan, R}, + year={1995} +} + +@misc{grpcchapter, + title={gRPC}, + booktitle={Programming Models for Distributed Computation}, + author={Grosu, P and Abdul Rehman, M and Anderson, E and Pai, V and Miller, H}, + url = {http://dist-prog-book.com/chapter/1/gRPC.html}, + publisher = {Github} +} + +@misc{stubrpc, + title={stub}, + author={Rouse, M}, + url = {http://whatis.techtarget.com/definition/stub}, + publisher = {WhatIs} +} + + +@misc{grpcpersec, + title={Scalable Realtime Microservices with Kubernetes and gRPC}, + author={Mandel, M}, + url = 
{https://www.youtube.com/watch?v=xb8u2s7cxzg&t=486s}, + publisher = {YouTube} +} + +@misc{usgdp, + title={2016 United States Budget Estimate}, + author={InsideGov}, + url = {http://federal-budget.insidegov.com/l/119/2016-Estimate}, + publisher = {InsideGov} +} + + +@incollection{notedistributed, + title={A note on distributed computing}, + author={Waldo, Jim and Wyant, Geoff and Wollrath, Ann and Kendall, Sam}, + booktitle={Mobile Object Systems Towards the Programmable Internet}, + pages={49--64}, + year={1997}, + publisher={Springer} +} + +@misc{dewan, + title={Synchronous vs Asynchronous}, + author={Dewan, P}, + url = {http://www.cs.unc.edu/~dewan/242/s07/notes/ipc/node9.html}, + year={2006}, + publisher = {UNC} +} + + +@misc{async, + title={Asynchronous RPC}, + url = {https://msdn.microsoft.com/en-us/library/windows/desktop/aa373550(v=vs.85).aspx}, + year={2006}, + publisher = {Microsoft} +} + +@misc{roi, + title={Remote Object Invocation}, + author={Setia, S}, + url = {https://cs.gmu.edu/~setia/cs707/slides/rmi-imp.pdf}, + publisher = {QMU} +} + + +@misc{Wiener, + title={XML-RPC}, + author={Weiner, D}, + url = {http://xmlrpc.scripting.com/spec.html}, + year={1999}, + publisher = {QMU} +} + + +@misc{soapvsxml, + title={XML-RPC}, + author={Jones, M}, + url = {https://www.quora.com/What-is-the-difference-between-xml-rpc-and-soap}, + year={2014}, + publisher = {Quora} +} + +@misc{thrifttut, + title={Apache THRIFT: A Much Needed Tutorial}, + author={Maheshwar, C}, + url = {http://digital-madness.in/blog/wp-content/uploads/2012/11/BSD_08_2013.8-18.pdf}, + year={2013}, + publisher = {Digital Madness} +} + +@misc{finagletut, + title={Finagle-Quickstart}, + author={Twitter}, + url = {https://twitter.github.io/finagle/guide/Quickstart.html}, + year={2016}, + publisher = {Twitter} +} + + +@misc{norman, + title={Communication Fundamentals}, + author={Norman, S}, + url = {http://slideplayer.com/slide/8555756/}, + year={2015}, + publisher = {SlideShare} +} + + + diff --git a/chapter/1/figures/grpc-benchmark.png b/chapter/1/figures/grpc-benchmark.png Binary files differnew file mode 100644 index 0000000..9f39c71 --- /dev/null +++ b/chapter/1/figures/grpc-benchmark.png diff --git a/chapter/1/figures/grpc-client-transport-handler.png b/chapter/1/figures/grpc-client-transport-handler.png Binary files differnew file mode 100644 index 0000000..edd5236 --- /dev/null +++ b/chapter/1/figures/grpc-client-transport-handler.png diff --git a/chapter/1/figures/grpc-cross-language.png b/chapter/1/figures/grpc-cross-language.png Binary files differnew file mode 100644 index 0000000..c600f67 --- /dev/null +++ b/chapter/1/figures/grpc-cross-language.png diff --git a/chapter/1/figures/grpc-googleapis.png b/chapter/1/figures/grpc-googleapis.png Binary files differnew file mode 100644 index 0000000..62718e5 --- /dev/null +++ b/chapter/1/figures/grpc-googleapis.png diff --git a/chapter/1/figures/grpc-languages.png b/chapter/1/figures/grpc-languages.png Binary files differnew file mode 100644 index 0000000..1f1c50d --- /dev/null +++ b/chapter/1/figures/grpc-languages.png diff --git a/chapter/1/figures/grpc-server-transport-handler.png b/chapter/1/figures/grpc-server-transport-handler.png Binary files differnew file mode 100644 index 0000000..fe895c0 --- /dev/null +++ b/chapter/1/figures/grpc-server-transport-handler.png diff --git a/chapter/1/figures/hello-world-client.png b/chapter/1/figures/hello-world-client.png Binary files differnew file mode 100644 index 0000000..c4cf7d4 --- /dev/null +++ 
b/chapter/1/figures/hello-world-client.png
diff --git a/chapter/1/figures/hello-world-server.png b/chapter/1/figures/hello-world-server.png
Binary files differ
new file mode 100644
index 0000000..a51554b
--- /dev/null
+++ b/chapter/1/figures/hello-world-server.png
diff --git a/chapter/1/figures/http2-frame.png b/chapter/1/figures/http2-frame.png
Binary files differ
new file mode 100644
index 0000000..59d6ed5
--- /dev/null
+++ b/chapter/1/figures/http2-frame.png
diff --git a/chapter/1/figures/http2-stream-lifecycle.png b/chapter/1/figures/http2-stream-lifecycle.png
Binary files differ
new file mode 100644
index 0000000..87333cb
--- /dev/null
+++ b/chapter/1/figures/http2-stream-lifecycle.png
diff --git a/chapter/1/figures/protobuf-types.png b/chapter/1/figures/protobuf-types.png
Binary files differ
new file mode 100644
index 0000000..aaf3a1e
--- /dev/null
+++ b/chapter/1/figures/protobuf-types.png
diff --git a/chapter/1/gRPC.md b/chapter/1/gRPC.md
new file mode 100644
index 0000000..f6c47b7
--- /dev/null
+++ b/chapter/1/gRPC.md
@@ -0,0 +1,323 @@
+---
+layout: page
+title: "gRPC"
+by: "Paul Grosu (Northeastern U.), Muzammil Abdul Rehman (Northeastern U.), Eric Anderson (Google, Inc.), Vijay Pai (Google, Inc.), and Heather Miller (Northeastern U.)"
+---
+
+<h1>
+<p align="center">gRPC</p>
+</h1>
+
+<h4><em>
+<p align="center">Paul Grosu (Northeastern U.), Muzammil Abdul Rehman (Northeastern U.), Eric Anderson (Google, Inc.), Vijay Pai (Google, Inc.), and Heather Miller (Northeastern U.)</p>
+</em></h4>
+
+<hr>
+
+<h3><em><p align="center">Abstract</p></em></h3>
+
+<em>gRPC was built through a collaboration between Google and Square as a public replacement for Stubby, ARCWire and Sake {% cite Apigee %}. The gRPC framework is a form of the Actor Model, based on an IDL (Interface Description Language) defined via the Protocol Buffer message format. With the introduction of HTTP/2, the internal Google Stubby and Square Sake frameworks have now been made available to the public. By working on top of the HTTP/2 protocol, gRPC enables messages to be multiplexed and compressed bi-directionally as preemptive streams, maximizing the capacity of any microservices ecosystem. Google has also taken a new approach to public projects: instead of just releasing a paper describing the concepts, it now also provides the implementation showing how to properly interpret the standard.
+</em>
+
+<h3><em>Introduction</em></h3>
+
+In order to understand gRPC and the flexibility with which it enables a microservices ecosystem to grow into a reactive Actor Model, it is important to appreciate the nuances of the HTTP/2 protocol upon which it is based. Afterward we will describe the gRPC framework, focusing specifically on the gRPC-Java implementation, with the aim of expanding this chapter over time to all implementations of gRPC. At the end we will cover examples demonstrating these ideas, taking a user through the initial steps of working with the gRPC-Java framework.
+
+<h3>1 <em>HTTP/2</em></h3>
+
+The HTTP 1.1 protocol has been a success for some time, though with the increase of distributed computing, especially in the area of microservices, the community began to request some key features that it lacked. The phenomenon of creating more modularized functional units, organically constructed on a <em>share-nothing model</em> with a bidirectional, high-throughput request and response methodology, demands a new protocol for communication and integration.
+Thus HTTP/2 was born as a new standard: a binary wire protocol providing compressed streams that can be multiplexed for concurrency. As many microservices implementations scan message headers before actually processing any payload in order to scale up the processing and routing of messages, HTTP/2 now provides header compression for this purpose. One last important benefit is that the server endpoint can push cached resources to the client based on anticipated future communication, dramatically saving client communication time and processing.
+
+<h3>1.1 <em>HTTP/2 Frames</em></h3>
+
+The HTTP/2 protocol is now a framed protocol, which expands the capability for bidirectional, asynchronous communication. Every message is thus part of a frame that carries a header, a frame type and a stream identifier, in addition to the standard frame length used for processing. Each stream can have a priority, which allows dependencies between streams to be expressed, forming a <em>priority tree</em>. The data can be either a request or a response, which allows for bidirectional communication, with the capability of flagging the communication for stream termination, flow control with priority settings, continuation, and push responses from the server for client confirmation. Below is the format of the HTTP/2 frame {% cite RFC7540 %}:
+
+<p align="center">
+ <img src="figures/http2-frame.png" /><br>
+ <em>Figure 1: The encoding of an HTTP/2 frame.</em>
+</p>
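+
+To make this layout concrete, the frame header is always 9 octets: a 24-bit payload length, an 8-bit type, an 8-bit flags field, and a reserved bit followed by a 31-bit stream identifier {% cite RFC7540 %}. The following minimal Python sketch decodes such a header; the field widths come from the RFC, while the sample bytes are made up for illustration:
+
+```
+import struct
+
+def parse_frame_header(header: bytes):
+    # Split the fixed 9-octet HTTP/2 frame header into its fields.
+    length_hi, length_lo, frame_type, flags, stream = struct.unpack("!BHBBI", header)
+    return {
+        "length": (length_hi << 16) | length_lo,  # 24-bit payload length
+        "type": frame_type,                       # e.g. 0x0 DATA, 0x1 HEADERS
+        "flags": flags,                           # e.g. END_STREAM, END_HEADERS
+        "stream_id": stream & 0x7FFFFFFF,         # the high bit is reserved
+    }
+
+# A made-up HEADERS frame header: 4-byte payload, END_STREAM and END_HEADERS set, stream 1.
+print(parse_frame_header(bytes([0x00, 0x00, 0x04, 0x01, 0x05, 0x00, 0x00, 0x00, 0x01])))
+```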
+
+<h3>1.2 <em>Header Compression</em></h3>
+
+The HTTP header is one of the primary methods of passing information about the state of the other endpoint, the request or response, and the payload. This enables endpoints to save time when processing a large quantity of streams, with the ability to forward information along without wasting time inspecting the payload. Since the header information can be quite large, it is now possible to compress it, allowing for better throughput and a greater capacity of stored stateful information.
+
+<h3>1.3 <em>Multiplexed Streams</em></h3>
+
+As streams are core to the implementation of HTTP/2, it is important to discuss the details of their implementation in the protocol. Since many streams can be open simultaneously from many endpoints, each stream will be in one of the following states. The streams are multiplexed together, forming a chain of streams that are transmitted over the wire and allowing asynchronous, bi-directional concurrency to be performed by the receiving endpoint. Below is the lifecycle of a stream {% cite RFC7540 %}:
+
+<p align="center">
+ <img src="figures/http2-stream-lifecycle.png" /><br>
+ <em>Figure 2: The lifecycle of an HTTP/2 stream.</em>
+</p>
+
+To better understand this diagram, it is important to define some of the terms in it:
+
+<em>PUSH_PROMISE</em> - Sent by one endpoint to alert another that it will be sending some data over the wire.
+
+<em>RST_STREAM</em> - This makes termination of a stream possible.
+
+<em>PRIORITY</em> - Sent by an endpoint to advise the priority of a stream.
+
+<em>END_STREAM</em> - This flag denotes the end of a <em>DATA</em> frame.
+
+<em>HEADERS</em> - This frame will open a stream.
+
+<em>Idle</em> - The initial state of a stream, before it is opened by sending or receiving a <em>HEADERS</em> frame.
+
+<em>Reserved (Local)</em> - To be in this state means that one has sent a PUSH_PROMISE frame.
+
+<em>Reserved (Remote)</em> - To be in this state means that the stream has been reserved by a remote endpoint.
+
+<em>Open</em> - To be in this state means that both endpoints can send frames.
+
+<em>Closed</em> - This is a terminal state.
+
+<em>Half-Closed (Local)</em> - This means that no frames can be sent except for <em>WINDOW_UPDATE</em>, <em>PRIORITY</em>, and <em>RST_STREAM</em>.
+
+<em>Half-Closed (Remote)</em> - This means that the remote endpoint will no longer send frames of data on this stream.
+
+<h3>1.4 <em>Flow Control of Streams</em></h3>
+
+Since many streams compete for the bandwidth of a connection, flow control is needed to prevent bottlenecks and collisions in the transmission. This is done via the <em>WINDOW_UPDATE</em> payload for every stream (and for the overall connection as well), which lets the sender know how much room the receiving endpoint has for processing new data.
+
+<h3>2 <em>Protocol Buffers with RPC</em></h3>
+
+Though gRPC was built on top of HTTP/2, an IDL had to be used to describe the communication between endpoints. The natural direction was to use Protocol Buffers, which is a method of structuring key-value-based data for serialization between a server and a client. At the start of gRPC development only version 2.0 (proto2) was available, which only implemented data structures without any request/response mechanism. An example of a Protocol Buffer data structure would look something like this:
+
+```
+// A message containing the user's name.
+message Hello {
+  string name = 1;
+}
+```
+<p align="center">
+ <em>Figure 3: Protocol Buffer version 2.0 representing a message data-structure.</em>
+</p>
+
+This message will also be encoded for highest compression when sent over the wire. For example, let us say that the message is the string <em>"Hi"</em>. Every Protocol Buffer type has a value, and in this case a string has a value of `2`, as noted in Table 1 {% cite Protobuf-Types %}.
+
+<p align="center">
+ <img src="figures/protobuf-types.png" /><br>
+ <em>Table 1: Tag values for Protocol Buffer types.</em>
+</p>
+
+One will notice that there is a number associated with each field element in the Protocol Buffer definition, which represents its <em>tag</em>. In Figure 3, the field `name` has a tag of `1`. When a message gets encoded, each field (key) starts with a one-byte value (8 bits), where the least-significant 3 bits encode the <em>type</em> and the remaining bits encode the <em>tag</em>. In this case the tag is `1` and the type is `2`, so the encoding is `00001 010`, which has a hexadecimal value of `0A`. The following byte is the length of the string, which is `2`, followed by the bytes `48` and `69` representing `H` and `i`. Thus the whole transmission will look as follows:
+
+```
+0A 02 48 69
+```
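+
+As a quick sanity check of this byte layout, the key byte and length byte can be computed by hand; the following minimal Python sketch (no Protocol Buffers library involved) mirrors the encoding described above:
+
+```
+# Field key = (tag << 3) | wire type; tag 1 and length-delimited type 2 give 0x0A.
+field_tag, wire_type = 1, 2
+key = (field_tag << 3) | wire_type
+payload = "Hi".encode("ascii")
+encoded = bytes([key, len(payload)]) + payload
+print(encoded.hex())  # prints: 0a024869
+```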
+
+Thus the language had to be updated to support gRPC, and a service definition with request and response messages was added in version 3.0.0 of Protocol Buffers. The updated implementation would look as follows {% cite HelloWorldProto %}:
+
+```
+// The request message containing the user's name.
+message HelloRequest {
+  string name = 1;
+}
+
+// The response message containing the greetings
+message HelloReply {
+  string message = 1;
+}
+
+// The greeting service definition.
+service Greeter {
+  // Sends a greeting
+  rpc SayHello (HelloRequest) returns (HelloReply) {}
+}
+```
+<p align="center">
+ <em>Figure 4: Protocol Buffer version 3.0.0 representing a message data-structure with the accompanying RPC definition.</em>
+</p>
+
+Notice the addition of a service, where the RPC call uses one of the messages as the structure of the <em>Request</em> and the other as the <em>Response</em> message format.
+
+Once this Proto file is written, one compiles it with gRPC to generate the <em>Client</em> and <em>Server</em> files representing the two classical endpoints of an RPC implementation.
+
+<h3>3 <em>gRPC</em></h3>
+
+gRPC was built on top of HTTP/2; we will cover the specifics of gRPC-Java here and expand to the other implementations with time. gRPC is a cross-platform framework that allows integration across many languages, as denoted in Figure 5 {% cite gRPC-Overview %}.
+
+<p align="center">
+ <img src="figures/grpc-cross-language.png" /><br>
+ <em>Figure 5: gRPC allows for asynchronous language-agnostic message passing via Protocol Buffers.</em>
+</p>
+
+To ensure scalability, benchmarks are run on a daily basis to verify that gRPC performs optimally under high-throughput conditions, as illustrated in Figure 6 {% cite gRPC-Benchmark %}.
+
+<p align="center">
+ <img src="figures/grpc-benchmark.png" /><br>
+ <em>Figure 6: Benchmark showing the queries-per-second on two virtual machines with 32 cores each.</em>
+</p>
+
+To standardize, most of the public Google APIs (including the Speech API, Vision API, Bigtable, Pub/Sub, etc.) have been ported to support gRPC, and their definitions can be found at the following location:
+
+<p align="center">
+ <img src="figures/grpc-googleapis.png" /><br>
+ <em>Figure 7: The public Google APIs have been updated for gRPC, and can be found at <a href="https://github.com/googleapis/googleapis/tree/master/google">https://github.com/googleapis/googleapis/tree/master/google</a></em>
+</p>
+
+
+<h3>3.1 <em>Supported Languages</em></h3>
+
+The officially supported languages are listed in Table 2 {% cite gRPC-Languages %}.
+
+<p align="center">
+ <img src="figures/grpc-languages.png" /><br>
+ <em>Table 2: Officially supported languages by gRPC.</em>
+</p>
+
+<h3>3.2 <em>Authentication</em></h3>
+
+There are two methods of authentication available in gRPC:
+
+* SSL/TLS
+* Google Token (via OAuth2)
+
+gRPC is flexible in that one can plug in a custom authentication system if that is preferred.
+
+<h3>3.3 <em>Development Cycle</em></h3>
+
+In its simplest form, working with gRPC follows a structured set of steps with this general flow:
+
+<em>1. Download gRPC for the language of interest.</em>
+
+<em>2. Implement the Request and Response definition in a ProtoBuf file.</em>
+
+<em>3. Compile the ProtoBuf file and run the code-generators for the specific language. This will generate the Client and Server endpoints.</em>
+
+<em>4. Customize the Client and Server code for the desired implementation.</em>
+
+Most of these steps will require tweaking the Protobuf file and testing the throughput to ensure that the network and CPU capacities are optimally utilized.
+
+<h3>3.4 <em>The gRPC Framework (Stub, Channel and Transport Layer)</em></h3>
+
+One starts by initializing a communication <em>Channel</em> between a <em>Client</em> and a <em>Server</em> and storing it as a <em>Stub</em>. The <em>Credentials</em> are provided to the Channel when it is initialized. These form a <em>Context</em> for the Client's connection to the Server. Then a <em>Request</em> can be built based on the definition in the Protobuf file. The Request and its associated, expected <em>Response</em> are executed by the <em>service</em> constructed in the Protobuf file. The Response is then parsed for any data coming from the Channel.
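+
+As a concrete illustration of this Channel, Stub, Request and Response flow, below is a minimal sketch using gRPC's Python binding for brevity (the Java flow is analogous); it assumes that the `helloworld_pb2` and `helloworld_pb2_grpc` modules have been generated from the Greeter service of Figure 4:
+
+```
+import grpc
+
+# Modules generated by the Protocol Buffer compiler from helloworld.proto (assumed present).
+import helloworld_pb2
+import helloworld_pb2_grpc
+
+# Open a Channel to the Server; a real deployment would supply Credentials instead of plaintext.
+channel = grpc.insecure_channel("localhost:50051")
+
+# The Stub exposes the service methods declared in the Protobuf file.
+stub = helloworld_pb2_grpc.GreeterStub(channel)
+
+# Build a Request from the message definition and execute the remote call.
+response = stub.SayHello(helloworld_pb2.HelloRequest(name="gRPC"))
+print(response.message)
+```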
+
+The connection can be asynchronous and bi-directionally streaming, so that data is constantly flowing back and is available to be read when ready. This allows one to treat the Client and Server as endpoints where one can not only adjust the flow but also intercept and decorate it, in order to filter, request and retrieve the data of interest.
+
+The <em>Transport Layer</em> performs the retrieval and placement of the binary protocol on the wire. <em>gRPC-Java</em> has three implementations, though a user can implement their own: <em>Netty, OkHttp, and inProcess.</em>
+
+<h3>3.5 <em>gRPC Java</em></h3>
+
+The Java implementation of gRPC has been built with the mobile platform in mind, and to provide that capability it requires only JDK 6.0. Though the core of gRPC is built with data centers in mind (specifically to support C/C++ on the Linux platform), the Java and Go implementations are two very reliable platforms for experimenting with microservice ecosystem implementations.
+
+There are several moving parts to understanding how gRPC-Java works. The first important step is to ensure that the Client and Server stub interface code gets generated by the Protobuf plugin compiler. The required dependencies are usually placed in your <em>Gradle</em> build file called `build.gradle` as follows:
+
+```
+  compile 'io.grpc:grpc-netty:1.0.1'
+  compile 'io.grpc:grpc-protobuf:1.0.1'
+  compile 'io.grpc:grpc-stub:1.0.1'
+```
+
+When you build using Gradle, the appropriate base code gets generated for you, which you can override to build your preferred implementation of the Client and Server.
+
+Since the HTTP/2 protocol has to be implemented, the chosen method was to have a <em>Metadata</em> class that converts the key-value pairs into HTTP/2 headers and vice versa; for the Netty implementation this is done via <em>GrpcHttp2HeadersDecoder</em> and <em>GrpcHttp2OutboundHeaders</em>.
+
+Another key insight is that the code handling the HTTP/2 conversion for the Client and the Server lives in the <em>NettyClientHandler.java</em> and <em>NettyServerHandler.java</em> classes shown in Figures 8 and 9.
+
+<p align="center">
+ <img src="figures/grpc-client-transport-handler.png" /><br>
+ <em>Figure 8: The Client Transport Handler for gRPC-Java.</em>
+</p>
+
+<p align="center">
+ <img src="figures/grpc-server-transport-handler.png" /><br>
+ <em>Figure 9: The Server Transport Handler for gRPC-Java.</em>
+</p>
+
+
+<h3>3.5.1 <em>Downloading gRPC Java</em></h3>
+
+The easiest way to download the gRPC-Java implementation is by performing the following command:
+
+```
+git clone -b v1.0.0 https://github.com/grpc/grpc-java.git
+```
+
+Next, compile on a Windows machine using Gradle (or Maven) with the following steps. If you are using any firewall software, it might be necessary to temporarily disable it while compiling gRPC-Java, as sockets are used for the tests:
+
+```
+cd grpc-java
+set GRADLE_OPTS=-Xmx2048m
+set JAVA_OPTS=-Xmx2048m
+set DEFAULT_JVM_OPTS="-Dfile.encoding=utf-8"
+echo skipCodegen=true > gradle.properties
+gradlew.bat build -x test
+cd examples
+gradlew.bat installDist
+```
+
+If you are having issues with Unicode (UTF-8) translation when using Git on Windows, you can try the following commands after entering the `examples` folder:
+
+```
+wget https://raw.githubusercontent.com/benelot/grpc-java/feb88a96a4bc689631baec11abe989a776230b74/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java
+
+copy RouteGuideServer.java src\main\java\io\grpc\examples\routeguide\RouteGuideServer.java
+```
+
+<h3>3.5.2 <em>Running the Hello World Demonstration</em></h3>
+
+Make sure you open two Command (Terminal) windows, each within the `grpc-java\examples\build\install\examples\bin` folder. In the first of the two windows type the following command:
+
+```
+hello-world-server.bat
+```
+
+You should see the following:
+
+<p align="center">
+ <img src="figures/hello-world-server.png" /><br>
+ <em>Figure 10: The Hello World gRPC Server.</em>
+</p>
+
+In the second of the two windows type the following command:
+
+```
+hello-world-client.bat
+```
+
+You should see the following response:
+
+<p align="center">
+ <img src="figures/hello-world-client.png" /><br>
+ <em>Figure 11: The Hello World gRPC Client and the response from the Server.</em>
+</p>
+
+<h3>4 <em>Conclusion</em></h3>
+
+This chapter presented an overview of the concepts behind gRPC and HTTP/2, and it will be expanded over time in both breadth and language implementations. In the area of microservices, one can see how a server endpoint could even spawn more endpoints, where the message content is the protobuf definition for the new endpoints to be generated for load-balancing, much like in the classical Actor Model.
+
+## References
+
+` `[Apigee]: https://www.youtube.com/watch?v=-2sWDr3Z0Wo
+
+` `[Authentication]: http://www.grpc.io/docs/guides/auth.html
+
+` `[Benchmarks]: http://www.grpc.io/docs/guides/benchmarking.html
+
+` `[CoreSurfaceAPIs]: https://github.com/grpc/grpc/tree/master/src/core
+
+` `[ErrorModel]: http://www.grpc.io/docs/guides/error.html
+
+` `[gRPC]: https://github.com/grpc/grpc/blob/master/doc/g_stands_for.md
+
+` `[gRPC-Companies]: http://www.grpc.io/about/
+
+` `[gRPC-Languages]: http://www.grpc.io/docs/
+
+` `[gRPC-Protos]: https://github.com/googleapis/googleapis/
+
+` `[Netty]: http://netty.io/
+
+` `[RFC7540]: http://httpwg.org/specs/rfc7540.html
+
+` `[HelloWorldProto]: https://github.com/grpc/grpc/blob/master/examples/protos/helloworld.proto
+
+` `[Protobuf-Types]: https://developers.google.com/protocol-buffers/docs/encoding
+
+` `[gRPC-Overview]: http://www.grpc.io/docs/guides/
+
+` `[gRPC-Languages]: http://www.grpc.io/about/#osp
+
+` `[gRPC-Benchmark]: http://www.grpc.io/docs/guides/benchmarking.html
diff --git a/chapter/1/rpc.md b/chapter/1/rpc.md
index b4bce84..ccc9739 100644
--- a/chapter/1/rpc.md
+++ b/chapter/1/rpc.md
@@ -1,11 +1,381 @@
 ---
 layout: page
-title: "Remote Procedure Call"
-by: "Joe Schmoe and Mary Jane"
+title: "RPC is Not Dead: Rise, Fall and the Rise of Remote Procedure Calls"
+by: "Muzammil Abdul Rehman and Paul Grosu"
 ---
-Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. {% cite Uniqueness --file rpc %}
+## Introduction:
+
+*Remote Procedure Call* (RPC) is a design *paradigm* that allows two entities to communicate over a communication channel following a general request-response mechanism. The definition of RPC has mutated and evolved significantly over the past four decades, and therefore the RPC *paradigm* is a generic, broadly classifying term that refers to all the RPC-esque systems that have arisen over that time. It has moved on from a simple *client-server* design to a group of inter-connected *services*. While the initial RPC *implementations* were designed as tools for outsourcing computation to a server in a distributed system, RPC has evolved over the years to build language-agnostic ecosystems of applications. The RPC *paradigm* has been part of the driving force in creating truly revolutionary distributed systems, giving rise to various communication schemes and protocols between diverse systems.
+
+The RPC *paradigm* has been used to implement many of our every-day systems. From lower-level applications like Network File Systems {% cite sunnfs --file rpc %} and Remote Direct Memory Access {% cite rpcoverrdma --file rpc %} to whole ecosystems of microservices, RPC has been used everywhere. RPC has a diverse variety of applications -- SunNFS {% cite sunnfs --file rpc %}, Twitter's Finagle {% cite finagle --file rpc %}, Apache Thrift {% cite thrift --file rpc %}, Java RMI {% cite rmipaper --file rpc %}, SOAP, CORBA {% cite corba --file rpc %} and Google's gRPC {% cite grpc --file rpc %}, to name a few.
+
+RPC has evolved over the years.
+Starting off as a synchronous, insecure, request-response system, RPC has grown into a secure, asynchronous, resilient *paradigm* that has influenced protocols and programming designs such as HTTP, REST, and just about anything with a request-response mechanism. It has transitioned into an asynchronous, bidirectional communication mechanism for connecting services and devices across the internet. While the initial RPC implementations mainly focused on a local, private network with multiple clients communicating with a server and synchronously waiting for the response from the server, modern RPC systems have *endpoints* communicating with each other, asynchronously passing arguments and processing responses, as well as having two-way request-response streams (from client to server, and also from server to client).
+
+## Remote Procedure Calls:
+
+The *Remote Procedure Call paradigm* can be defined, at a high level, as a set of two communication *endpoints* connected over a network, with one endpoint sending a request and the other endpoint generating a response based on that request. In the simplest terms, it's a request-response paradigm where the two *endpoints*/hosts have different *address spaces*. The host that requests a remote procedure can be referred to as the *caller*, and the host that responds to it can be referred to as the *callee*.
+
+The *endpoints* in RPC can be a client and a server, two nodes in a peer-to-peer network, two hosts in a grid computation system, or even two microservices. The RPC communication is not limited to two hosts; rather, it can involve multiple hosts or *endpoints* {% cite anycastrpc --file rpc %}.
+
+<p align="center">
+[ Image Source: {% cite rpcimage --file rpc %}]
+</p>
+<figure>
+ <img src="{{ site.baseurl }}/resources/img/rpc_chapter_1_ycog_10_steps.png" alt="RPC in 10 Steps." />
+<p>Fig. 1 - Remote Procedure Call.</p>
+</figure>
+
+The simplest RPC implementation looks like Fig. 1. In this case, the *client* (or *caller*) and the *server* (or *callee*) are separated by a physical network. The main components of the system are the client routine/program, the client stub, the server routine/program, the server stub, and the network routines. A *stub* is a small program that is generally used as a stand-in (or an interface) for a larger program {% cite stubrpc --file rpc %}. A *client stub* exposes the functionality provided by the server routine to the client routine, while the server stub provides a client-like program to the server routine {% cite rpcimage --file rpc %}. The client stub takes the input arguments from the client program and returns the result, while the server stub provides input arguments to the server program and gets the results. The client program can only interact with the client stub, which provides the interface of the remote server to the client. This stub also provides marshalling/pickling/serialization of the input arguments sent to the stub by the client routine. Similarly, the server stub provides a client interface to the server routines as well as the marshalling services.
+
+When a client routine performs a *remote procedure*, it calls the client stub, which serializes the input arguments. This serialized data is sent to the server using OS network routines (TCP/IP) {% cite rpcimage --file rpc %}. The data is then deserialized by the server stub and presented to the server routines with the given arguments. The return value from the server routines is serialized again and sent over the network back to the client, where it's deserialized by the client stub and presented to the client routine. This *remote procedure* is generally hidden from the client routine, and it appears as a *local procedure* to the client. RPC services also require a discovery service/host-resolution mechanism to bootstrap the communication between the client and the server.
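+
+To make the role of the stubs concrete, here is a minimal, hand-rolled sketch of a client stub in Python; the wire format (JSON over a single TCP exchange), the host/port, and the `add` procedure are all illustrative choices rather than any particular RPC framework:
+
+```python
+import json
+import socket
+
+def add_stub(a, b):
+    # Client stub: hides serialization and network I/O behind a local-looking call.
+    request = json.dumps({"procedure": "add", "args": [a, b]}).encode("utf-8")
+    with socket.create_connection(("localhost", 9000)) as conn:
+        conn.sendall(request)
+        conn.shutdown(socket.SHUT_WR)       # mark the end of the request
+        reply = conn.makefile("rb").read()  # wait for the server stub's reply
+    return json.loads(reply.decode("utf-8"))["result"]
+
+# The client routine would simply call: result = add_stub(2, 3)
+```
+
+A matching server stub would perform the mirror-image steps: read and deserialize the request, call the local `add` routine, and serialize the result back onto the network.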
The return value from the server routine is serialized again and sent over the network back to the client, where it's deserialized by the client stub and presented to the client routine. This *remote procedure* is generally hidden from the client routine, and it appears as a *local procedure* to the client. RPC services also require a discovery service/host-resolution mechanism to bootstrap the communication between the client and the server.
+
+One important feature of RPC is that each endpoint has a different *address space* {% cite implementingrpc --file rpc %}. In RPC, all the hosts have separate *address spaces*; they can't share pointers or references to a memory location on another host. This *address space* isolation means that all the information passed in messages between the communicating hosts is passed by value (objects or variables), not by reference. Since RPC is a *remote* procedure call, the values sent to the *remote* host cannot be pointers or references to *local* memory. However, passing links to a globally shared store (Amazon S3, Microsoft Azure, Google Cloud Store) is not impossible; it depends on the type of system (see the *Applications* section for details).
+
+Originally, RPC was developed as a synchronous request-response mechanism, tied to a specific programming language implementation, with a custom network protocol to outsource computation {% cite implementingrpc --file rpc %}. It had a registry system to register all the servers. One of the earliest RPC-based systems {% cite implementingrpc --file rpc %} was implemented in the Cedar programming language in the early 1980's. The goal of this system was to provide similar programming semantics as local procedure calls. Developed for a LAN with an inefficient network protocol and a *serialization* scheme to transfer information over that protocol, this system aimed at executing a *procedure* (also referred to as a *method* or a *function*) in a remote *address space*. The single-threaded, synchronous client and server were written in the *Cedar* programming language, with a registry system used by the servers to *bind* (or register) their procedures. The clients used this registry system to find a specific server to execute their *remote* procedures. This RPC implementation {% cite implementingrpc --file rpc %} had a very specific use-case: it was built for outsourcing computation within the "Xerox research internetwork", a small, closed ethernet network with 16-bit addresses{% cite implementingrpc --file rpc %}.
+
+Modern RPC-based systems are language-agnostic, asynchronous, load-balanced systems. Authentication and authorization have been added to these systems as needed, along with other security features. Most of these systems have fault handling built into them as modules, and the systems are generally spread all across the internet.
+
+RPC programs communicate over a network (or a communication channel); therefore, they need to handle remote errors and be able to communicate information successfully. Error handling generally varies and is categorized as *remote-host* or *network* failure handling. Depending on the type of the system and the error, the caller (or the callee) returns an error and these errors can be handled accordingly. For asynchronous RPC calls, it's possible to specify events to ensure progress.
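+
+To make the *remote-host* versus *network* failure distinction concrete, here is a small, purely illustrative Python sketch (it assumes the simple XML-RPC `remote_procedure` server shown in the next section is running on localhost): a fault raised by the remote routine is a remote-host failure, while a timeout or refused connection is a network failure.
+
+```python
+import socket
+import xmlrpc.client
+
+proxy = xmlrpc.client.ServerProxy("http://localhost:8080/")
+try:
+    # The remote call looks like a local call, but either kind of failure can surface here.
+    print(proxy.remote_procedure())
+except xmlrpc.client.Fault as err:
+    # Remote-host failure: the server ran, but the procedure itself reported an error.
+    print("remote error %d: %s" % (err.faultCode, err.faultString))
+except (ConnectionError, socket.timeout, OSError) as err:
+    # Network failure: the server was unreachable or the connection broke.
+    print("network error:", err)
+```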
+
+RPC implementations use a *serialization* (also referred to as *marshalling* or *pickling*) scheme on top of an underlying communication protocol (traditionally TCP over IP). These *serialization* schemes allow both the *caller* and the *callee* to remain language-agnostic, allowing the two systems to be developed in parallel without any language restrictions. Some examples of serialization schemes are JSON, XML, or Protocol Buffers {% cite grpc --file rpc %}.
+
+Modern RPC systems allow different components of a larger system to be developed independently of one another. The language-agnostic nature, combined with a decoupling of some parts of the system, allows the two components (caller and callee) to scale separately and add new functionality. This independent scaling of the system might lead to a mesh of interconnected RPC *services* facilitating one another.
+
+### Examples of RPC
+
+RPC has become very predominant in modern systems. Google performs on the order of 10^10^ RPC calls per second {% cite grpcpersec --file rpc %}. That's *tens of billions* of RPC calls *every second* -- over the course of a single day, that is more calls than the *annual GDP of the United States* counted in dollars {%cite usgdp --file rpc%}.
+
+In the simplest RPC systems, a client connects to a server over a network connection and performs a *procedure*. This procedure could be as simple as `return "Hello World"` in your favorite programming language. However, the complexity of this remote procedure has no upper bound.
+
+Here's the code of this simple RPC server, written in Python3.
+```python
+from xmlrpc.server import SimpleXMLRPCServer
+
+# a simple RPC function that returns "Hello World!"
+def remote_procedure():
+    return "Hello World!"
+
+server = SimpleXMLRPCServer(("localhost", 8080))
+print("RPC Server listening on port 8080...")
+server.register_function(remote_procedure, "remote_procedure")
+server.serve_forever()
+```
+
+The code for a simple RPC client for the above server, also written in Python3, is as follows.
+
+```python
+import xmlrpc.client
+
+with xmlrpc.client.ServerProxy("http://localhost:8080/") as proxy:
+    print(proxy.remote_procedure())
+```
+
+In the above example, we create a simple function called `remote_procedure` and *bind* it to port *8080* on *localhost*. The RPC client then connects to the server and *requests* the `remote_procedure` with no input arguments. The server then *responds* with the return value of `remote_procedure`.
+
+One can even view the *three-way handshake* as an example of the RPC paradigm. The *three-way handshake* is most commonly used in establishing a TCP connection. Here, a server-side application *binds* to a port on the server, and a hostname-resolution entry is added to a DNS server (which can be seen as the *registry* in RPC). Now, when the client has to connect to the server, it requests a DNS server to resolve the hostname to an IP address, and then the client sends a SYN packet. This SYN packet can be seen as a *request* to another *address space*. The server, upon receiving this, returns a SYN-ACK packet. This SYN-ACK packet from the server can be seen as a *response* from the server, as well as a *request* to establish the connection. The client then *responds* with an ACK packet.
+
+## Evolution of RPC:
+
+The RPC paradigm was first proposed in the 1970's and continues to be a relevant model for performing distributed computation; initially developed for a LAN, it can now be implemented on open networks, as web services across the internet. It has had a long and arduous journey to its current state.
Here are the three main (overlapping) stages that RPC went through.
+
+### The Rise: All Hail RPC (Early 1970's - Mid 1980's)
+
+RPC started off strong. With RFC 674{% cite rfc674 --file rpc %} and RFC 707{% cite rfc707 --file rpc %} specifying the design of Remote Procedure Calls, followed by Birrell and Nelson{% cite implementingrpc --file rpc %} producing the first RPC implementation for the Cedar programming language, RPC revolutionized systems in general and gave rise to one of the earliest distributed systems (apart from the internet, of course).
+
+With these early achievements, people started using RPC as the de facto design choice. It became a Holy Grail in the systems community for a few years after the first implementation.
+
+### The Fall: RPC is Dead (Late 1970's - Late 1990's)
+
+RPC, despite its initial success, wasn't without flaws. Within a year of its inception, the limitations of RPC started to catch up with it. RFC 684 criticized RPC for its latency, failure handling, and cost. It also focused on message-passing systems as an alternative to the RPC design. Similarly, a few years down the road, in 1988, Tanenbaum et al. presented similar concerns about RPC {%cite critiqueofrpc --file rpc %}. Their critique talked about problems with heterogeneous devices, message passing as an alternative, packet loss, network failure, and RPC's synchronous nature, and highlighted that RPC is not a one-size-fits-all model.
+
+In 1994, *A Note on Distributed Computing* was published. This paper claimed RPC to be "fundamentally flawed" {%cite notedistributed --file rpc %}. It talked about a unified object view and cited four main problems with dividing these objects for distributed computing in RPC: communication latency, address space separation, partial failures, and concurrency issues (resulting from two concurrent client requests accessing the same remote object). Although most of these problems (except partial failures) were inherently associated with distributed computing itself, partial failures for RPC systems meant that progress might not always be possible in an RPC system.
+
+This era wasn't a dead end for RPC, though. Some of the preliminary designs for modern RPC systems were introduced in this era. Perhaps the earliest system of this era was SunRPC {% cite sunnfs --file rpc %}, used for the Sun Network File System (NFS). Soon to follow SunRPC was CORBA{% cite corba --file rpc %}, which was in turn followed by Java RMI{% cite rmipaper --file rpc %}.
+
+However, the initial implementations of these systems were riddled with various issues and design flaws. For instance, Java RMI didn't handle network failures and assumed a reliable, zero-latency network{% cite rmipaper --file rpc %}.
+
+### The Rise, Again: Long Live RPC (Late 1990's - Today)
+
+Despite facing problems in its early days, RPC withstood the test of time. Researchers acknowledged the limitations of RPC and, instead of forcing it everywhere, focused on rectifying them and using RPC in the applications where it was actually needed. Designers started adding exception handling, asynchrony, network-failure handling, and heterogeneity across different languages and devices to RPC.
+
+In this era, SunRPC went through various additions and came to be known as Open Network Computing RPC (ONC RPC). CORBA and RMI have also undergone various modifications as internet standards were set.
+
+A new breed of RPC also started in this era: async (asynchronous) RPC, giving rise to systems that use *futures* and *promises*, like Finagle{% cite finagle --file rpc %} and Cap'n Proto (post-2010).
+
+
+<p align="center">
+[ Image Source: {% cite norman --file rpc %}]
+</p>
+<figure>
+  <img src="{{ site.baseurl }}/resources/img/rpc_chapter_1_syncrpc.jpg" alt="Synchronous RPC." />
+<p>Fig2. - Synchronous RPC.</p>
+</figure>
+
+
+<p align="center">
+[ Image Source: {% cite norman --file rpc %}]
+</p>
+<figure>
+  <img src="{{ site.baseurl }}/resources/img/rpc_chapter_1_asyncrpc.jpg" alt="Asynchronous RPC." />
+<p>Fig3. - Asynchronous RPC.</p>
+</figure>
+
+
+A traditional, synchronous RPC is a *blocking* operation, while an asynchronous RPC is a *non-blocking* operation{%cite dewan --file rpc %}. Fig2. shows a synchronous RPC call while Fig3. shows an asynchronous RPC call. In a synchronous RPC, the client sends a request to the server, then blocks and waits for the server to perform its computation and return the result. Only after getting the result from the server does the client proceed. In an asynchronous RPC, the client sends a request to the server and waits only for an acknowledgment that the input parameters/arguments were delivered. After this, the client proceeds, and when the server is finished processing, it sends an interrupt to the client. The client receives this message, collects the results, and continues.
+
+Asynchronous RPC separates the remote call from the return value, which makes it possible to write a single-threaded client that handles multiple RPC calls and processes their results at the intervals it chooses{%cite async --file rpc%}. It also allows easier handling of slow clients and servers, as well as easier transfer of large data (due to its incremental nature){%cite async --file rpc%}.
+
+In the post-2000 era, MAUI{% cite maui --file rpc %}, Cap'n Proto{% cite capnprotosecure --file rpc %}, gRPC{% cite grpc --file rpc %}, Thrift{% cite thrift --file rpc %}, and Finagle{% cite finagle --file rpc %} were released, significantly boosting the widespread use of RPC.
+
+Most of these newer systems came with their own Interface Description Languages (IDLs). An IDL specifies the common protocol and interfacing language used to transfer information between clients and servers written in different programming languages, making these RPC implementations language-agnostic. Some of the most common formats and IDLs are JSON, XML, and ProtoBufs.
+
+A high-level overview of some of the most important RPC implementations follows.
+
+#### Java Remote Method Invocation
+Java RMI (Java Remote Method Invocation){% cite rmibook --file rpc %} is a Java implementation for performing RPC (Remote Procedure Calls) between a client and a server. The client uses a stub to pass information over a socket connection to the server, which holds the remote objects. The Remote Object Registry (ROR){% cite rmipaper --file rpc %} on the server contains references to the objects that can be accessed remotely, and it is through this registry that the client connects. The client can then request the invocation of methods on the server, which processes the requested call and responds with the answer.
+
+RMI messages are encoded but not encrypted, providing only limited security, though that can be augmented by tunneling over a secure connection or other methods. Moreover, RMI is very specific to Java.
It cannot be used to take advantage of the language-independence feature that is inherent to most RPC implementations. Perhaps the main problem with RMI is that it doesn't provide *access transparency*. This means that a programmer (not the client program) cannot distinguish between local objects and remote objects, making it relatively difficult to handle partial failures in the network{%cite roi --file rpc %}.
+
+#### CORBA
+CORBA (Common Object Request Broker Architecture){% cite corba --file rpc %} was created by the Object Management Group {% cite corbasite --file rpc %} to allow for language-agnostic communication among multiple computers. It is an object-oriented model defined via an Interface Definition Language (IDL), and the communication is managed through an Object Request Broker (ORB), which acts as a broker for objects. CORBA can be viewed as a language-independent RMI system where each client and server has an ORB through which they communicate. The benefit of CORBA is that it allows multi-language implementations to communicate with each other, but much of the criticism around CORBA relates to poor consistency among implementations, and it is relatively outdated by now. Moreover, CORBA suffers from the same access-transparency issues as Java RMI.
+
+#### XML-RPC and SOAP
+The XML-RPC specification {% cite Wiener --file rpc%} performs an HTTP POST request to a server, formatted as XML composed of a *header* and a *payload*, that calls exactly one method. It was originally released in the late 1990's and, unlike RMI, it provides transparency by using HTTP as the transport mechanism.
+
+The header has to provide the basic information, like the user agent and the size of the payload. The payload initiates a `methodCall` structure by specifying the method name via `methodName` and the associated parameter values. Parameters for the method can be scalars, structures, or (recursive) arrays. A scalar can be one of `i4`, `int`, `boolean`, `string`, `double`, `dateTime.iso8601` or `base64`. The scalars are used to build the more complex structures and arrays.
+
+Below is an example as provided by the XML-RPC documentation{% cite Wiener --file rpc%}:
+
+```XML
+
+POST /RPC2 HTTP/1.0
+User-Agent: Frontier/5.1.2 (WinNT)
+Host: betty.userland.com
+Content-Type: text/xml
+Content-length: 181
+
+<?xml version="1.0"?>
+<methodCall>
+  <methodName>examples.getStateName</methodName>
+  <params>
+    <param>
+      <value><i4>41</i4></value>
+    </param>
+  </params>
+</methodCall>
+```
+
+The response to a request will have the `methodResponse` with `params` and values, or a `fault` with the associated `faultCode` in case of an error {% cite Wiener --file rpc %}:
+
+```XML
+HTTP/1.1 200 OK
+Connection: close
+Content-Length: 158
+Content-Type: text/xml
+Date: Fri, 17 Jul 1998 19:55:08 GMT
+Server: UserLand Frontier/5.1.2-WinNT
+
+<?xml version="1.0"?>
+<methodResponse>
+  <params>
+    <param>
+      <value><string>South Dakota</string></value>
+    </param>
+  </params>
+</methodResponse>
+```
+
+SOAP (Simple Object Access Protocol) is the successor of XML-RPC as a web-services protocol for communication between a client and a server. It was initially designed by a group at Microsoft {% cite soaparticle1 --file rpc %}. A SOAP message is an XML-formatted message composed of an envelope inside which a header and a payload are provided (just like XML-RPC). The payload of the message contains the request and response, and it can be transmitted over HTTP or SMTP (unlike XML-RPC).
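+
+As a small aside, before contrasting SOAP with XML-RPC: the XML-RPC request and response payloads shown above do not have to be written by hand. The hedged Python sketch below uses the standard library to marshal and unmarshal them; the exact output differs slightly in whitespace and tag names (e.g. `<int>` instead of `<i4>`).
+
+```python
+import xmlrpc.client
+
+# Marshal a call to examples.getStateName with one integer parameter.
+request_xml = xmlrpc.client.dumps((41,), methodname="examples.getStateName")
+print(request_xml)  # a <methodCall> payload closely resembling the request above
+
+# Marshal and then unmarshal a response carrying a single string value.
+response_xml = xmlrpc.client.dumps(("South Dakota",), methodresponse=True)
+params, _ = xmlrpc.client.loads(response_xml)
+print(params[0])  # South Dakota
+```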
+
+SOAP can be viewed as a superset of XML-RPC that supports more complex authentication schemes{%cite soapvsxml --file rpc %} as well as WSDL (Web Services Description Language), allowing easier discovery of and integration with remote web services{%cite soapvsxml --file rpc %}.
+
+The benefit of SOAP is that it provides the flexibility of transmission over multiple transport protocols. Its XML-based messages make SOAP language-agnostic, though parsing such messages can become a bottleneck.
+
+#### Thrift
+Thrift is an *asynchronous* RPC system created by Facebook and now part of the Apache Foundation {% cite thrift --file rpc %}. It uses a language-agnostic Interface Description Language (IDL) from which the client and server code is generated. It provides the opportunity for compressed serialization by customizing the protocol and the transport after the description file has been processed.
+
+Perhaps the biggest advantage of Thrift is that its binary data format has very low overhead. It has a relatively lower transmission cost (compared to alternatives like SOAP){%cite thrifttut --file rpc %}, making it very efficient for transferring large amounts of data.
+
+#### Finagle
+Finagle is a fault-tolerant, protocol-agnostic RPC runtime and a high-level API for composing futures (see the Async RPC section), with the RPC calls generated under the hood. It was created by Twitter and is written in Scala to run on the JVM. It is based on three object types: Service objects, Filter objects, and Future objects {% cite finagle --file rpc %}.
+
+A Future object represents an asynchronously requested computation whose response will arrive at some time in the future. Future objects are the main communication mechanism in Finagle: all inputs and outputs are represented as Futures.
+
+A Service object is an endpoint that returns a Future upon processing a request. Service objects can be viewed as the interfaces used to implement a client or a server.
+
+A sample Finagle server that reads a request and returns the version of the request is shown below. This example is taken from the Finagle documentation{% cite finagletut --file rpc %}
+
+```Scala
+import com.twitter.finagle.{Http, Service}
+import com.twitter.finagle.http
+import com.twitter.util.{Await, Future}
+
+object Server extends App {
+  val service = new Service[http.Request, http.Response] {
+    def apply(req: http.Request): Future[http.Response] =
+      Future.value(
+        http.Response(req.version, http.Status.Ok)
+      )
+  }
+  val server = Http.serve(":8080", service)
+  Await.ready(server)
+}
+```
+
+A Filter object transforms requests for further processing when additional customization of a request is required. Filters provide program-independent operations, like timeouts. They take in a Service and provide a new Service object with the Filter applied. Aggregating multiple Filters is also possible in Finagle.
+
+A sample timeout Filter that takes in a service and creates a new service with timeouts is shown below.
This example is taken from the Finagle documentation{% cite finagletut --file rpc %}
+
+```Scala
+import com.twitter.finagle.{Service, SimpleFilter}
+import com.twitter.util.{Duration, Future, Timer}
+
+class TimeoutFilter[Req, Rep](timeout: Duration, timer: Timer)
+  extends SimpleFilter[Req, Rep] {
+
+  def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
+    val res = service(request)
+    res.within(timer, timeout)
+  }
+}
+```
+
+#### Open Network Computing RPC (ONC RPC)
+ONC was originally introduced as SunRPC {%cite sunnfs --file rpc %} for the Sun NFS. The Sun NFS system had a stateless server with client-side caching and unique file handles, and supported NFS operations such as read, write, truncate, and unlink. SunRPC was later revised as ONC in 1995 {%cite rfc1831 --file rpc %} and again in 2009 {%cite rfc5531 --file rpc %}. The IDL used in ONC (and SunRPC) is External Data Representation (XDR), a serialization mechanism specific to network communication; as a result, ONC has largely been limited to applications like the Network File System.
+
+#### Mobile Assistance Using Infrastructure (MAUI)
+The MAUI project {% cite maui --file rpc %}, developed by Microsoft, is a computation-offloading system for mobile devices. It is an automated system that offloads mobile code to a dedicated infrastructure in order to increase the battery life of the device, minimize the load on the programmer, and perform complex computations offsite. MAUI uses RPC as the communication protocol between the mobile device and the infrastructure.
+
+#### gRPC
+
+gRPC is a multiplexed, bi-directional streaming RPC protocol developed by Google and Square. The IDL for gRPC is Protocol Buffers (also referred to as ProtoBuf), and gRPC is meant as a public replacement for Stubby, ARCWire, and Sake {% cite Apigee --file rpc %}. More details on Protocol Buffers, Stubby, ARCWire, and Sake are available in our gRPC chapter{% cite grpcchapter --file rpc %}.
+
+gRPC provides a platform for scalable, bi-directional streaming using both synchronous and asynchronous communication.
+
+In a general RPC mechanism, the client initiates a connection to the server, and only the client can *request* while the server can only *respond* to incoming requests. However, in bi-directional gRPC streams, although the initial connection is initiated by the client (call it *endpoint 1*), once the connection is established both the server (call it *endpoint 2*) and *endpoint 1* can send *requests* and receive *responses*. This significantly eases development when both *endpoints* need to talk to each other (as in grid computing). It also saves the hassle of creating two separate connections between the endpoints (one from *endpoint 1* to *endpoint 2* and another from *endpoint 2* to *endpoint 1*), since both streams are independent.
+
+gRPC multiplexes requests over a single connection using header compression. This makes it suitable for mobile clients, where battery life and data usage are important.
+The core library is in C -- except for the Java and Go implementations -- and surface APIs for all the other languages are implemented by connecting through it{% cite CoreSurfaceAPIs --file rpc %}.
+
+Since Protocol Buffers has already been adopted by many individuals and companies, it is natural for them to extend their RPC ecosystems via gRPC. Companies like Cisco, Juniper, and Netflix {% cite gRPCCompanies --file rpc %} have found it practical to adopt it.
+A majority of the Google Public APIs, like their Places and Maps APIs, have been ported to gRPC ProtoBuf {% cite gRPCProtos --file rpc %} as well.
+
+More details about gRPC and bi-directional streaming can be found in our gRPC chapter {% cite grpcchapter --file rpc %}.
+
+#### Cap'n Proto
+Cap'n Proto{% cite capnprotosecure --file rpc %} is a data-interchange RPC system that bypasses the data-encoding step (like JSON or ProtoBuf) to significantly improve performance. It is developed by the original author of gRPC's ProtoBuf, but since it transmits raw bytes (binary data) with no separate encoding/decoding step, it outperforms gRPC's ProtoBuf. It uses futures and promises to combine various remote operations into a single operation to save transport round-trips. This means that if a client calls a function `foo` and then calls another function `bar` on the output of `foo`, Cap'n Proto will aggregate the two operations into a single `bar(foo(x))`, where `x` is the input to `foo` {% cite capnprotosecure --file rpc %}. This saves multiple round-trips, especially in object-oriented programs.
+
+### The Heir to the Throne: gRPC or Thrift
+
+Although there are many candidates to be considered top contenders for the RPC throne, most of them are targeted at a specific type of application. ONC is generally specific to the Network File System (though it's being pushed as a standard), Cap'n Proto is relatively new and untested, MAUI is specific to mobile systems, the open-source Finagle is primarily used at Twitter (not widespread), and Java RMI simply doesn't come close due to its transparency issues (sorry to burst your bubble, Java fans).
+
+Probably the most powerful and practical systems out there are Apache Thrift and Google's gRPC, primarily because these two systems cater to a large number of programming languages, have a significant performance benefit over other techniques, and are being actively developed.
+
+Thrift was released several years earlier, while the first stable release of gRPC came out in August 2016. However, despite having been around longer, Thrift is currently less popular than gRPC {%cite trendrpcthrift --file rpc %}.
+
+gRPC {% cite gRPCLanguages --file rpc %} and Thrift both support most of the popular languages, including Java, C/C++, and Python. Thrift additionally supports languages like Ruby, Erlang, Perl, JavaScript, Node.js, and OCaml, while gRPC additionally supports Node.js and Go.
+
+The gRPC core is written in C (with the exception of the Java and Go implementations), and wrappers in other languages communicate with that core, while the Thrift core is written in C++.
+
+gRPC also provides easier bidirectional streaming communication between the caller and callee. The client generally initiates the communication {% cite gRPCLanguages --file rpc %}, and once the connection is established the client and the server can perform reads and writes independently of each other. However, bi-directional streaming in Thrift might be a little difficult to handle, since it focuses explicitly on a client-server model. To enable bidirectional, async streaming, one may have to run two separate systems {%cite grpcbetter --file rpc%}.
+
+Thrift provides exception handling as part of the message, while in gRPC the programmer has to handle exceptions explicitly. In Thrift, exceptions can be returned built into the message; in gRPC, the programmer explicitly defines this behavior. Thrift's built-in exception handling makes it easier to write client-side applications.
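+
+To illustrate what "the programmer explicitly defines this behavior" can look like in gRPC, here is a hedged Python sketch. It assumes stubs generated from the canonical gRPC hello-world proto (`helloworld_pb2` / `helloworld_pb2_grpc`); how each status code is handled is entirely up to the application.
+
+```python
+import grpc
+# Assumed: modules generated from the gRPC hello-world proto via protoc.
+import helloworld_pb2
+import helloworld_pb2_grpc
+
+def say_hello(address="localhost:50051"):
+    channel = grpc.insecure_channel(address)
+    stub = helloworld_pb2_grpc.GreeterStub(channel)
+    try:
+        # One unary RPC; deadlines and retries are the caller's responsibility.
+        reply = stub.SayHello(helloworld_pb2.HelloRequest(name="world"), timeout=1.0)
+        return reply.message
+    except grpc.RpcError as err:
+        # gRPC surfaces failures as a status code plus details; the programmer
+        # decides what each code means for the application.
+        print("RPC failed:", err.code(), err.details())
+        return None
+```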
+
+Although custom authentication mechanisms can be implemented in both of these systems, gRPC comes with Google-backed authentication using SSL/TLS and Google tokens {% cite grpcauth --file rpc %}.
+
+Moreover, gRPC-based network communication is done using HTTP/2. HTTP/2 makes it feasible for communicating parties to multiplex requests over the same connection and port. This is more efficient (in terms of memory usage) than HTTP/1.1. Since gRPC communication is done over HTTP/2, gRPC can easily multiplex different services. Multiplexing services is possible in Thrift as well, but due to the lack of support in the underlying transport protocol it has to be performed in code, using a `TMultiplexedProcessor` class {% cite multiplexingthrift --file rpc %}.
+
+Both gRPC and Thrift allow async RPC calls: a client can send a request to the server, continue with its own execution, and process the response from the server as it arrives.
+
+
+The major differences between gRPC and Thrift can be summed up in this table.
+
+| Comparison | Thrift | gRPC |
+| ----- | ----- | ----- |
+| License | Apache2 | BSD |
+| Sync/Async RPC | Both | Both |
+| Supported Languages | C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml | C/C++, Python, Go, Java, Ruby, PHP, C#, Node.js, Objective-C |
+| Core Language | C++ | C |
+| Exceptions | Can be built into the message | Implemented by the programmer |
+| Authentication | Custom | Custom + Google Tokens |
+| Bi-Directionality | Not straightforward | Straightforward |
+| Multiplexing | Possible via `TMultiplexedProcessor` class | Possible via HTTP/2 |
+
+Although it's difficult to definitively choose one over the other, with the increasing popularity of gRPC -- and the fact that it's still in the early stages of development -- the general trend{%cite trendrpcthrift --file rpc %} over the past year has started to shift in favor of gRPC, and it's giving Thrift a run for its money. Although search interest may not be a rigorous metric, gRPC was searched for, on average, three times more than Thrift{%cite trendrpcthrift --file rpc %}.
+
+**Note:** This comparison was performed in December 2016, so the results are expected to change with time.
+
+## Applications:
+
+Since its inception, various papers have been published on applying the RPC paradigm to different domains, as well as on using RPC implementations to create new systems. Here are some of the applications and systems that incorporate RPC.
+
+#### Shared State and Persistence Layer
+
+One major limitation (and also an advantage) of RPC is the separate *address space* of each machine in the network. This means that *pointers* or *references* to a data object cannot be passed between the caller and the callee. Interweave {% cite interweave2 interweave1 interweave3 --file rpc %} addresses this: it is a *middleware* system that allows scalable sharing of arbitrary datatypes between language-independent processes running on heterogeneous hardware. Interweave is specifically designed to be compatible with RPC-based systems and allows easier access to shared resources between different applications using memory blocks and locks.
+
+Although research has been done on providing a global shared state for RPC-based systems, such systems tend to take away the independence and modularity between the *caller* and the *callee* by using shared storage instead of separate *address spaces*.
+
+#### GridRPC
+
+Grid computing is one of the most widely used applications of the RPC paradigm. At a high level, it can be seen as a mesh (or a network) of computers connected to form a *grid*, such that each system can leverage resources from any other system in the network.
+
+In the GridRPC paradigm, each computer in the network can act as the *caller* or the *callee* depending on the amount of resources required {% cite grid1 --file rpc %}. It's also possible for the same computer to act as the *caller* as well as the *callee* for *different* computations.
+
+Some of the most popular GridRPC-compliant middleware implementations are GridSolve{% cite gridsolve1 gridsolve2 --file rpc %} and Ninf-G{% cite ninf --file rpc %}. Ninf is older than GridSolve and was first published in the late 1990's; it's a simple RPC layer that also provides authentication and secure communication between the two parties. GridSolve, on the other hand, is relatively complex and provides middleware for the communication using a client-agent-server model.
+
+#### Mobile Systems and Computation Offloading
+
+Mobile systems have become very powerful. With multi-core processors and gigabytes of RAM, they can undertake relatively complex computations without a hassle. But this power consumes a large amount of energy, and their batteries, despite becoming larger, drain quickly with usage. Moreover, mobile data (network bandwidth) is still limited and expensive. Due to these constraints, it's often better to offload computation from mobile systems when possible. RPC plays an important role in the communication for this *computation offloading*. Some of these services use GridRPC technologies to offload computation, whereas others use an RMI (Remote Method Invocation) system.
+
+The Ibis Project {% cite ibis --file rpc %} builds an RMI (similar to Java RMI) and GMI (Group Method Invocation) model to facilitate outsourcing computation. Cuckoo {% cite cuckoo --file rpc %} uses this Ibis communication middleware to offload computation from applications (built using Cuckoo) running on Android smartphones to remote Cuckoo servers.
+
+Microsoft's MAUI project {% cite maui --file rpc %} uses RPC communication and allows partitioning of .NET applications for "fine-grained code offload to maximize energy savings with minimal burden on the programmer". MAUI decides which methods to offload to the external MAUI server at runtime.
+
+#### Async RPC, Futures and Promises
+
+Remote Procedure Calls can be asynchronous, and these async RPCs play an integral role in *futures* and *promises*. *Futures* and *promises* are programming constructs where a *future* is seen as a variable/data/return value/error, while a *promise* is seen as a *future* that doesn't have a value yet. We follow Finagle's {% cite finagle --file rpc %} definition of *futures* and *promises*, where the *promise* of a *future* (an empty *future*) is considered a *request*, while the async fulfillment of this *promise* by a *future* is seen as the *response*. This construct is primarily used for asynchronous programming.
+
+Perhaps the most renowned systems using this type of RPC model are Twitter's Finagle{% cite finagle --file rpc %} and Cap'n Proto{% cite capnprotosecure --file rpc %}.
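+
+To ground this in code, here is a small illustrative Python sketch (not tied to Finagle or Cap'n Proto): the client fires the blocking XML-RPC call from the earlier *Examples of RPC* section through a thread pool and immediately gets back a future, i.e. a promise of a value that doesn't exist yet, which it can wait on or attach callbacks to.
+
+```python
+from concurrent.futures import ThreadPoolExecutor
+import xmlrpc.client
+
+def call_remote():
+    # The blocking XML-RPC call from the earlier example, run off the main thread.
+    with xmlrpc.client.ServerProxy("http://localhost:8080/") as proxy:
+        return proxy.remote_procedure()
+
+with ThreadPoolExecutor(max_workers=4) as pool:
+    future = pool.submit(call_remote)                  # the "request": a future with no value yet
+    future.add_done_callback(lambda f: print("done"))  # react asynchronously to fulfillment
+    # ... the client is free to do other work while the call is in flight ...
+    print("reply:", future.result())                   # the "response": collect the value when needed
+```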
+
+#### RPC in Microservices Ecosystem:
+
+RPC implementations have moved from a one-server model to multiple servers and on to dynamically-created, load-balanced microservices. RPC started as a set of separate implementations -- REST, streaming RPC, MAUI, gRPC, Cap'n Proto -- and it is now possible to integrate all of these implementations behind a single abstraction, the user *endpoint*. Endpoints are the building blocks of *microservices*. A *microservice* is usually a *service* with a very simple, well-defined purpose, written in almost any language, that interacts with other microservices to give the feel of one large monolithic *service*. These microservices are language-agnostic: one *microservice* for airline tickets written in C/C\++ might be communicating with a number of other microservices for individual airlines written in different languages (Python, C\++, Java, Node.js) using a language-agnostic, asynchronous RPC framework like gRPC{%cite grpc --file rpc %} or Thrift{%cite thrift --file rpc %}.
+
+The use of RPC has allowed us to create new microservices on the fly. Microservices can not only be created and bootstrapped at runtime but also come with inherent features like load-balancing and failure-recovery. This bootstrapping might occur on the same machine, inside a Docker container {% cite docker --file rpc %}, or across a network (using any combination of DNS, NATs, or other mechanisms).
+
+RPC can be described as the "glue" that holds all the microservices together{% cite microservices1rpc --file rpc %}; it is one of the primary communication mechanisms between microservices running on different systems. A microservice requests that another microservice perform an operation or query. The other microservice, upon receiving such a request, performs the operation and returns a response. That operation could be anything from a simple computation, to invoking yet another microservice and creating a cascade of RPC events, to spinning up new microservices on the fly to dynamically load-balance the system. As noted above, the microservices are language-agnostic and can all communicate with each other over a language-agnostic, asynchronous, performant RPC framework like gRPC{%cite grpc --file rpc %} or Thrift{%cite thrift --file rpc %}.
+
+An example of a microservices ecosystem that uses futures/promises is Finagle{%cite finagle --file rpc %} at Twitter.
+
+## Security in RPC:
+
+The initial RPC implementation {% cite implementingrpc --file rpc %} was developed for an isolated LAN and didn't focus much on security. There are various attack surfaces in that model: a malicious registry, a malicious server, a client mounting a Denial-of-Service attack, or a Man-in-the-Middle between client and server.
+
+As time progressed and the internet evolved, new standards came along, and RPC implementations became much more secure. Security in RPC is generally added as a *module* or a *package*. These modules have libraries for authentication and authorization of the communicating services (caller and callee). The modules are not always bug-free, and it's sometimes possible to gain unauthorized access to the system. Efforts are being made by the security community to rectify this, using code inspection and bug-bounty programs to catch these bugs beforehand. However, with time new bugs arise and this cycle continues.
It's a vicious cycle between attackers and security experts, each trying to outdo the other.
+
+For example, the Oracle Network File System uses *Secure RPC*{% cite oraclenfs --file rpc %} to perform authentication in NFS. *Secure RPC* uses a Diffie-Hellman authentication mechanism with DES encryption to allow only authorized users to access the NFS. Similarly, Cap'n Proto {% cite capnprotosecure --file rpc %} claims that it is resilient to memory leaks, segfaults, and malicious inputs and can be used between mutually untrusting parties. However, in Cap'n Proto "the RPC layer is not robust against resource exhaustion attacks, possibly allowing denials of service", nor has it undergone any formal verification {% cite capnprotosecure --file rpc %}.
+
+Although it's possible to come up with a *threat model* under which any given RPC implementation is insecure to use, one has to understand that using any distributed system increases the attack surface. Claiming one *paradigm* to be more secure than another would be a biased statement, since a *paradigm* is generally an idea; it is up to individual system designers to use these *paradigms* to build their systems and to take care of features specific to real systems, like security and load-balancing. There's always the possibility of a request being rerouted to a malicious server (if the registry gets hacked), or of there being no trust between the *caller* and the *callee*. However, we maintain that the RPC *paradigm* itself is neither secure nor insecure, and that the most secure systems are the ones in an isolated environment, disconnected from the public internet, with a self-destruct mechanism{% cite selfdest --file rpc %} in place, in an impenetrable bunker, guarded by the Knights Templar (*they don't exist! Well, maybe Fort Meade comes close*).
+
+## Discussion:
+
+The RPC *paradigm* shines the most in *request-response* mechanisms. Futures and promises also appear to be a new breed of RPC. This leads one to question whether every *request-response* system is just a modified implementation of the RPC *paradigm*, or whether it actually brings something new to the table. These modern communication protocols, like HTTP and REST, might just be a different flavor of RPC. In HTTP, a client *requests* a web page (or some other content), and the server then *responds* with the required content. The dynamics of this communication might be slightly different from your traditional RPC; however, a stateless HTTP server adheres to most of the concepts behind the RPC *paradigm*. Similarly, consider sending a request to your favorite Google API. Say you want to translate your latitude/longitude to an address using the Reverse Geocoding API, or want to find a good restaurant in your vicinity using the Places API: you'll send a *request* to their server to perform a *procedure* that takes a few input arguments, like the coordinates, and returns the result. Even though these APIs follow a RESTful design, they appear to be an extension of the RPC *paradigm*.
+
+The RPC paradigm has evolved over time, to the extent that it's become very difficult to differentiate RPC from non-RPC. With each passing year, the restrictions and limitations of RPC are pushed back. Current RPC implementations even support the server *requesting* information from the client, with the client *responding* to these requests, and vice versa (bidirectionality).
This *bidirectional* nature of RPC has transitioned it from a simple *client-server* model to a set of *endpoints* communicating with each other.
+
+For the past four decades, researchers and industry leaders have tried to come up with *their* definition of RPC. The proponents of the RPC paradigm view every *request-response* communication as an implementation of the RPC paradigm, while those against RPC try to explicitly enumerate its limitations. These limitations, however, seem to slowly vanish as new RPC models are introduced with time. RPC supporters consider it the Holy Grail of distributed systems and view it as the foundation of modern distributed communication. From Apache Thrift and ONC to HTTP and REST, they advocate for it all as RPC, while REST developers have strong opinions against RPC.
+
+Moreover, with modern global storage mechanisms, the need for RPC systems to have a separate *address space* seems to be slowly dissolving and disappearing into thin air. So the question remains: what *is* RPC and what *is not* RPC? This is an open-ended question. There is no unanimous agreement about what RPC should look like, except that it involves communication between two *endpoints*. What we think of RPC is:
+
+*In the world of distributed systems, where every individual component of a system, be it a hard disk, a multi-core processor, or a microservice, is an extension of the RPC paradigm, it's difficult to come up with a concrete definition of RPC. Therefore, anything loosely associated with a request-response mechanism can be considered RPC.*
+
+<blockquote>
+<p align="center">
+<em>**RPC is not dead, long live RPC!**</em>
+</p>
+</blockquote>
 ## References
-{% bibliography --file rpc %}
\ No newline at end of file +{% bibliography --file rpc --cited %} diff --git a/chapter/3/message-passing.md b/chapter/3/message-passing.md index 5898e23..ee489ff 100644 --- a/chapter/3/message-passing.md +++ b/chapter/3/message-passing.md @@ -1,11 +1,250 @@ --- layout: page -title: "Message Passing" -by: "Joe Schmoe and Mary Jane" +title: "Message Passing and the Actor Model" +by: "Nathaniel Dempkowski" --- -Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. {% cite Uniqueness --file message-passing %} +# Introduction -## References +Message passing programming models have essentially been discussed since the beginning of distributed computing and as a result message passing can be taken to mean a lot of things. If you look up a broad definition on Wikipedia, it includes things like RPC, CSP, and MPI. In practice when people talk about message passing today they mostly mean the actor model. -{% bibliography --file message-passing %}
\ No newline at end of file
+In the field of message passing programming models, it is not only important to consider recent state-of-the-art research, but additionally the historic initial papers on message passing and the actor model that are the roots of the programming models described in newer papers. It is enlightening to see which aspects of the models have stuck around, and many of the newer papers reference and address deficiencies present in older papers. There have been plenty of programming languages designed around message passing, especially those focused on the actor model of programming and organizing units of computation.
+
+In this chapter I describe four different actor models: classic actors, process-based actors, communicating event-loops, and active objects. I attempt to highlight historic and modern languages that exemplify these models, as well as the philosophies and tradeoffs that programmers need to be aware of to understand these models.
+
+Actor programming models are continuing to develop and become more robust, as some of the recently published papers and systems in the field show. There are a few robust industrial-strength actor systems that are being used to power massive scalable distributed systems. There are a couple of different approaches to building actor frameworks that are detailed later in the chapter.
+
+I think an important framing for the models and sources presented is “Why message passing, and specifically why the actor model?” There are a number of distributed programming models, so why was this one so important when it was initially proposed? What are the advantages of it for the programmer? Why has it facilitated advanced languages, systems, and libraries that are widely used today? The broad advantages of the actor model are around isolation of state, scalability, and simplifying the programmer's ability to reason about their system.
+
+# Original proposal of the actor model
+
+The actor model was originally proposed in _A Universal Modular ACTOR Formalism for Artificial Intelligence_ in 1973 as a method of computation for artificial intelligence research. The original goal was to model parallel computation and communication in a way that could be safely and concurrently distributed across workstations. The paper makes few presumptions about implementation details, instead defining the high-level message passing communication model.
+
+They define actors as independent units of computation with isolated state. These units can send messages to one another, and have a mailbox which contains messages they have received. These messages are of the form:
+
+```
+(request: <message-to-target>
+ reply-to: <reference-to-messenger>)
+```
+
+Actors attempt to process messages from their mailboxes by matching their `request` field sequentially against patterns or rules which can be specific values or logical statements. When a pattern is matched, computation occurs and the result of that computation is implicitly returned to the reference in the message's `reply-to` field. This is a type of continuation, where the continuation is the message to another actor. These messages are one-way and make no claims about whether a message will ever be received in response. This model is limited compared to many of the others, but the early ideas of taking advantage of distribution of processing power to enable greater parallel computation are there.
+
+One interesting thing to note is that this original paper talks about actors in the context of hardware.
They mention actors as almost another machine architecture. This paper describes the concepts of an "actor machine" and a "hardware actor" as the context for the actor model, which is totally different from the way we think about modern actors as abstracting away a lot of the hardware details we don't want to deal with. This concept reminds me of something like a Lisp machine, but built specially to utilize the actor model of computation for artificial intelligence.
+
+# Classic actor model
+
+The classic actor model was formalized as a unit of computation in Agha's _Concurrent Object-Oriented Programming_. The classic actor expands on the original proposal of actors, keeping the ideas of asynchronous communication through messages between isolated units of computation and state. The classic actor is formalized as the following primitive operations:
+
+* `create`: create an actor from a behavior description and a set of parameters, including other existing actors
+* `send`: send a message to another actor
+* `become`: have an actor replace its behavior with a new one
+
+As originally described, classic actors communicate by asynchronous message passing. They are primitive, independent units of computation which can be used to build higher-level abstractions for concurrent programming. Actors are uniquely addressable, and have their own independent mailboxes or message queues. State changes using the classic actor model are specified and aggregated using the `become` operation. Each time an actor processes a communication it computes a behavior in response to the next type of communication it expects to process. A `become` operation's argument is another named behavior with some state to pass to that named behavior.
+
+For purely functional actors the new behavior would be identical to the original. For more complex actors, however, this enables the aggregation of state changes at a higher level of granularity than something like a variable assignment. This enables flexibility in the behavior of an actor over time in response to the actions of other actors in the system. Additionally, this isolation changes the level at which one analyzes a system, freeing the programmer from worrying about interference during state changes.
+
+If you squint a little, this actor definition sounds similar to Alan Kay’s original definition of Object Oriented programming. This definition describes a system where objects have a behavior, their own memory, and communicate by sending and receiving messages that may contain other objects or simply trigger actions. Kay's ideas sound closer to what we consider the actor model today, and less like what we consider object-oriented programming. The focus is on designing the messaging and communications that dictate how objects interact.
+
+The most thorough articulation of this classic model, and of its ties to object-oriented programming, comes from Agha's paper, discussed next.
+
+## Concurrent Object-Oriented Programming (1990)
+
+This is the seminal paper for the classic actor model, as it offers classic actors as a natural solution to solving problems at the intersection of two trends in computing: increased distributed computing resources and the rising popularity of object-oriented programming. The paper defines common patterns of parallelism: pipeline concurrency, divide and conquer, and cooperative problem solving. It then focuses on how the actor model can be used to solve these problems in an object-oriented style, and some of the challenges that arise with distributed actors and objects, as well as strategies and tradeoffs for communication and reasoning about behaviors.
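+
+Before looking at concrete languages, here is a minimal, purely illustrative Python sketch of the three classic-actor primitives described above (`create`, `send`, `become`). It is not the API of Rosette, Akka, or any other system discussed in this chapter; it only shows how `become` aggregates state changes by swapping in a new behavior.
+
+```python
+import queue
+import threading
+import time
+
+class Actor:
+    """A toy classic actor: a mailbox plus a replaceable behavior."""
+    def __init__(self, behavior):                      # create
+        self.mailbox = queue.Queue()
+        self.behavior = behavior
+        threading.Thread(target=self._run, daemon=True).start()
+
+    def send(self, message):                           # send
+        self.mailbox.put(message)
+
+    def become(self, new_behavior):                    # become
+        self.behavior = new_behavior
+
+    def _run(self):
+        while True:
+            msg = self.mailbox.get()
+            self.behavior(self, msg)
+
+def counter(count):
+    # The next behavior carries the new state; no variable is ever reassigned.
+    def behavior(self, msg):
+        if msg == "incr":
+            self.become(counter(count + 1))
+        elif msg == "print":
+            print("count =", count)
+    return behavior
+
+if __name__ == "__main__":
+    a = Actor(counter(0))
+    a.send("incr")
+    a.send("incr")
+    a.send("print")      # prints: count = 2
+    time.sleep(0.1)      # give the daemon thread time to drain the mailbox
+```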
+
+This paper looks at a lot of systems and languages that are implementing solutions in this space, and starts to actually identify some of the programmer-centric advantages of actors. One of the core languages used for examples in the paper is Rosette, but the paper largely focuses on the potential and benefits of the model. Agha claims the benefits of using objects stem from a separation of concerns. "By separating the specification of what is done (the abstraction) from how it is done (the implementation), the concept of objects provides modularity necessary for programming in the large. It turns out that concurrency is a natural consequence of the concept of objects." Splitting concerns into multiple pieces allows the programmer to have an easier time reasoning about the behavior of the program. It also allows the programmer to use more flexible abstractions in their programs, as Agha states. "It is important to note that the actor languages give special emphasis to developing flexible program structures which simplify reasoning about programs." This flexibility turns out to be a highly discussed advantage which continues to be touted in modern actor systems.
+
+## Rosette
+
+Rosette was both a language for concurrent object-oriented programming with actors and a runtime system for managing the usage of and access to resources by those actors. Rosette is mentioned throughout Agha's _Concurrent Object-Oriented Programming_, and the code examples given in the paper are written in Rosette. Agha is even an author on the Rosette paper, so it's clear that Rosette is foundational to the classic actor model. It seems to be a language which almost defines what the classic actor model looks like in the context of concurrent object-oriented programming.
+
+The motivation behind Rosette was to provide strategies for dealing with problems like search, where the programmer needs a means to control how resources are allocated to sub-computations to optimize performance in the face of combinatorial explosion. This supports the use of concurrency in solving computationally intensive problems whose structure is not statically defined, but rather depends on some heuristic to return results. Rosette has an architecture which uses actors in two distinct ways. They describe two different layers with different responsibilities:
+
+* _Interface layer_: This implements mechanisms for monitoring and control of resources. The system resources and hardware are viewed as actors.
+* _System environment_: This is composed of actors who actually describe the behavior of concurrent applications and implement resource management policies based on the interface layer.
+
+The Rosette language has a number of object-oriented features, many of which we take for granted in modern object-oriented programming languages. It implements dynamic creation and modification of objects for extensible and reconfigurable systems, supports inheritance, and has objects which can be organized into classes. I think the more interesting characteristic is that the concurrency in Rosette is inherent and declarative rather than explicit as with many modern object-oriented languages. In Rosette, the concurrency is an inherent property of the program structure and resource allocation. This is different from a language like Java, where all of the concurrency is very explicit. The motivation behind this declarative concurrency comes from the heterogeneous nature of distributed concurrent computers.
Different computers and architectures have varying concurrency characteristics, and the authors argue that forcing the programmer to tailor their concurrency to the specific machine makes it difficult to re-map a program to another one. I think this idea of using actors as a more flexible and natural abstraction over concurrency and distribution of resources is an important one which is seen in some form within many actor systems.
+
+Actors in Rosette are organized into three types of classes which describe different aspects of the actors within the system:
+
+* _Abstract classes_ specify requests, responses, and actions within the system which can be observed. The idea behind these is to expose the higher-level behaviors of the system, but tailor the actual actor implementations to the resource constraints of the system.
+* _Representation classes_ specify the resource management characteristics of implementations of abstract classes.
+* _Behavior classes_ specify the actual implementations of actors in given abstract and representation classes.
+
+These classes represent a concrete object-oriented abstraction to organize actors which handles the practical constraints of a distributed system. It represents a step in the direction of handling not just the information flow and behavior of the system, but the underlying hardware and resources. Rosette's model feels like a direct expression of concerns that every actor system in production inevitably ends up addressing.
+
+## Akka
+
+Akka is an actively developed project built out of the work on [Scala Actors](#scala-actors) in Scala to provide the actor model of programming as a framework for Java and Scala. It is an effort to bring an industrial-strength actor model to the JVM runtime, which was not explicitly designed to support actors. There are a few notable changes from Scala Actors that make Akka worth mentioning, especially as it is being actively developed while Scala Actors is not.
+
+Akka provides a programming interface with both Java and Scala bindings for actors which looks similar to Scala Actors, but has different semantics in how it processes messages. Akka's `receive` operation defines a global message handler which doesn't block when no matching message has been received, and is instead only triggered when a matching message can be processed. It also will not leave a message in an actor's mailbox if there is no matching pattern to handle the message. The message will simply be discarded and an event will be published to the system. Akka's interface also provides stronger encapsulation to avoid exposing direct references to actors. To some degree this fixes problems in Scala Actors where public methods could be called on actors, breaking many of the guarantees programmers expect from message-passing. This system is not perfect, but in most cases it limits the programmer to simply sending messages to an actor using a limited interface.
+
+The Akka runtime also provides performance advantages over Scala Actors. The runtime uses a single continuation closure for many or all messages an actor processes, and provides methods to change this global continuation. This can be implemented more efficiently on the JVM, as opposed to Scala Actors' continuation model, which uses control-flow exceptions that cause additional overhead. Additionally, nonblocking message insert and task schedule operations are used for extra performance.
+
+Akka is the production-ready result of the classic actor model lineage.
Akka is the production-ready result of the classic actor model lineage. It is actively developed and actually used to build scalable systems. More detail about this is given when describing the production usage of actors.

# Process-based actors

The process-based actor model is essentially an actor modeled as a process that runs from start to completion. This view is broadly similar to the classic actor, but the two models differ in the mechanics of managing the lifecycle and behaviors of actors. The first language to explicitly implement this model was Erlang, and its designers even say in a retrospective that their view of computation is broadly similar to Agha's classic actor model.

A process-based actor is defined as a computation which runs from start to completion, in contrast to the classic actor model, which defines an actor almost as a state machine of behaviors plus the logic to transition between them. Similar state-machine-like behavior transitions are possible through recursion with process-based actors, but programming them feels fundamentally different from using the previously described `become` statement.

These actors use a `receive` primitive to specify the messages that an actor can receive at a given state or point in time. `receive` statements have some notion of defining acceptable messages, usually based on patterns, conditionals, or types. If a message is matched, the corresponding code is evaluated; otherwise the actor simply blocks until it gets a message that it knows how to handle. Depending on the language implementation, `receive` might specify an explicit message type or perform pattern matching on message values.

## Erlang

Erlang's implementation of process-based actors gets to the core of what it means to be a process-based actor, as Erlang was the origin of the model. Ericsson originally developed this model to program large, highly reliable, fault-tolerant telecommunications switching systems. Erlang's development started in 1985, but its model of programming is still used today. The motivations of the Erlang model were four key properties that were needed to program fault-tolerant operations:

* Isolated processes
* Pure message passing between processes
* Detection of errors in remote processes
* The ability to determine what type of error caused a process crash

The Erlang researchers initially believed that shared memory was preventing fault-tolerance, and they saw message-passing of immutable data between processes as the way to avoid shared memory. There was a concern that passing around and copying data would be costly, but the Erlang developers saw fault-tolerance as a more important concern than performance. This model was developed essentially independently from other actor systems and research, especially as its development started before Agha's classic actor model formalization was even published, but it ends up with a broadly similar view of computation.

Erlang actors run as lightweight isolated processes. They do not have visibility into one another, and they pass around pure messages, which are immutable. These messages contain no dangling pointers or data references between objects, and they really enforce the idea of immutable, separated data between actors, unlike many of the early classic actor implementations in which references to actors and data can be passed around freely.

Erlang implements a blocking `receive` operation as a means of processing messages from a process's mailbox.
Erlang uses value matching on message tuples to describe the kinds of messages a given actor can accept.

Erlang also seeks to build failure into the programming model, as one of the core assumptions of a distributed system is that things are going to fail. Erlang provides the ability for processes to monitor one another through two primitives:

* `monitor`: one-way, unobtrusive notification of process failure/shutdown
* `link`: two-way notification of process failure/shutdown, allowing for coordinated termination

These primitives can be used to construct complex supervision hierarchies that handle failure in isolation, rather than letting failures impact the entire system. Supervision hierarchies are notably almost the only scheme for fault-tolerance that exists in the world of actors. Almost every actor system that is used to build distributed systems takes a similar approach, and it seems to work. Erlang's philosophies, used to build a reliable, fault-tolerant telephone exchange, seem to be broadly applicable to the fault-tolerance problems of distributed systems.

## Scala Actors

Scala Actors is an example of taking and enhancing the Erlang model while bringing it to a new platform. Scala Actors brings lightweight, Erlang-style message-passing concurrency to the JVM and integrates it with the heavyweight thread/process concurrency models. The original paper on Scala Actors states the problem well as "an impedance mismatch between message-passing concurrency and virtual machines such as the JVM." VMs usually map threads to heavyweight processes, but a lightweight process abstraction reduces programmer burden and leads to more natural abstractions. The authors claim that "The user experience gained so far indicates that the library makes concurrent programming in a JVM-based system much more accessible than previous techniques."

The realization of this model depends on efficiently multiplexing actors to threads. This technique was originally developed in Scala Actors, and later was adopted by Akka. This integration allows actors to invoke methods that block the underlying thread in a way that doesn't prevent other actors from making progress. This is important in an event-driven system where handlers are executed on a thread pool, because the underlying event handlers can't block threads without risking thread-pool starvation. The end result is that Scala Actors enabled a new lightweight concurrency primitive on the JVM, with enhancements over Erlang's model. In addition to the more natural abstraction, the Erlang model was further enhanced with Scala's type system and advanced pattern-matching capabilities.

## Cloud Haskell

Cloud Haskell is a domain-specific language embedded in Haskell which essentially implements an enhanced version of Erlang's message-passing model of computation. It enhances Erlang's model with advantages from Haskell's style of functional programming in the form of purity, types, and monads. Cloud Haskell enables the use of pure functions for remote computation, which means that these functions are idempotent and can be restarted or run elsewhere in the case of failure without worrying about side-effects or undo mechanisms. This alone isn't so different from Erlang, which operates on immutable data in the context of isolated memory.

One of the largest improvements over Erlang is the introduction of typed channels for sending messages.
These provide guarantees to the programmer about the types of messages their actors can handle, which is something Erlang lacks. In Erlang, all you have is dynamic pattern matching based on value patterns, and the hope that the wrong types of messages don't get passed around your system. Cloud Haskell processes can also use multiple typed channels to pass messages between actors, rather than Erlang's single untyped channel. Haskell's monadic types also make it possible to ensure that pure and effectful code are not mixed, which makes it easier to reason about where side-effects happen in your system. Cloud Haskell has shared memory within an actor process, which is useful for certain applications. This might sound like it could cause problems, but shared-memory structures are forbidden by the type system from being shared across actors. Finally, Cloud Haskell allows for the serialization of function closures, which means that higher-order functions can be distributed across actors. As long as a function and its environment are serializable, they can be spun off as a remote computation and seamlessly continued elsewhere. These improvements over Erlang make Cloud Haskell a notable project in the space of process-based actors. Cloud Haskell is still maintained and has also developed the Cloud Haskell Platform, which aims to provide the common functionality needed to build and manage a production actor system using Cloud Haskell.

# Communicating event-loops

The communicating event-loop model was introduced in the E language, and it aims to change the level of granularity at which communication happens within an actor-based system. The previously described actor systems organize communication at the actor level, while the communicating event-loop model puts communication between actors in the context of actions on objects within those actors. The overall messages still reference higher-level actors, but those messages refer to more granular actions within an actor's state.

## E Language

The E language implements a model which is closer to imperative object-oriented programming. Many objects are contained within a single actor-like node of computation called a "vat". The vat contains not just objects, but a mailbox for all of the objects inside, as well as a call stack for methods on those objects. A shared message queue and event loop act as the abstraction barrier for computation across actors. The actual references to objects within a vat are used for addressing communication and computation across actors, and they operate at a different level of abstraction.

When handing out references at a different level of granularity than actor-global, how do you ensure the benefits of isolation that the actor model provides? After all, by referencing objects inside of an actor from many places, it sounds like we're just reinventing shared-memory problems. The answer is that E's reference states define many of the isolation guarantees around computation that we expect from actors. Two different ways to reference objects are defined, sketched in code after this list:

* _Near reference_: This is a reference only possible between two objects in the same vat. These expose both synchronous immediate-calls and asynchronous eventual-sends.
* _Eventual reference_: This is a reference which crosses vat boundaries, and only exposes asynchronous eventual-sends, not synchronous immediate-calls.
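The following is a small Scala analogy of these two reference states, not E syntax, with all names (`Vat`, `eventually`, `Tally`) invented for illustration: a vat is modeled as a single-threaded event loop, a near reference is an ordinary object reference used by code already running on that loop, and an eventual reference can only enqueue an operation and get back a future for its result.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// A toy vat: a single-threaded event loop, so every object living in the
// vat is only ever touched by that one thread.
class Vat {
  private val pool = Executors.newSingleThreadExecutor()
  private val loop = ExecutionContext.fromExecutorService(pool)

  // Eventual-send: enqueue an operation on this vat's event loop and
  // immediately get back a future for the eventual result.
  def eventually[A](op: () => A): Future[A] = Future(op())(loop)

  def shutdown(): Unit = pool.shutdown()
}

// An object that lives inside a vat; near references may call `bump()`
// directly (a synchronous immediate-call), but only from the vat's own loop.
class Tally {
  private var count = 0
  def bump(): Int = { count += 1; count }
}

object VatDemo extends App {
  val vatA  = new Vat
  val tally = new Tally   // conceptually owned by vatA

  // From outside the vat only an eventual reference is available: the most
  // we can do is queue the call and wait on the future it returns.
  val result: Future[Int] = vatA.eventually(() => tally.bump())
  println(s"eventual result: ${Await.result(result, 1.second)}")
  vatA.shutdown()
}
```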
+ +The difference in semantics between the two types of references means that only objects within the same vat are granted synchronous access to one another. The most an eventual reference can do is asynchronously send and queue a message for processing at some unspecified point in the future. This means that within the execution of a vat, a degree of temporal isolation can be defined between the objects and communications within the vat, and the communications to and from other vats. + +The motivation for this referencing model comes from wanting to work at a finer-grained level of references than a traditional actor exposes. The simplest example is that you want to ensure that another actor in your system can read a value, but can't write to it. How do you do that within another actor model? You might imagine creating a read-only variant of an actor which doesn't expose a write message type, or proxies only `read` messages to another actor which supports both `read` and `write` operations. In E because you are handing out object references, you would simply only pass around references to a `read` method, and you don't have to worry about other actors in your system being able to write values. These finer-grained references make reasoning about state guarantees easier because you are no longer exposing references to an entire actor, but instead the granular capabilities of the actor. + +TODO: write more here, maybe something around promise pipelining and partial failure? implications of different types of communication? maybe mention some of the points that inspire some aspects of modern actors? + +## AmbientTalk/2 + +AmbientTalk/2 is a modern revival of the communicating event-loops actor model as a distributed programming language with an emphasis on developing mobile peer-to-peer applications. This idea was originally realized in AmbientTalk/1 where actors were modelled as ABCL/1-like active objects, but AmbientTalk/2 models actors similarly to E's vats. The authors of AmbientTalk/2 felt limited by not allowing passive objects within an actor to be referenced by other actors, so they chose to go with the more fine-grained approach which allows for remote interactions between and movement of passive objects. + +Actors in AmbientTalk/2 are representations of event loops. The message queue is the event queue, messages are events, asynchronous message sends are event notifications, and object methods are the event handlers. The event loop serially processes messages from the queue to avoid race conditions. Local objects within an actor are owned by that actor, which is the only entity allowed to directly execute methods on them. Like E, objects within an actor can communicate using synchronous or asynchronous methods of communication. Again similar to E, objects that are referenced outside of an actor can only be communicated to asynchronously by sending messages. Objects can additionally declare themselves serializable, which means they can be copied and sent to other actors for use as local objects. When this happens, there is no maintained relationship between the original object and its copy. + +AmbientTalk/2 uses the event loop model to enforce three essential concurrency control properties: + +* _Serial execution_: Events are processed sequentially from an event queue, so the handling of a single event is atomic with respect to other events. 
+* _Non-blocking communication_: An event loop doesn't suspend computation to wait for other event loops, instead all communication happens strictly as asynchronous event notifications. +* _Exclusive state access_: Event handlers (object methods) and their associated state belong to a single event loop, which has access to their mutable state. Mutation of other event loop state is only possible indirectly by passing an event notification asking for mutation to occur. + +The end result of all this decoupling and isolation of computation is that it is a natural fit for mobile ad hoc networks. In this domain, connections are volatile with limited range and transient failures. Removing coupling based on time or synchronization is a natural fit for the domain, and the communicating event-loop actor model is a natural model for programming these systems. AmbientTalk/2 provides additional features on top of the communicating event-loop model like service discovery. These enable ad hoc network creation as actors near each other can broadcast their existence and advertise common services that can be used for communication. + +AmbientTalk/2 is most notable as a reimagining of the communicating event-loops actor model for a modern use case. This again speaks to the broader advantages of actors and their applicability to solving the problems of distributed systems. + +# Active Objects + +Active object actors draw a distinction between two different types of objects: active and passive objects. Every active object has a single entry point defining a fixed set of messages that are understood. Passive objects are the objects that are actually sent between actors, and are copied around to guarantee isolation. This enables a separation of concerns between data that relates to actor communication and data that relates to actor state and behavior. + +The active object model as initially described in the ABCL/1 language defines objects with a state and three modes: + +* `dormant`: Initial state of no computation, simply waiting for a message to activate the behavior of the actor. +* `active`: A state in which computation is performed that is triggered when a message is received that satisfies the patterns and constraints that the actor has defined it can process. +* `waiting`: A state of blocked execution, where the actor is active, but waiting until a certain type or pattern of message arrives to continue computation. + +## ABCL/1 Language + +The ABCL/1 language implements the active object model described above, representing a system as a collection of objects, and the interactions between those objects as concurrent messages being passed around. One interesting aspect of ABCL/1 is the idea of explicitly different modes of message passing. Other actor models generally have a notion of priority around the values, types, or patterns of messages they process, usually defined by the ordering of their receive operation, but ABCL/1 implements two different modes of message passing with different semantics. They have standard queued messages in the `ordinary` mode, but more interestingly they have `express` priority messages. When an object receives an express message it halts any other processing of ordinary messages it is performing, and processes the `express` message immediately. This enables an actor to accept high-priority messages while in `active` mode, and also enables monitoring and interrupting actors. + +The language also offers different models of synchronization around message-passing between actors. 
Three different message-passing models are given that enable different use cases:

* `past`: Requests another actor to perform a task, while simultaneously proceeding with computation without waiting for the task to be completed.
* `now`: Sends a message and waits both for it to be received and for a response to come back. This acts as a basic synchronization barrier across actors.
* `future`: Acts like a typical future, continuing computation until a remote result is needed, and then blocking until that result is received.

It is interesting to note that all of these modes can be expressed by the `past` style of message-passing, as long as the type of the message and which actor to reply to with results are known.

The key difference with this style of actor is how it relates to managing actor lifecycles. In the active object style, lifecycle is a result of messages or requests to actors, whereas other styles have a more explicit notion of lifecycle and of creating and destroying actors.

## Orleans

Orleans takes the concept of actors whose lifecycle is dependent on messaging or requests and places it in the context of cloud applications. Orleans does this via actors (called "grains") which are isolated units of computation and behavior that can have multiple instantiations (called "activations") for scalability. These actors also have persistence, meaning they have a persistent state that is kept in durable storage so that it can be used to manage things like user data.

Orleans uses a different notion of identity than other actor systems. In other systems an "actor" might refer to a behavior, and instances of that actor might refer to identities that the actor represents, like individual users. In Orleans, an actor represents that persistent identity, and the actual instantiations are in fact reconcilable copies of that identity.

The programmer essentially assumes that a single entity is handling requests to an actor, but the Orleans runtime actually allows for multiple instantiations for scalability. These instantiations are invoked in response to an RPC-like call from the programmer, which immediately returns an asynchronous promise. Multiple instances of an actor can be running and modifying the state of that actor at the same time. The immediate question is how that actually works: it doesn't intuitively seem like transparently accessing and changing multiple isolated copies of the same state should produce anything but problems when it's time to do something with that state.

Orleans solves this problem by providing mechanisms to reconcile conflicting changes. If multiple instances of an actor modify persistent state, they need to be reconciled into a consistent state in some meaningful way. The default is a last-write-wins strategy, but Orleans also exposes the ability to create fine-grained reconciliation policies, as well as a number of common reconcilable data structures. If an application requires a particular reconciliation algorithm, the developer can implement it using Orleans. These reconciliation mechanisms are built upon Orleans' concept of transactions.

Transactions in Orleans are a way to causally reason about the different instances of actors that are involved in a computation. Because computation in this model happens in response to a single outside request, a given actor's chain of computation via associated actors always contains a single instantiation of each actor. This causal chain of instantiations is treated as a single transaction. At reconciliation time Orleans uses these transactions, along with the current instantiation state, to reconcile to a consistent state.
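To illustrate the default last-write-wins policy mentioned above, here is a tiny, hypothetical Scala sketch of reconciling divergent activation states by write timestamp. It is only a stand-in for the idea; Orleans' real persistence and transaction machinery is far richer than this.

```scala
// Illustrative only: a grain's state as seen by one activation, tagged
// with the time of its last write.
final case class ActivationState[A](value: A, lastWriteMillis: Long)

object Reconcile {
  // Last-write-wins: keep whichever activation wrote most recently.
  def lastWriteWins[A](a: ActivationState[A], b: ActivationState[A]): ActivationState[A] =
    if (a.lastWriteMillis >= b.lastWriteMillis) a else b

  // Reconciling many activations is a fold over the same rule; an
  // application-specific policy would replace `lastWriteWins` here.
  def reconcile[A](states: Seq[ActivationState[A]]): ActivationState[A] =
    states.reduce((a, b) => lastWriteWins(a, b))
}
```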
All of this is a long-winded way of saying that Orleans' programmer-centric contribution is that it separates the concerns of running and managing actor lifecycles from the concerns of how data flows throughout your distributed system. It does this in a fault-tolerant way, and for many programming tasks you likely wouldn't have to worry about scaling and reconciling data in response to requests. It provides the benefits of the actor model through a programming model that attempts to abstract away details that you would otherwise have to worry about when using actors in production.

# Why the actor model?

The actor programming model offers benefits to programmers of distributed systems by allowing for easier reasoning about behavior, providing a lightweight concurrency primitive that naturally scales across many machines, and enabling looser coupling among components of a system, allowing for change without service disruption. Actors enable a programmer to reason more easily about their behavior because they are, at a fundamental level, isolated from other actors. When programming an actor, the programmer only has to worry about the behavior of that actor and the messages it can send and receive. This alleviates the need for the programmer to reason about an entire system. Instead the programmer has a fixed set of concerns, meaning they can ensure behavioral correctness in isolation, rather than having to worry about an interaction they hadn't anticipated occurring. Actors provide a single means of communication (message-passing), meaning that many of the concerns a programmer has around concurrent modification of data are alleviated. Data is restricted to the data within a single actor and the messages it has been passed, rather than all of the accessible data in the whole system.

Actors are lightweight, meaning that the programmer usually does not have to worry about how many actors they are creating. This is a contrast to other fundamental units of concurrency like threads or processes, which a programmer has to be acutely aware of, as they incur high creation costs and quickly run into machine resource and performance limitations. Haller (2009) says that without a lightweight process abstraction, the burden on the programmer increases, forcing them to write their code in an obscured style. Unlike threads and processes, actors can also easily be told to run on other machines, as they are functionally isolated. This cannot traditionally be done with threads or processes, which cannot be passed over the network to run elsewhere. Messages can be passed over the network, so an actor does not have to care where it is running as long as it can send and receive messages. Actors are more scalable because of this property, and it means that they can naturally be distributed across a number of machines to meet the load or availability demands of the system.

Finally, because actors are loosely coupled, depending only on a set of input and output messages to and from other actors, their behavior can be modified and upgraded without changing the entire system. For example, a single actor could be upgraded to use a more performant algorithm to do its work, and as long as it can process the same input and output messages, nothing else in the system has to change.
This isolation is a contrast to methods of concurrent programming like remote procedure calls, futures, and promises. These models emphasize a tighter coupling between units of computation, where a process may call a method directly on another process and expect a specific result. This means that both the caller and the callee (the receiver of the call) need to have knowledge of the code being run, so you lose the ability to upgrade one without impacting the other. This becomes a problem in practice: as the complexity of your distributed system grows, more and more pieces become linked together. This is not desirable, as a key characteristic of distributed systems is availability, and the more things are linked together, the more of your system you have to take down or halt to make changes and upgrades. Agha (1990) states, "It is important to note that the actor languages give special emphasis to developing flexible program structures which simplify reasoning about programs." Actors compare favorably to other concurrent programming primitives like threads or remote procedure calls due to their low cost and loosely coupled nature. They are also programmer friendly, and they ease the burden of reasoning about a distributed system.

# Modern usage in production

When reviewing models of programming distributed systems, it is important not to look just to academia, but to see which of these systems are actually used in industry to build things. This can give us insight into which features of actor systems are actually useful, and the trends that exist throughout these systems.

_On the Integration of the Actor Model into Mainstream Technologies_ by Philipp Haller provides some insight into the requirements of an industrial-strength actor implementation on a mainstream platform. These requirements were drawn out of an initial effort with [Scala Actors](#scala-actors) to bring the actor model to mainstream software engineering, as well as lessons learned from the deployment and advancement of production actors in [Akka](#akka).

* _Library-based implementation_: It is not obvious which concurrency abstraction wins in real-world cases, and different concurrency models might be used to solve different problems, so implementing a concurrency model as a library enables flexibility in usage.
* _High-level domain-specific language_: A domain-specific language or something comparable is a requirement to compete with languages that specialize in concurrency; otherwise your abstractions are lacking in idioms and expressiveness.
* _Event-driven implementation_: Actors need to be lightweight, meaning they cannot be mapped to an entire VM thread or process. For most platforms this means an event-driven model.
* _High performance_: Most industrial applications that use actors are highly performance sensitive, and high performance enables more graceful scalability.
* _Flexible remote actors_: Many applications can benefit from remote actors, which can communicate transparently over the network. Flexibility in deployment mechanisms is also very important.

These attributes give us a good basis for analyzing whether an actor system can be successful in production. They are necessary, but not sufficient, for an actor system to be useful in production.

## Actors as a framework

One trend that seems common among the actor systems we see in production is extensive environments and tooling.
I would argue that Akka, Erlang, and Orleans are the primary actor systems that see real production use, and I think the reason for this is that they essentially act as frameworks where many of the common problems of actors are taken care of for you. This allows the programmer to focus on the problems within their domain, rather than the common problems of monitoring, deployment, and composition.

Akka and Erlang provide modules that you can piece together to build various pieces of functionality into your system. Akka provides a huge number of modules and extensions to configure and monitor a distributed system built using actors. It provides a number of utilities to meet common use cases and deployment scenarios, and these are thoroughly listed and documented. Additionally, it provides support for Akka Extensions, which are a mechanism for adding your own features to Akka. These are powerful enough that some core features of Akka, like Typed Actors or Serialization, are implemented as Akka Extensions. Erlang provides the Open Telecom Platform (OTP), which is a framework comprised of a set of modules and standards designed to help build applications. OTP takes the generic patterns and components of Erlang and provides them as libraries that enable code reuse and best practices when developing new systems. Cloud Haskell also provides something analogous to Erlang's OTP called the Cloud Haskell Platform.

Orleans is different from these, as it is built from the ground up with a more declarative style and runtime. This does a lot of the work of distributing and scaling actors for you, but it is still very much a framework which handles many of the common problems of distribution so that programmers can focus on building the logic of their system.

## Module vs. managed runtime approaches

Based on my research, there have been two prevalent approaches to frameworks which are actually used to build production actor systems in industry. These are high-level philosophies about the meta-organization of an actor system, design philosophies that aren't directly visible when just looking at the base actor programming and execution models. I think the easiest way to describe them is as the "module approach" and the "managed runtime approach". A high-level analogy is that the module approach is similar to manually managing memory, while the managed runtime approach is similar to garbage collection. In the module approach, you care about the lifecycle and physical allocation of actors within your system, while in the managed runtime approach you care more about the reconciliation behavior and the flow of persistent state between automatic instantiations of your actors.

Both Akka and Erlang take a module approach to building their actor systems. This means that when you build a system using these languages/frameworks, you are using smaller composable components as pieces of the larger system you want to build. You are explicitly dealing with the lifecycles and instantiations of actors within your system, where to distribute them across physical machines, and how to balance actors to scale. Some of these problems might be handled by libraries, but at some level you are specifying how all of the organization of your actors happens. The JVM or Erlang VM isn't doing it for you.

Orleans goes in another direction, which I call the managed runtime approach.
Instead of providing small components which let you build your own abstractions, Orleans provides a runtime in the cloud that attempts to abstract away many of the details of managing actors. It does this to such an extent that you no longer even directly manage actor lifecycles, where actors live on machines, or how they are replicated and scaled. Instead you program with actors in a more declarative style. You never explicitly instantiate actors; you assume that the runtime will figure it out for you in response to requests to your system. You program in strategies to deal with problems like domain-specific reconciliation of data across instances, but you generally leave it to the runtime to scale and distribute the actor instances within your system.

I don't have an opinion on which of these is right. Both approaches have been successful in industry. Erlang has the famous use case of a telephone exchange and a successful history since then. Akka has an entire page detailing its usage in giant companies. Orleans has been used as a backend to massive Microsoft-scale games and applications with millions of users. It seems like the module approach is more popular, but there's really only one example of the managed runtime approach out there. There's no equivalent to Orleans on the JVM or Erlang VM, so realistically it doesn't have as much exposure in the distributed programming community.

## Comparison to Communicating Sequential Processes (CSP)

TODO: where should this live in the chapter?

You might argue that I've ignored some other concurrency primitives that could be considered message-passing or actors at some level. After all, from a high level a goroutine with channels feels a bit like an actor, as does an RPC system which can buffer sequential calls. I think a lot of discussions of actors look at them from a not-so-useful level of abstraction: they simply take actors as a lightweight concurrency primitive which passes messages. This view is zoomed out too far, and it misses many of the subtleties that differentiate these programming models. Many of these differences stem from the flexibility and scalability of actors. Trying to use CSP-like channels to build a scalable system the way you would an actor system would arguably be a tightly-coupled nightmare. The advantages of actors are around the looser coupling, variable topology, and focus on isolation of state and behavior. CSP has a place in building systems, and it has proven to be a popular concurrency primitive, but lumping actors in with CSP misses the point of both. Actors operate at a fundamentally different level of abstraction from CSP.

# References

TODO: Add non-journal references

{% bibliography --file message-passing %}

diff --git a/chapter/4/dist-langs.md b/chapter/4/dist-langs.md index 9c8a8c9..1736064 100644 --- a/chapter/4/dist-langs.md +++ b/chapter/4/dist-langs.md @@ -8,4 +4,4 @@ Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor i ## References -{% bibliography --file dist-langs %}
\ No newline at end of file +{% bibliography --file dist-langs %} diff --git a/chapter/6/acidic-to-basic-how-the-database-ph-has-changed.md b/chapter/6/acidic-to-basic-how-the-database-ph-has-changed.md new file mode 100644 index 0000000..ffc94c0 --- /dev/null +++ b/chapter/6/acidic-to-basic-how-the-database-ph-has-changed.md @@ -0,0 +1,182 @@ +--- +layout: page +title: "ACIDic to BASEic: How the database pH has changed" +by: "Aviral Goel" +--- + +## 1. The **ACID**ic Database Systems + +Relational Database Management Systems are the most ubiquitous database systems for persisting state. Their properties are defined in terms of transactions on their data. A database transaction can be either a single operation or a sequence of operations, but is treated as a single logical operation on the data by the database. The properties of these transactions provide certain guarantees to the application developer. The acronym **ACID** was coined by Andreas Reuter and Theo Härder in 1983 to describe them. + +* **Atomicity** guarantees that any transaction will either complete or leave the database unchanged. If any operation of the transaction fails, the entire transaction fails. Thus, a transaction is perceived as an atomic operation on the database. This property is guaranteed even during power failures, system crashes and other erroneous situations. + +* **Consistency** guarantees that any transaction will always result in a valid database state, i.e., the transaction preserves all database rules, such as unique keys. + +* **Isolation** guarantees that concurrent transactions do not interfere with each other. No transaction views the effects of other transactions prematurely. In other words, they execute on the database as if they were invoked serially (though a read and write can still be executed in parallel). + +* **Durability** guarantees that upon the completion of a transaction, the effects are applied permanently on the database and cannot be undone. They remain visible even in the event of power failures or crashes. This is done by ensuring that the changes are committed to disk (non-volatile memory). + +<blockquote><p><b>ACID</b>ity implies that if a transaction is complete, the database state is structurally consistent (adhering to the rules of the schema) and stored on disk to prevent any loss.</p></blockquote> + +Because of the strong guarantees this model simplifies the life of the developer and has been traditionally the go to approach in application development. It is instructive to examine how these properties are enforced. + +Single node databases can simply rely upon locking to ensure *ACID*ity. Each transaction marks the data it operates upon, thus enabling the database to block other concurrent transactions from modifying the same data. The lock has to be acquired both while reading and writing data. The locking mechanism enforces a strict linearizable consistency, i.e., all transactions are performed in a particular sequence and invariants are always maintained by them. An alternative, *multiversioning* allows a read and write operation to execute in parallel. Each transaction which reads data from the database is provided the earlier unmodified version of the data that is being modified by a write operation. This means that read operations don't have to acquire locks on the database. This enables read operations to execute without blocking write operations and write operations to execute without blocking read operations. + +This model works well on a single node. 
But it exposes a serious limitation when too many concurrent transactions are performed. A single-node database server will only be able to process so many concurrent read operations. The situation worsens when many concurrent write operations are performed. To guarantee *ACID*ity, the write operations will be performed in sequence. The last write request will have to wait for an arbitrary amount of time, a totally unacceptable situation for many real-time systems. This requires the application developer to decide on a **Scaling** strategy.

### 1.2. Scaling transaction volume

To increase the volume of transactions against a database, two scaling strategies can be considered:

* **Vertical Scaling** is the easiest approach to scale a relational database. The database is simply moved to a larger computer which provides more transactional capacity. Unfortunately, it's far too easy to outgrow the capacity of the largest system available, and it is costly to purchase a bigger system each time that happens. Since it is specialized hardware, vendor lock-in will add further costs.

* **Horizontal Scaling** is a more viable option and can be implemented in two ways. Data can be segregated into functional groups spread across databases. This is called *Functional Scaling*. Data within a functional group can be further split across multiple databases, enabling functional areas to be scaled independently of one another for even more transactional capacity. This is called *sharding*.

Horizontal scaling through functional partitioning enables a high degree of scalability. However, the functionally separate tables employ constraints such as foreign keys. For these constraints to be enforced by the database itself, all tables have to reside on a single database server, which limits horizontal scaling. To work around this limitation, the tables in a functional group have to be stored on different database servers. But now a single database server can no longer enforce constraints between the tables. In order to ensure *ACID*ity of distributed transactions, distributed databases employ a two-phase commit (2PC) protocol:

* In the first phase, a coordinator node interrogates all other nodes to ensure that a commit is possible. If all databases agree then the next phase begins, else the transaction is canceled.

* In the second phase, the coordinator asks each database to commit the data.

2PC is a blocking protocol, and updates can take from a few milliseconds up to a few minutes to commit. This means that while a transaction is being processed, other transactions will be blocked, so the application that initiated the transaction will be blocked. Another option is to handle the consistency across databases at the application level. This only complicates the situation for the application developer, who is likely to implement a similar strategy if *ACID*ity is to be maintained.
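Here is a minimal, sequential sketch of the two phases in Scala. The `Participant` trait stands in for the databases holding the functional groups; the names are invented, and a real coordinator would also need timeouts, durable logging, and recovery, which is exactly where the blocking behavior described above comes from.

```scala
// Illustrative participant interface; a real one would wrap a database
// connection and write prepare/commit records to a durable log.
trait Participant {
  def prepare(txId: String): Boolean   // phase 1: vote yes/no
  def commit(txId: String): Unit       // phase 2a: make the change durable
  def rollback(txId: String): Unit     // phase 2b: undo the prepared work
}

// A toy coordinator: it blocks while it waits for every vote, which is
// the source of 2PC's blocking behavior.
class Coordinator(participants: Seq[Participant]) {
  /** Returns true if the transaction committed on every participant. */
  def runTransaction(txId: String): Boolean = {
    // Phase 1: every participant must vote to commit.
    val allAgree = participants.forall(_.prepare(txId))

    // Phase 2: commit everywhere, or roll back everywhere.
    if (allAgree) participants.foreach(_.commit(txId))
    else participants.foreach(_.rollback(txId))
    allAgree
  }
}
```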
## 2. The Distributed Concoction

A distributed application is expected to have the following three desirable properties:

1. **Consistency** - This is the guarantee of a total ordering of all operations on a data object such that each operation appears indivisible. This means that any read operation must return the most recently written value. This provides a very convenient invariant to the client application. This definition of consistency is the same as the **Atomic**ity guarantee provided by relational database transactions.

2. **Availability** - Every request to a distributed system must result in a response. However, this is too vague a definition. Whether a node failed while responding, ran a very long computation to generate a response, or whether the request or the response got lost due to network issues is generally impossible for the client to determine. Hence, for all practical purposes, availability can be defined as the service responding to a request in a timely fashion; the amount of delay an application can bear depends on the application domain.

3. **Partition Tolerance** - Partitioning is the loss of messages between the nodes of a distributed system. During a network partition, the system can lose an arbitrary number of messages between nodes. A partition-tolerant system will always respond correctly unless a total network failure happens.

The consistency requirement implies that every request will be treated atomically by the system even if the nodes lose messages due to network partitions. The availability requirement implies that every request should receive a response even if a partition causes messages to be lost arbitrarily.

## 3. The CAP Theorem

In the network above, all messages between the node set M and the node set N are lost due to a network issue. The system as a whole detects this situation. There are two options:

1. **Availability first** - The system allows any application to read and write data objects on these nodes independently even though they are not able to communicate. The application writes to a data object on node M. Due to the **network partition**, this change is not propagated to replicas of the data object in N. Subsequently, the application tries to read the value of that data object and the read operation executes on one of the nodes of N. The read operation returns the older value of the data object, thus leaving the application state not **consistent**.

2. **Consistency first** - The system does not allow any application to write to data objects, as it cannot ensure **consistency** of replica states. This means that the system is perceived to be **unavailable** by the applications.

If there are no partitions, clearly both consistency and availability can be guaranteed by the system. This observation led Eric Brewer to conjecture in an invited talk at PODC 2000:

<blockquote>It is impossible for a web service to provide the following three guarantees:
Consistency
Availability
Partition Tolerance</blockquote>

This is called the CAP theorem.

It is clear that the prime culprit here is network partition. If there are no network partitions, any distributed service can be both highly available and provide strong consistency of shared data objects. Unfortunately, network partitions cannot be ruled out in a distributed system.

## 4. Two of Three - Exploring the CAP Theorem

The CAP theorem dictates that the three desirable properties, consistency, availability, and partition tolerance, cannot all be offered simultaneously. Let's study whether it's possible to achieve two of these three properties.

### Consistency and Availability
If there are no network partitions, then there is no loss of messages and all requests receive a response within the stipulated time. It is clearly possible to achieve both consistency and availability. Distributed systems over an intranet are an example of such systems.

### Consistency and Partition Tolerance
Without availability, both of these properties can be achieved easily. A centralized system can provide these guarantees.
The state of the application is maintained on a single designated node. All updates from the client are forwarded by the other nodes to this designated node, which updates the state and sends the response. When a failure happens, the system does not respond and is perceived as unavailable by the client. Distributed locking algorithms in databases also provide these guarantees.

### Availability and Partition Tolerance
Without atomic consistency, it is easy to achieve availability even in the face of partitions. Even if nodes fail to communicate with each other, they can individually handle query and update requests issued by the client. The same data object will have different states on different nodes as the nodes progress independently. This weak consistency model is exhibited by web caches.

It's clear that two of these three properties are easy to achieve in any distributed system. Since large-scale distributed systems have to take partitions into account, will they have to sacrifice availability for consistency, or consistency for availability? Clearly, giving up either consistency or availability entirely is too big a sacrifice.

## 5. The **BASE**ic distributed state

When viewed through the lens of the CAP theorem and its consequences for distributed application design, we realize that we cannot commit to both perfect availability and strong consistency. But we can explore the middle ground: we can guarantee availability most of the time, with an occasionally inconsistent view of the data. Consistency is eventually achieved when the communication between the nodes resumes. This leads to the following properties of current distributed applications, referred to by the acronym BASE.

* **Basically Available** services are those which are partially available when partitions happen. Thus, they appear to work most of the time. Partial failures result in the system being unavailable only for a section of the users.

* **Soft State** services provide no strong consistency guarantees. They are not write consistent. Since replicas may not be mutually consistent, applications have to accept stale data.

* **Eventually Consistent** services try to make application state consistent whenever possible.

## 6. Partitions and latency
Any large-scale distributed system has to deal with latency. In fact, network partitions and latency are fundamentally related. Once a request is made and no response is received within some duration, the sender node has to assume that a partition has happened. The sender node can then take one of the following steps:

1. Cancel the operation as a whole. In doing so, the system is choosing consistency over availability.
2. Proceed with the rest of the operation. This can lead to inconsistency but makes the system highly available.
3. Retry the operation until it succeeds. This means that the system is trying to ensure consistency while reducing availability.

Essentially, a partition is an upper bound on the time spent waiting for a response. Whenever this upper bound is exceeded, the system chooses C over A or A over C. Also, a partition may be perceived by only two nodes of a system as opposed to all of them, which means that partitions are a local occurrence.

## 7. Handling Partitions
Once a partition has happened, it has to be handled explicitly. The designer has to decide which operations will remain functional during partitions. The partitioned nodes will continue their attempts at communication.
When the nodes are able to establish communication again, the system has to take steps to recover from the partition.

### 7.1. Partition mode functionality
When at least one side of the system has entered partition mode, the system has to decide which functionality to support. This decision depends on the invariants that the system must maintain. Depending on the nature of the problem, the designer may choose to compromise on certain invariants by allowing the partitioned system to provide functionality which might violate them; this means the designer is choosing availability over consistency. Other invariants may have to be maintained, and operations that would violate them will either have to be modified or prohibited; this means the designer is choosing consistency over availability.

Deciding which operations to prohibit, modify or delay also depends on other factors, such as the node involved. If the data is stored on the same node, then operations on that data can typically proceed on that node but not on other nodes.

In any event, the bottom line is that if the designer wishes for the system to be available, certain operations have to be allowed. The node has to maintain a history of these operations so that it can be merged with the rest of the system when it is able to reconnect. Since operations can happen simultaneously on multiple disconnected nodes, all sides will maintain this history. One way to maintain this information is through version vectors, sketched below.

Another interesting problem is communicating the progress of these operations to the user. Until the system gets out of partition mode, the operations cannot be committed completely. Until then, the user interface has to faithfully represent their incomplete or in-progress status to the user.
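A minimal version-vector sketch in Scala, assuming nodes are identified by strings: each node bumps its own entry when it records a local operation, received vectors are merged with a pointwise maximum, and two histories count as concurrent, and therefore in need of explicit reconciliation, when neither vector dominates the other.

```scala
// One logical counter per node; bumped locally, merged by pointwise max.
final case class VersionVector(clock: Map[String, Long] = Map.empty) {
  // Record one more local operation performed by `node`.
  def tick(node: String): VersionVector =
    copy(clock = clock + (node -> (clock.getOrElse(node, 0L) + 1)))

  // Combine knowledge from another replica's vector.
  def merge(other: VersionVector): VersionVector =
    VersionVector((clock.keySet ++ other.clock.keySet).map { k =>
      k -> math.max(clock.getOrElse(k, 0L), other.clock.getOrElse(k, 0L))
    }.toMap)

  // True if this vector has seen everything `other` has seen.
  def dominates(other: VersionVector): Boolean =
    other.clock.forall { case (k, v) => v <= clock.getOrElse(k, 0L) }

  // Neither side subsumes the other: the updates were concurrent and the
  // operation histories have to be reconciled explicitly.
  def concurrentWith(other: VersionVector): Boolean =
    !dominates(other) && !other.dominates(this)
}
```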
### 7.2. Partition Recovery
When the partitioned nodes are able to communicate, they have to exchange information to restore consistency. Both sides continued in their own direction, but now the delayed operations on either side have to be performed and violated invariants have to be fixed. Given the state and history of both sides, the system has to accomplish the following tasks.

#### 7.2.1. Consistency
During recovery, the system has to reconcile the inconsistent state of the two sides. This is relatively straightforward to accomplish. One approach is to start from the state at the time of the partition and apply the operations from both sides in an appropriate manner, ensuring that the invariants are maintained. Depending on the operations allowed during the partition phase, this process may or may not be possible. The general problem of conflict resolution is not solvable, but a restricted set of operations may ensure that the system can always merge conflicts. For example, Google Docs limits operations to style and text editing. But source-code control systems such as the Concurrent Versions System (CVS) may encounter conflicts which require manual resolution. Research has been done on techniques for automatic state convergence. Using commutative operations allows the system to sort the operations into a consistent global order and execute them, though not all operations can be made commutative; for example, addition with bounds checking is not commutative. Marc Shapiro and his colleagues at INRIA have developed *commutative replicated data types (CRDTs)* that provably converge as operations are performed. By implementing state through CRDTs, we can ensure availability and automatic state convergence after partitions.

#### 7.2.2. Compensation
During a partition, it's possible for both sides to perform a series of actions which are externalized, i.e., their effects are visible outside the system. To compensate for these actions, the partitioned nodes have to maintain a history. For example, consider a system in which both sides have executed the same order during a partition. During the recovery phase, the system has to detect this and distinguish it from two intentional orders. Once detected, the duplicate order has to be rolled back. If the order has been committed successfully, then the problem has been externalized: the user will see twice the amount deducted from their account for a single purchase. Now the system has to credit the appropriate amount to the user's account and possibly send an email explaining the entire debacle. All of this depends on the system maintaining the history during the partition. If the history is not present, then duplicate orders cannot be detected and the user will have to catch the mistake and ask for compensation. It would have been better if the duplicate order had not been issued by the system in the first place, but the requirement to maintain system availability trumps consistency. Mistakes in such cases cannot always be corrected internally, but by admitting them and compensating for them, the system arguably exhibits equivalent behavior.

## 8. What's the right pH for my distributed solution?

Whether an application chooses to be an *ACID*ic or a *BASE*ic service depends on the domain. An application developer has to consider the consistency-availability tradeoff on a case by case basis. *ACID*ic databases provide a very simple and strong consistency model, making application development easy for domains where data inconsistency cannot be tolerated. *BASE*ic systems provide a very loose consistency model, placing more burden on the application developer to understand the invariants and manage them carefully during partitions by appropriately limiting or modifying the operations.

## 9.
References + +https://neo4j.com/blog/acid-vs-base-consistency-models-explained/ +https://en.wikipedia.org/wiki/Eventual_consistency/ +https://en.wikipedia.org/wiki/Distributed_transaction +https://en.wikipedia.org/wiki/Distributed_database +https://en.wikipedia.org/wiki/ACID +http://searchstorage.techtarget.com/definition/data-availability +https://aphyr.com/posts/288-the-network-is-reliable +http://research.microsoft.com/en-us/um/people/navendu/papers/sigcomm11netwiser.pdf +http://web.archive.org/web/20140327023856/http://voltdb.com/clarifications-cap-theorem-and-data-related-errors/ +http://static.googleusercontent.com/media/research.google.com/en//archive/chubby-osdi06.pdf +http://www.hpl.hp.com/techreports/2012/HPL-2012-101.pdf +http://research.microsoft.com/en-us/um/people/navendu/papers/sigcomm11netwiser.pdf +http://www.cs.cornell.edu/projects/ladis2009/talks/dean-keynote-ladis2009.pdf +http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf +https://people.mpi-sws.org/~druschel/courses/ds/papers/cooper-pnuts.pdf +http://blog.gigaspaces.com/nocap-part-ii-availability-and-partition-tolerance/ +http://stackoverflow.com/questions/39664619/what-if-we-partition-a-ca-distributed-system +https://people.eecs.berkeley.edu/~istoica/classes/cs268/06/notes/20-BFTx2.pdf +http://ivoroshilin.com/2012/12/13/brewers-cap-theorem-explained-base-versus-acid/ +https://www.quora.com/What-is-the-difference-between-CAP-and-BASE-and-how-are-they-related-with-each-other +http://berb.github.io/diploma-thesis/original/061_challenge.html +http://dssresources.com/faq/index.php?action=artikel&id=281 +https://saipraveenblog.wordpress.com/2015/12/25/cap-theorem-for-distributed-systems-explained/ +https://www.infoq.com/articles/cap-twelve-years-later-how-the-rules-have-changed +https://dzone.com/articles/better-explaining-cap-theorem +http://www.julianbrowne.com/article/viewer/brewers-cap-theorem +http://delivery.acm.org/10.1145/1400000/1394128/p48-pritchett.pdf?ip=73.69.60.168&id=1394128&acc=OPEN&key=4D4702B0C3E38B35%2E4D4702B0C3E38B35%2E4D4702B0C3E38B35%2E6D218144511F3437&CFID=694281010&CFTOKEN=94478194&__acm__=1479326744_f7b98c8bf4e23bdfe8f17b43e4f14231 +http://dl.acm.org/citation.cfm?doid=1394127.1394128 +https://en.wikipedia.org/wiki/Eventual_consistency +https://en.wikipedia.org/wiki/Two-phase_commit_protocol +https://en.wikipedia.org/wiki/ACID +https://people.eecs.berkeley.edu/~brewer/cs262b-2004/PODC-keynote.pdf +http://www.johndcook.com/blog/2009/07/06/brewer-cap-theorem-base/ +http://searchsqlserver.techtarget.com/definition/ACID +http://queue.acm.org/detail.cfm?id=1394128 +http://www.dataversity.net/acid-vs-base-the-shifting-ph-of-database-transaction-processing/ +https://neo4j.com/developer/graph-db-vs-nosql/#_navigate_document_stores_with_graph_databases +https://neo4j.com/blog/aggregate-stores-tour/ +https://en.wikipedia.org/wiki/Eventual_consistency +https://en.wikipedia.org/wiki/Distributed_transaction +https://en.wikipedia.org/wiki/Distributed_database +https://en.wikipedia.org/wiki/ACID +http://searchstorage.techtarget.com/definition/data-availability +https://datatechnologytoday.wordpress.com/2013/06/24/defining-database-availability/ +{% bibliography --file rpc %} diff --git a/chapter/6/being-consistent.md b/chapter/6/being-consistent.md new file mode 100644 index 0000000..233d987 --- /dev/null +++ b/chapter/6/being-consistent.md @@ -0,0 +1,82 @@ +--- +layout: page +title: "Being Consistent" +by: "Aviral Goel" +--- + +## Replication and Consistency +Availability and Consistency are 
the defining characteristics of any distributed system. As dictated by the CAP theorem, accommodating network partitions requires a trade-off between the two properties. Modern-day, large-scale, internet-based distributed systems have to be highly available. To manage huge volumes of data (big data) and to reduce access latency for a geographically diverse user base, their data centers also have to be geographically spread out. Network partitions, which would otherwise happen with low probability on a local network, become near-certain events in such systems. To ensure availability in the event of partitions, these systems have to replicate data objects. This raises the question: how do we ensure consistency of these replicas? It turns out there are different notions of consistency to which a system can adhere. + +* **Strong Consistency** implies linearizability of updates, i.e., all updates applied to a replicated data type are serialized in a global total order. This means that any update has to be applied to all other replicas simultaneously. It is obvious that this notion of consistency is too restrictive: a single unavailable node will violate the condition, and forcing all updates to happen synchronously impacts system availability negatively. This notion clearly does not fit the requirements of highly available, fault-tolerant systems. + +* **Eventual Consistency** is a weaker model of consistency that does not guarantee immediate consistency of all replicas. Any local update is immediately executed on the replica. The replica then sends its state asynchronously to other replicas. As long as all replicas share their states with each other, the system eventually stabilizes and each replica finally contains the same value. During the execution, updates happen asynchronously at all replicas in a non-deterministic order, so replicas can be inconsistent between updates. If updates arrive concurrently at a replica, a consensus protocol can be employed to ensure that both updates taken together do not violate an invariant. If they do, a rollback has to be performed and the new state is communicated to all the other replicas. + +Most large-scale distributed systems try to be **Eventually Consistent** to ensure high availability and partition tolerance. But conflict resolution is hard. There is little guidance on correct approaches to consensus, and it is easy to come up with an error-prone ad hoc approach. What if we side-step conflict resolution and rollback completely? Is there a way to design data structures which do not require any consensus protocols to merge concurrent updates? + +## A Distributed Setting + +Consider a replicated counter. Each node can increment the value of its local copy. The figure below shows three nodes which increment their local copies at arbitrary time points, and each replica sends its value asynchronously to the other two replicas. Whenever a replica receives a value from another replica, it adds it to its current value. If two values are received concurrently, both are added to its current value. So merging replicas in this example becomes trivial (a concrete sketch of a grow-only counter appears later in the chapter, after the discussion of strong eventual consistency). + +Let's take a look at another interesting generalization of this. 
Integer Vector: consider a vector of integers replicated at each node. Each node may increase the entries of its local copy, and two replicas are merged by taking the per-index maximum of their vectors. No matter the order in which the merges are applied, all replicas converge to the same vector. + + +We can make an interesting observation from the previous examples: + +__*Not all distributed data structures need conflict resolution*__ + +This raises the following question: + +__*How can we design a distributed data structure such that we don't need conflict resolution?*__ + +The answer to this question lies in an algebraic structure called the **join semilattice**. + +## Join Semilattice +A join-semilattice or upper semilattice is a *partial order* `≤` with a *least upper bound* (LUB) `⊔` for all pairs. +`m = x ⊔ y` is a Least Upper Bound of `{x, y}` under `≤` iff `∀m′, x ≤ m′ ∧ y ≤ m′ ⇒ x ≤ m ∧ y ≤ m ∧ m ≤ m′`. + +`⊔` is: + +**Associative** + +`(x ⊔ y) ⊔ z = x ⊔ (y ⊔ z)` + +**Commutative** + +`x ⊔ y = y ⊔ x` + +**Idempotent** + +`x ⊔ x = x` + +The examples we saw earlier were of structures that can be modeled as join semilattices. The merge operation for the increment-only counter is summation of the received increments, and for the integer vector it is the per-index maximum of the vectors being merged. +So, if we can model the state of the data structure as a partially ordered set and design the merge operation to always compute the "larger" of the two states, its replicas will never need consensus. They will always converge as execution proceeds. Such data structures are called CRDTs (Conflict-free Replicated Data Types). But what about the consistency of these replicas? + +## Strong Eventual Consistency (SEC) +We discussed a notion of consistency, *Eventual Consistency*, in which replicas eventually become consistent if there are no more updates to be merged. But the update operation is left unspecified. It is possible for an update to leave the replica in a state that conflicts with a later update. In this case the replica may have to roll back and use consensus to ensure that all replicas do the same, which is complicated and wasteful. But if replicas are modeled as CRDTs, the updates never conflict. Regardless of the order in which the updates are applied, all replicas will eventually have equivalent state, and no conflict arbitration is necessary. This kind of Eventual Consistency is a stronger notion of consistency than the one that requires conflict arbitration and hence is called *Strong Eventual Consistency*. + +### Strong Eventual Consistency and CAP Theorem + +Let's study SEC data objects from the perspective of the CAP theorem. + +#### 1. Consistency and Network Partition +Each distributed replica will communicate asynchronously with the other reachable replicas, and these replicas will eventually converge to the same value. There is no consistency guarantee on the value of replicas that are unreachable due to network conditions, so this condition is strictly weaker than strong consistency. But as soon as those replicas can be reached again, they too will converge in a self-stabilizing manner. + +#### 2. Availability and Network Partition +Each distributed replica will always be available for local reads and writes regardless of network partitions. In fact, if there are n replicas, a single replica will keep functioning even if the remaining n - 1 replicas crash simultaneously. This **provides an extreme form of availability**. + +SEC facilitates maximum consistency and availability in the event of network partitions by relaxing the requirement of global consistency. Note that this is achieved by virtue of modeling the data objects as join semilattices. 
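To make the join-semilattice view concrete, here is a minimal sketch of a state-based grow-only counter in Scala. It is illustrative only; the names and structure are not taken from any particular CRDT library. Its state is a map from replica identifiers to the number of increments observed at that replica (essentially the integer vector above), its merge is the per-index maximum, i.e., the least upper bound, and its visible value is the sum of the entries.

```
// A minimal, illustrative state-based grow-only counter (G-Counter).
// State: one non-negative count per replica id; merge: per-index maximum (the join).
case class GCounter(counts: Map[String, Long] = Map.empty) {
  // Local update: replica `id` records one more increment, so the state only "grows".
  def increment(id: String): GCounter =
    GCounter(counts.updated(id, counts.getOrElse(id, 0L) + 1))

  // Join (least upper bound): associative, commutative, idempotent.
  def merge(other: GCounter): GCounter =
    GCounter((counts.keySet ++ other.counts.keySet).map { id =>
      id -> math.max(counts.getOrElse(id, 0L), other.counts.getOrElse(id, 0L))
    }.toMap)

  // Observed value of the counter: sum of all per-replica counts.
  def value: Long = counts.values.sum
}

object GCounterDemo extends App {
  val a = GCounter().increment("A").increment("A") // replica A incremented twice
  val b = GCounter().increment("B")                // replica B incremented once
  // Merging in either order, any number of times, yields the same state.
  assert(a.merge(b) == b.merge(a))
  assert(a.merge(b).merge(b) == a.merge(b))
  println(a.merge(b).value) // 3
}
```

Because the merge is associative, commutative and idempotent, replicas can exchange and merge states in any order, any number of times, and still converge without any consensus or rollback.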
+ +#### Strong Eventual Consistency and Linearizability +In a distributed setting, a replica has to handle concurrent updates. In addition to specifying its sequential behavior, a CRDT also has to ensure that its concurrent behavior preserves strong eventual consistency. This makes it possible for CRDTs to exhibit behavior that is simply not possible for sequentially consistent objects. +Consider a set CRDT used in a distributed setting. One of the replicas p<sub>i</sub> executes the sequence `add(a); remove(b)`. Another replica p<sub>j</sub> executes the sequence `add(b); remove(a)`. Now both send their states asynchronously to another replica p<sub>k</sub>, which has to merge them. Each element is present in one of the incoming states and absent from the other. There are multiple choices that the CRDT designer can make; let's assume that the implementation always prefers inclusion over exclusion. In this case, p<sub>k</sub> will include both `a` and `b`. +Now consider a sequential execution of the two sequences on an ordinary set data structure. The order of execution will be either `add(a); remove(b); add(b); remove(a)` or `add(b); remove(a); add(a); remove(b)`. In both cases one of the elements is excluded. This is different from the state of the CRDT set implementation. +Thus, strongly eventually consistent data structures can be sequentially inconsistent. +Similarly, if there are `n` sequentially consistent replicas, then they would need consensus to ensure a single order of execution of operations across all replicas. But if `n - 1` replicas crash, then consensus cannot happen. This makes the idea of sequential consistency incomparable to that of strong eventual consistency. + +## What Next? +This chapter introduced Strong Eventual Consistency and the formalism behind CRDTs, join semilattices, which enables CRDTs to exhibit strong eventual consistency. The discussion, however, does not answer an important question: + +__*Can all standard data structures be designed as CRDTs?*__ + +The next chapter sheds more light on the design of CRDTs and attempts to answer this question. diff --git a/chapter/6/consistency-crdts.md b/chapter/6/consistency-crdts.md deleted file mode 100644 index fcb49e7..0000000 --- a/chapter/6/consistency-crdts.md +++ /dev/null @@ -1,11 +0,0 @@ ---- -layout: page -title: "Consistency & CRDTs" -by: "Joe Schmoe and Mary Jane" ---- - -Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. {% cite Uniqueness --file consistency-crdts %} - -## References - -{% bibliography --file consistency-crdts %}
\ No newline at end of file diff --git a/chapter/6/resources/partitioned-network.jpg b/chapter/6/resources/partitioned-network.jpg Binary files differnew file mode 100644 index 0000000..513fc13 --- /dev/null +++ b/chapter/6/resources/partitioned-network.jpg diff --git a/chapter/8/Hive-architecture.png b/chapter/8/Hive-architecture.png Binary files differnew file mode 100644 index 0000000..9f61454 --- /dev/null +++ b/chapter/8/Hive-architecture.png diff --git a/chapter/8/Hive-transformation.png b/chapter/8/Hive-transformation.png Binary files differnew file mode 100644 index 0000000..7383188 --- /dev/null +++ b/chapter/8/Hive-transformation.png diff --git a/chapter/8/big-data.md b/chapter/8/big-data.md index bfd3e7b..6c0781d 100644 --- a/chapter/8/big-data.md +++ b/chapter/8/big-data.md @@ -1,11 +1,472 @@ --- layout: page title: "Large Scale Parallel Data Processing" -by: "Joe Schmoe and Mary Jane" +by: "Jingjing and Abhilash" --- +## Introduction +The growth of Internet has generated the so-called big data(terabytes or petabytes). It is not possible to fit them into a single machine or process them with one single program. Often the computation has to be done fast enough to provide practical services. A common approach taken by tech giants like Google, Yahoo, Facebook is to process big data across clusters of commodity machines. Many of the computations are conceptually straightforward, and Google proposed the MapReduce model to abstract the logic and proved to be simple and powerful. From then on, the idea inspired lots of other programming models. In this chapter, we will present how programming models evolve over time, why their execution engines are designed in certain ways, and underlying ecosystem that supports each developing thread. +## 1 Programming Models +### 1.1 Data parallelism +*Data parallelism* is to run a single operation on different pieces of the data on different machines in parallel. Comparably, a sequential computation looks like *"for all elements in the dataset, do operation A"*, where dataset could be in the order of terabytes or petabytes aka. big data and one wants to scale up the processing. The challenges to do this sequential computation in a parallelized manner include how to abstract the different types of computations in a simple and correct way, how to distribute the data to hundreds/thousands of machines, how to handle failures and so on. -Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. {% cite Uniqueness --file big-data %} +<figure class="main-container"> + <img src="{{ site.baseurl }}/resources/img/data-parallelism.png" alt="Data Parallelism" /> +</figure> -## References +**MapReduce** {% cite dean2008mapreduce --file big-data %} is a programming model proposed by Google to initially satisfy their demand of large-scale indexing for web search service. It provides a simple user program interface: *map* and *reduce* functions and automatically handles the parallelization and distribution. + +The MapReduce model is simple and powerful, and quickly became very popular among developers. 
However, when developers start writing real-world applications, they often end up chaining together MapReduce stages. Such pipelines force programmers to write additional coordination code, i.e., the development style goes backward from a simple computational abstraction to lower-level coordination management. In MapReduce, programmers need to reason about data representation on disk or in storage services such as a database. Besides, developers need to clearly understand the MapReduce execution model to do manual optimizations[ref]. The **FlumeJava** {%cite chambers2010flumejava --file big-data%} library intends to provide support for developing data-parallel pipelines by abstracting away the complexity involved in data representation and implicitly handling the optimizations. It defers the evaluation, constructs an execution plan from parallel collections, optimizes the plan, and then executes the underlying MR primitives. The optimized execution is comparable with hand-optimized pipelines, so there's no need to write raw MR programs directly. + +An alternative approach to data parallelism is to construct complex, multi-step directed acyclic graphs (DAGs) of work from the user's instructions and execute those DAGs all at once. This eliminates the costly synchronization required by MapReduce and makes applications much easier to build and reason about. Dryad, a Microsoft Research project used internally at Microsoft, was one such project that leveraged this model of computation. + +Microsoft's **Dryad** {% cite isard2007dryad --file big-data %} abstracts individual computational tasks as vertices and constructs a communication graph between those vertices. What programmers need to do is describe this DAG and let the Dryad execution engine construct the execution plan and manage scheduling and optimization. One of the advantages of Dryad over MapReduce is that Dryad vertices can have an arbitrary number of inputs and outputs, while MR only supports a single input and a single output for each vertex. Besides this flexibility of computation, Dryad also supports different types of communication channels: files, TCP pipes and shared-memory FIFOs. + + +Dryad expresses computation as acyclic data flows, which might be too expensive for some complex applications, e.g., iterative machine learning algorithms. **Spark** {% cite zaharia2010spark --file big-data%} is a framework that uses functional programming and pipelining to provide such support. It is largely inspired by MapReduce's model and builds upon the DAG and lazy-evaluation ideas of DryadLINQ. Instead of writing data to disk for each job as MapReduce does, Spark can cache the results across jobs. Spark explicitly caches computational data in memory through specialized immutable datasets named Resilient Distributed Datasets (RDDs) and reuses the same dataset across multiple parallel operations. Spark builds upon RDDs to achieve fault tolerance by using the lineage information of a lost RDD. This results in less overhead than the fault tolerance achieved by checkpointing in Distributed Shared Memory systems. Moreover, Spark powers a stack of other libraries, e.g. SQL & DataFrames and GraphX, and can easily combine those libraries in one single application. These features make Spark the best fit for iterative jobs and interactive analytics and also help it provide better performance. 
Above all, many other systems can be expressed on top of Spark, enabling other models to leverage the specific advantages of Spark and still retain their computation process without any changes to the Spark system[ref]. + + +The following four sections discuss the programming models of MapReduce, FlumeJava, Dryad and Spark. + + +### 1.1.1 MapReduce +In this model, parallelizable computations are abstracted into map and reduce functions. The computation accepts a set of key/value pairs as input and produces a set of key/value pairs as output. The process involves two phases: +- *Map*, written by the user, accepts a set of key/value pairs ("records") as input, applies the *map* operation on each record, and computes a set of intermediate key/value pairs as output. +- *Reduce*, also written by the user, accepts an intermediate key and a set of values associated with that key, operates on them, and produces zero or one output value. + Note: there is a *Shuffle* phase between *map* and *reduce*, provided by the MapReduce library, which groups all the intermediate values of the same key together and passes them to the *Reduce* function. We will discuss this more in Section 2, Execution Models. + +Conceptually, the map and reduce functions have associated **types**: + +\\[map (k1,v1) \rightarrow list(k2,v2)\\] + +\\[reduce (k2,list(v2)) \rightarrow list(v2)\\] + + +The input keys and values are drawn from a different domain than the output keys and values. The intermediate keys and values are from the same domain as the output keys and values. + + +Concretely, consider the problem of counting the number of occurrences of each word in a large collection of documents: the `map` function emits each word plus a count of 1, and the `reduce` function sums together all counts emitted for the same word: + +``` +map(String key, String value): + // key: document name + // value: document contents + for each word w in value: + EmitIntermediate(w, "1"); + +reduce(String key, Iterator values): + // key: a word + // values: a list of counts + int result = 0; + for each v in values: + result += ParseInt(v); + Emit(AsString(result)); +``` + +During execution, the MapReduce library assigns a master node to manage data partitioning and scheduling; other nodes serve as workers that run either *map* or *reduce* operations on demand. More details of the execution model are discussed later. Here, it's worth mentioning that the intermediate results are written to disk and the reduce operation reads them from disk. This is crucial for fault tolerance. + +*Fault Tolerance* +MapReduce runs on hundreds or thousands of unreliable commodity machines, so the library must provide fault tolerance. The library assumes that the master node will not fail, and the master monitors worker failures. If no status update is received from a worker within a timeout, the master marks it as failed. Then the master may schedule the associated task to other workers depending on the task type and status. The commits of *map* and *reduce* task outputs are atomic: an in-progress task writes data into private temporary files; once the task succeeds, the worker negotiates with the master and renames the files to complete the task. In the case of failure, the worker discards those temporary files. This guarantees that if the computation is deterministic, the distributed implementation produces the same output as a non-faulting sequential execution. 
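To make the map/reduce types above concrete, here is a minimal single-machine sketch of the same dataflow in Scala - map over the input records, group the intermediate pairs by key (the shuffle), then reduce each group. It is purely illustrative: the function and type names are ours, and it ignores partitioning, distribution and fault tolerance.

```
// A tiny, single-machine illustration of the MapReduce dataflow:
// map over records, shuffle (group by key), then reduce each group.
object MiniMapReduce {
  def mapReduce[K1, V1, K2, V2, V3](
      records: Seq[(K1, V1)],
      mapFn: (K1, V1) => Seq[(K2, V2)],
      reduceFn: (K2, Seq[V2]) => V3): Map[K2, V3] = {
    val intermediate = records.flatMap { case (k, v) => mapFn(k, v) }   // map phase
    val grouped = intermediate
      .groupBy(_._1)
      .map { case (k, pairs) => k -> pairs.map(_._2) }                  // shuffle phase
    grouped.map { case (k, vs) => k -> reduceFn(k, vs) }                // reduce phase
  }

  def main(args: Array[String]): Unit = {
    val docs = Seq("d1" -> "the quick brown fox", "d2" -> "the lazy dog")
    val counts = mapReduce[String, String, String, Int, Int](
      docs,
      (_, text) => text.split("\\s+").toSeq.map(w => w -> 1), // emit (word, 1) pairs
      (_, ones) => ones.sum)                                  // sum the counts per word
    println(counts) // e.g. Map(the -> 2, quick -> 1, brown -> 1, fox -> 1, lazy -> 1, dog -> 1)
  }
}
```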
+ +*Limitations* +Many analytics workloads, like K-means and logistic regression, and graph processing applications, like PageRank and shortest paths via parallel breadth-first search, require multiple stages of MapReduce jobs. In a regular MapReduce framework like Hadoop, this requires the developer to manually handle the iterations in the driver code. At every iteration, the result of stage T is written to HDFS and loaded back again at stage T+1, causing a performance bottleneck: network bandwidth and CPU resources are wasted, and above all the disk I/O operations are inherently slow. In order to address such challenges in iterative workloads on MapReduce, frameworks like HaLoop {% cite bu2010haloop --file big-data %}, Twister {% cite ekanayake2010twister --file big-data %} and iMapReduce {% cite zhang2012imapreduce --file big-data %} adopt special techniques like caching the data between iterations and keeping the mappers and reducers alive across the iterations. + +### 1.1.2 FlumeJava +FlumeJava {%cite chambers2010flumejava --file big-data %} was introduced to make it easy to develop, test, and run efficient data-parallel pipelines. FlumeJava represents each dataset as an object, and transformations are invoked by applying methods on these objects. It constructs an efficient internal execution plan from a pipeline of MapReduce jobs, uses deferred evaluation and optimizes based on the plan structure. The debugging ability allows programmers to run on the local machine first and then deploy to large clusters. + +*Core Abstraction* +- `PCollection<T>`, an immutable bag of elements of type `T` +- `recordOf(...)`, specifies the encoding of the instance +- `PTable<K, V>`, a subclass of `PCollection<Pair<K,V>>`, an immutable multi-map with keys of type `K` and values of type `V` +- `parallelDo()`, can express both the map and reduce parts of MapReduce +- `groupByKey()`, same as the shuffle step of MapReduce +- `combineValues()`, semantically a special case of `parallelDo()`, a combination of a MapReduce combiner and a MapReduce reducer, which is more efficient than doing all the combining in the reducer. + +*Deferred Evaluation & Optimizer* +The state of each `PCollection` object is either *deferred* (not yet computed) or *materialized* (computed). When the program invokes a parallel operation, it does not actually run the operation; instead, it performs the operation only when needed. FlumeJava also provides some optimizations: 1) parallelDo fusion: f(g(x)) => f o g(x) to reduce the number of steps; 2) the MapShuffleCombineReduce (MSCR) operation, which generalizes MapReduce jobs to accept multiple inputs and multiple outputs, and for which FlumeJava performs another round of MSCR fusion. + + +### 1.1.3 Dryad +Dryad is a more general and flexible execution engine that executes subroutines at the vertices of a specified graph. Developers can specify an arbitrary directed acyclic graph to combine computational "vertices" with communication channels (file, TCP pipe, shared-memory FIFO) and build a dataflow graph. Compared with MapReduce, Dryad can specify an arbitrary DAG whose vertices have any number of inputs/outputs and which supports multiple stages. It also offers more channel types and can boost performance when using TCP pipes and shared memory. But, like writing a pipeline of MapReduce jobs, Dryad is a low-level programming model and hard for users to program, thus a more declarative model - DryadLINQ {%cite yu2008dryadlinq --file big-data %} - was created to fill in the gap. 
It exploits LINQ, a query language in .NET, and automatically translates the data-parallel part into an execution plan that is passed to the Dryad execution engine, so that programmers no longer need to understand system resources and other lower-level details of raw Dryad. + +### 1.1.4 Spark + +Spark {%cite zaharia2010spark --file big-data %} is a fast, in-memory data processing engine with an elegant and expressive development interface which enables developers to efficiently execute machine learning, SQL or streaming workloads that require fast iterative access to datasets. It is a functional-style programming model (similar to DryadLINQ) where a developer can create acyclic data flow graphs and transform a set of input data through map- and reduce-like operators. Spark provides two main abstractions - distributed in-memory storage (RDDs) and parallel operations on datasets (based on Scala's collection API) - to deliver high-performance processing, scalability and fault tolerance. + +*Distributed in-memory storage - Resilient Distributed Datasets:* + +An RDD is a partitioned, read-only collection of objects which can be created from data in stable storage or by transforming other RDDs. It can be distributed across multiple nodes in a cluster (parallelized) and is fault tolerant (resilient). If a node fails, an RDD can always be recovered using its lineage graph (information on how it was derived from the dataset). An RDD is stored in memory (as much as fits; the rest is spilled to disk) and is immutable - it can only be transformed into a new RDD. These transformations are lazy and are applied only if an action is performed on the RDD. Hence, an RDD need not be materialized at all times. + +The properties that power RDDs with the above-mentioned features: +- A list of dependencies on other RDDs. +- An array of partitions that the dataset is divided into. +- A compute function to do a computation on partitions. +- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) +- Optional preferred locations (aka locality info), (e.g. block locations for an HDFS file) + + +<figure class="main-container"> + <img src="./spark_pipeline.png" alt="Spark pipeline" /> +</figure> + + +The Spark API provides two kinds of operations on an RDD: + +- Transformations - lazy operations that return another RDD. + - `map (f : T => U) : RDD[T] ⇒ RDD[U]` : Return a MappedRDD[U] by applying function f to each element + - `flatMap( f : T ⇒ Seq[U]) : RDD[T] ⇒ RDD[U]` : Return a new FlatMappedRDD[U] by first applying a function to all elements and then flattening the results. + - `filter(f:T⇒Bool) : RDD[T] ⇒ RDD[T]` : Return a FilteredRDD[T] containing the elements for which f returns true + - `groupByKey()` : Called on a (K, V) RDD, returns a new RDD[(K, Iterable[V])] + - `reduceByKey(f: (V, V) => V)` : Called on a (K, V) RDD, returns a new RDD[(K, V)] by aggregating values, e.g. reduceByKey(_+_) + - `join : (RDD[(K, V)], RDD[(K, W)]) ⇒ RDD[(K, (V, W))]` : Called on a (K, V) RDD, returns a new RDD[(K, (V, W))] by joining the two RDDs by key K. + + +- Actions - operations that trigger computation on an RDD and return values. + + - `reduce(f:(T,T)⇒T) : RDD[T] ⇒ T` : Return a T by reducing the elements using the specified commutative and associative binary operator + - `collect()` : Return an Array[T] containing all elements + - `count()` : Return the number of elements + +RDDs by default are discarded after use. 
However, Spark provides two explicit operations, persist() and cache(), to ensure RDDs are kept in memory once they have been computed for the first time. + +*Why RDDs over Distributed Shared Memory (DSM)?* +RDDs are immutable and can only be created through coarse-grained transformations, while DSM allows fine-grained read and write operations to each memory location. Hence RDDs do not incur the checkpointing overhead that is present in DSM and can be recovered using their lineage. +Because RDDs are immutable, a straggler (slow node) can be replaced with a backup copy as in MapReduce. This is hard to implement in DSM, as two copies point to the same location and can interfere with each other's updates. +Other benefits include the scheduling of tasks based on data locality to improve performance and the ability of RDDs to degrade gracefully in case of memory shortage. Partitions that do not fit in RAM get spilled to disk (performance will then be equal to that of any data-parallel system). + +***Challenges in Spark*** + +- `Functional API semantics` : The groupByKey operator is costly in terms of performance, in that it brings all the values for a key to a single machine as a (key, list of values) pair and only then performs the per-key aggregation on that machine, resulting in computation and communication overhead. Spark does provide the reduceByKey operator, which performs a partial aggregation on individual worker nodes before returning the distributed collection. However, developers who are not aware of such functionality can unintentionally choose groupByKey. + +- `Debugging and profiling` : There is no availability of debugging tools, and developers find it hard to realize whether a computation is happening mostly on a single machine or whether the data structures they used were inefficient. + +### 1.2 Querying: declarative interfaces +MapReduce provides only two high-level primitives - map and reduce - that the programmers have to worry about. MapReduce takes care of all the processing over a cluster, failure and recovery, data partitioning, etc. However, the framework suffers from rigidity with respect to its one-input data format (key/value pairs) and two-stage data flow. +Several important patterns like joins (which could be highly complex depending on the data) are extremely hard to implement and reason about for a programmer. Sometimes the code can become repetitive when the programmer wants to implement common operations like projection, filtering, etc. +Non-programmers like data scientists would highly prefer an SQL-like interface over a cumbersome and rigid framework {% cite scaling-spark-in-real-world --file big-data%}. Such a high-level declarative language can easily express their task while leaving all of the execution optimization details to the backend engine. Hence, these kinds of abstractions provide ample opportunities for query optimization. + +Sawzall {% cite pike2005interpreting --file big-data%} is a programming language built on top of MapReduce. It consists of a *filter* phase (map) and an *aggregation* phase (reduce). The user program can specify the filter function and emit the intermediate pairs to external pre-built aggregators. + +Apart from Sawzall, Pig {%cite olston2008pig --file big-data %} and Hive {%cite thusoo2009hive --file big-data %} are the other major components that sit on top of the Hadoop framework for processing large data sets without the users having to write Java-based MapReduce code. 
+ +Hive was built by Facebook to organize datasets in structured formats while still utilizing the benefits of the MapReduce framework. It has its own SQL-like language, HiveQL {%cite thusoo2010hive --file big-data %}, which is easy for anyone who understands SQL. Hive reduces code complexity and eliminates lots of boilerplate that would otherwise be an overhead with the Java-based MapReduce approach. It has a component called the *metastore* that holds table metadata which is created once and reused each time the table is referenced in HiveQL, the way traditional warehousing solutions do. The drawback to using Hive is that programmers have to be familiar with basic techniques and best practices for running their Hive queries at maximum speed, since performance depends on the Hive optimizer, and developers have to guide the optimizer toward an efficient plan for their queries. + +A relational interface to big data is good; however, it doesn't cater to users who want to perform + +- ETL to and from various semi- or unstructured data sources. +- advanced analytics like machine learning or graph processing. + +These user actions require the best of both worlds - relational queries and procedural algorithms. Pig Latin {% cite olston2008pig --file big-data%} and Spark SQL {% cite armbrust2015spark --file big-data%} bridge this gap by letting users seamlessly intermix both relational and procedural APIs. Both frameworks free the programmer from worrying about the internal execution model by providing implicit optimization of the user's input DAG of transformations. + +Pig Latin aims at a sweet spot between declarative and procedural programming. For advanced programmers, SQL is an unnatural way to implement program logic, and Pig Latin instead decomposes the set of data transformations into a sequence of steps. This makes Pig more verbose than Hive. + +Spark SQL, though it has the same goals as Pig, is better off given the Spark execution engine, Spark's efficient fault tolerance mechanism and a specialized data structure called the Dataset. + +The following subsections discuss Hive, Pig Latin and Spark SQL in detail. + + +### 1.2.1 Hive/HiveQL + +Hive is a data-warehousing infrastructure built on top of the MapReduce framework Hadoop. The primary responsibility of Hive is to provide data summarization, query and analysis. It supports analysis of large datasets stored in Hadoop's HDFS {% cite shvachko2010hadoop --file big-data%}. It supports SQL-like access to structured data, known as HiveQL (or HQL), as well as big data analysis with the help of MapReduce. These SQL queries are compiled into MapReduce jobs that can be executed on Hadoop. It drastically brings down the development time of writing and maintaining Hadoop jobs. + +Data in Hive is organized into three different levels: + +`Tables`: Like RDBMS tables, Hive tables consist of rows and columns, and every table maps to an HDFS directory. All the data in the table is serialized and stored in files under the corresponding directory. Hive is extensible to accept user-defined data formats and custom serialize and de-serialize methods. It also supports external tables stored in other native file systems like HDFS, NFS or local directories. + +`Partitions`: The distribution of data into subdirectories of the table directory is determined by one or more partitions. A table can be further partitioned on columns. + +`Buckets`: Data in each partition can be further divided into buckets on the basis of the hash of a column in the table. Each bucket is stored as a file in the partition directory. 
+ +***HiveQL***: The Hive query language consists of a subset of SQL along with some extensions. The language is very SQL-like and supports features like subqueries, joins, cartesian products, group by, aggregation, describe and more. MapReduce programs can also be used in Hive queries. A sample query using MapReduce would look like this: +``` +FROM ( + MAP inputdata USING 'python mapper.py' AS (word, count) + FROM inputtable + CLUSTER BY word + ) + REDUCE word, count USING 'python reduce.py'; +``` +This query uses mapper.py to transform inputdata into (word, count) pairs, distributes data to reducers by hashing on the word column (given by CLUSTER BY) and aggregates with reduce.py. +INSERT INTO, UPDATE, and DELETE are not supported, which makes it easier to handle reader and writer concurrency. + + +***Serialization/Deserialization*** +Hive implements LazySerDe as the default SerDe interface. A SerDe is a combination of a serializer and a deserializer which helps developers instruct Hive on how their records should be processed. The Deserializer interface translates rows into internal objects lazily so that the cost of deserializing a column is incurred only when it is needed. The Serializer, conversely, converts a Java object into a format that Hive can write to HDFS or another supported system. Hive also provides a RegexSerDe which allows the use of regular expressions to parse columns out of a row. + +### 1.2.2 Pig Latin +The goal of Pig Latin {% cite olston2008pig --file big-data%} is to attract experienced programmers to perform ad-hoc analysis on big data. Parallel database products provide a simple SQL query interface, which is good for non-programmers and simple tasks, but not in the style that experienced programmers prefer. Instead, such programmers prefer to specify single steps and compose them as a sequence. + +For example, suppose we have a table urls: `(url, category, pagerank)`. The following is a simple SQL query that finds, for each sufficiently large category, the average pagerank of high-pagerank urls in that category. + +``` +SELECT category, AVG(pagerank) +FROM urls WHERE pagerank > 0.2 +GROUP BY category HAVING COUNT(*) > 10^6 +``` + +And Pig Latin would express it in the following way: + +``` +good_urls = FILTER urls BY pagerank > 0.2; +groups = GROUP good_urls BY category; +big_groups = FILTER groups BY COUNT(good_urls) > 10^6; +output = FOREACH big_groups GENERATE + category, AVG(good_urls.pagerank); +``` + +*Interoperability* Pig Latin is designed to support ad-hoc data analysis, which means the input only requires a function to parse the content of files into tuples. This saves the time-consuming import step. As for the output, Pig gives users the freedom to convert tuples into a byte sequence whose format they define. + +*Nested Data Model* Pig Latin has a flexible, fully nested data model, and allows complex, non-atomic data types such as set, map, and tuple to occur as fields of a table. The benefits include: it is closer to how programmers think; data can be stored in the same nested fashion, saving recombining time; it permits an algebraic language; and it allows rich user-defined functions. + +*UDFs as First-Class Citizens* Pig Latin supports user-defined functions (UDFs) for customized tasks such as grouping, filtering, or per-tuple processing. + +*Debugging Environment* Pig Latin has a novel interactive debugging environment that can generate a concise example data table to illustrate the output of each step. 
+ +### 1.2.3 Spark SQL + +The major contributions of Spark SQL {% cite armbrust2015spark --file big-data%} are the DataFrame API and the Catalyst optimizer. Spark SQL intends to provide relational processing over native RDDs and over several external data sources through a programmer-friendly API, high performance through DBMS techniques, support for semi-structured data and external databases, and support for advanced analytical processing like machine learning algorithms and graph processing. + +***Programming API*** + +Spark SQL runs on top of Spark, providing SQL interfaces. A user can interact with these interfaces through JDBC/ODBC, the command line or the DataFrame API. +The DataFrame API lets users intermix both relational and procedural code with ease. A DataFrame is a collection of schema-based rows of data with named columns on which relational operations can be performed with optimized execution. Unlike an RDD, a DataFrame allows developers to define a structure for the data and can be likened to a table in a relational database or R/Python's data frame. DataFrames can be constructed from tables of external sources or from existing native RDDs. A DataFrame is lazy: each object represents a logical plan which is not executed until an output operation like save or count is performed. +Spark SQL supports all the major SQL data types, including complex data types like arrays, maps and unions. +Some of the DataFrame operations include projection (select), filter (where), join and aggregations (groupBy). +Illustrated below is an example of relational operations on an employees data frame to compute the number of female employees in each department. + +``` +employees.join(dept, employees("deptId") === dept("id")) + .where(employees("gender") === "female") + .groupBy(dept("id"), dept("name")) + .agg(count("name")) +``` +Several of these operators, like === for equality test, > for greater than, the arithmetic ones (+, -, etc.) and the aggregators, transform into an abstract syntax tree of the expression which can be passed to Catalyst for optimization. +A cache() operation on the data frame helps Spark SQL store the data in memory so it can be used in iterative algorithms and for interactive queries. In the case of Spark SQL, the memory footprint is considerably smaller as it applies columnar compression schemes like dictionary encoding and run-length encoding. + +The DataFrame API also supports inline UDF definitions without complicated packaging and registration. Because UDFs and queries are both expressed in the same general-purpose language (Python or Scala), users can use standard debugging tools. + +However, a DataFrame lacks type safety. In the above example, attributes are referred to by string names, so it is not possible for the compiler to catch errors. If attribute names are incorrect, the error will only be detected at runtime, when the query plan is created. +Spark introduced an extension to the DataFrame called the ***Dataset*** to provide this compile-time type safety. It embraces an object-oriented style of programming and has an additional feature termed Encoders. Encoders translate between JVM representations (objects) and Spark's internal binary format. 
Spark has built-in encoders which are very advanced in that they generate byte code to interact with off-heap data and provide on-demand access to individual attributes without having to de-serialize an entire object + + +Winding up - we can compare SQL vs Dataframe vs Dataset as below : + +<figure class="main-container"> + <img src="./sql-vs-dataframes-vs-datasets.png" alt="SQL vs Dataframe vs Dataset" /> +</figure> +*Figure from the website :* https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html -{% bibliography --file big-data %}
\ No newline at end of file +### 1.3 Large-scale Parallelism on Graphs +MapReduce doesn't scale easily and is highly inefficient for iterative and graph algorithms like PageRank and machine learning algorithms. Iterative algorithms require the programmer to explicitly handle the intermediate results (writing them to disk). Hence, every iteration requires reading the input and writing the results to disk, resulting in high disk I/O, which is a performance bottleneck for any batch processing system. + +Graph algorithms also require the exchange of messages between vertices. In the case of PageRank, every vertex requires the contributions from all its adjacent nodes to calculate its score. MapReduce lacks this model of message passing, which makes it complex to reason about graph algorithms. One model that is commonly employed for implementing distributed graph processing is the graph-parallel model. + +In the graph-parallel abstraction, a user-defined vertex program is instantiated concurrently for each vertex and interacts with adjacent vertex programs through messages or shared state. Each vertex program can read and modify its vertex property and, in some cases, adjacent vertex properties. When all vertex programs vote to halt, the program terminates. Most systems adopt the bulk synchronous parallel model {% cite bulk-synchronous-model --file big-data%}. + +This model was introduced in the 1980s to abstract the hardware design features of parallel computers. It gained popularity as an alternative to MapReduce since it addressed the above-mentioned issues with MapReduce.<br /> +The BSP model is a synchronous message-passing model where: + + - Computation consists of several steps called supersteps. + - The processors involved have their own local memory, and every processor is connected to the others via point-to-point communication. + - At every superstep, a processor receives input at the beginning, performs computation and produces output at the end. + - A processor at superstep S can send messages to another processor at superstep S+1 and can likewise receive messages from superstep S-1. + - Barrier synchronization synchronizes all the processors at the end of every superstep. + +A notable feature of the model is the complete control over data through communication between processors at every superstep. Though similar to the MapReduce model, BSP preserves data in memory across supersteps, which helps in reasoning about iterative graph algorithms. A toy sketch of this vertex-centric, superstep-based style is given below, just before we turn to GraphX. + +The graph-parallel abstractions allow users to succinctly describe graph algorithms, and provide a runtime engine to execute these algorithms in a distributed manner. They simplify the design, implementation, and application of sophisticated graph algorithms to large-scale real-world problems. Each of these frameworks presents a different view of graph computation, tailored to an originating domain or family of graph algorithms. However, these frameworks fail to address the problems of data preprocessing and construction, rely on snapshot-based recovery for fault tolerance, and lack support from distributed data flow frameworks. The data-parallel systems are well suited to the task of graph construction and are highly scalable; however, they suffer from the very problems mentioned before, for which the graph-parallel systems came into existence. 
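To illustrate the vertex-centric, superstep-based style described above, here is a toy single-machine simulation of a PageRank-like vertex program in Scala. The structure - compute a new vertex value from the messages of the previous superstep, then send messages for the next superstep, with a barrier in between - mirrors the BSP model; the API is hypothetical and is not Pregel's or GraphX's actual interface.

```
// A toy, single-machine simulation of a vertex-centric BSP computation (illustrative only).
object ToyVertexProgram {
  // Directed toy graph: vertex -> out-neighbours.
  val edges: Map[Int, Seq[Int]] = Map(1 -> Seq(2, 3), 2 -> Seq(3), 3 -> Seq(1))

  def main(args: Array[String]): Unit = {
    var rank: Map[Int, Double] = edges.keys.map(_ -> 1.0).toMap
    // Superstep 0: every vertex sends rank/outDegree to its out-neighbours.
    var messages: Map[Int, Double] = sendAll(rank)
    for (superstep <- 1 to 10) { // fixed number of supersteps instead of voting to halt
      // "Compute": combine incoming messages into a new vertex value (PageRank update).
      rank = rank.map { case (v, _) => v -> (0.15 + 0.85 * messages.getOrElse(v, 0.0)) }
      // Barrier, then produce the messages for the next superstep.
      messages = sendAll(rank)
    }
    println(rank)
  }

  // Each vertex distributes its current value evenly over its out-edges;
  // incoming messages for the same target vertex are summed (a combiner).
  private def sendAll(rank: Map[Int, Double]): Map[Int, Double] =
    edges.toSeq
      .flatMap { case (src, outs) => outs.map(dst => dst -> rank(src) / outs.size) }
      .groupBy(_._1)
      .map { case (dst, msgs) => dst -> msgs.map(_._2).sum }
}
```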
+GraphX {%cite xin2013graphx --file big-data%} is a new computation system which builds upon Spark's Resilient Distributed Dataset (RDD) to form a new abstraction, the Resilient Distributed Graph (RDG), representing records and their relations as vertices and edges respectively. RDGs leverage the fault tolerance mechanism and expressivity of RDDs. + +How does GraphX improve over the existing graph-parallel and data flow models? +The RDGs in GraphX provide a set of elegant and expressive computational primitives through which many graph-parallel systems like Pregel and PowerGraph can be easily expressed in a minimal number of lines of code. GraphX simplifies the process of graph ETL and analysis through new operations like filter, view and graph transformations, and it minimizes communication and storage overhead. + +Similar to the data flow model, GraphX moves away from the vertex-centric view and adopts transformations on graphs, each yielding a new graph. + +***Why is partitioning important in graph computation systems?*** +Graph-parallel computation requires every vertex or edge to be processed in the context of its neighborhood. Each transformation depends on the result of distributed joins between vertices and edges. This means that graph computation systems rely on graph partitioning (edge-cuts in most of the systems) and efficient storage to minimize communication and storage overhead and ensure balanced computation. + +<figure class="main-container"> + <img src="./edge-cuts.png" alt="edge cuts" /> +</figure> +*Figure from {%cite xin2013graphx --file big-data%}* + +***Why are edge-cuts expensive?*** +Edge-cut partitioning requires random assignment of vertices and edges across all the machines. Thus the communication and storage overhead is proportional to the number of edges cut, and this makes minimizing the number of cut edges a priority. For most real-world graphs, constructing an optimal edge-cut is cost prohibitive, and most systems use random edge-cuts, which achieve appropriate work balance but nearly worst-case communication overhead. + +<figure class="main-container"> + <img src="./vertex-cuts.png" alt="Vertex cuts" /> +</figure> +*Figure from {%cite xin2013graphx --file big-data%}* + +***Vertex-cuts - GraphX's solution to effective partitioning***: An alternative approach does the opposite of the edge-cut — it evenly assigns edges to machines, but allows vertices to span multiple machines. The communication and storage overhead of a vertex-cut is directly proportional to the sum of the number of machines spanned by each vertex. Therefore, we can reduce communication overhead and ensure balanced computation by evenly assigning edges to machines in a way that minimizes the number of machines spanned by each vertex. + +The GraphX RDG structure implements a vertex-cut representation of a graph using three unordered, horizontally partitioned RDD tables. These three tables are as follows: + +- `EdgeTable(pid, src, dst, data)`: Stores the adjacency structure and edge data. +- `VertexDataTable(id, data)`: Stores vertex data. Contains states associated with vertices that change in the course of graph computation. +- `VertexMap(id, pid)`: Maps from vertex ids to the partitions that contain their adjacent edges. Remains static as long as the graph structure doesn't change. + +A three-way relational join is used to bring together source vertex data, edge data, and target vertex data. The join is straightforward, and takes advantage of a partitioner to ensure the join site is local to the edge table. 
This means GraphX only has to shuffle vertex data. + +***Operators in GraphX*** +Other than standard data-parallel operators like filter, map, leftJoin, and reduceByKey, GraphX supports the following graph-parallel operators: + +- graph - constructs a property graph given a collection of edges and vertices. +- vertices, edges - decompose the graph into a collection of vertices or edges by extracting the vertex or edge RDDs. +- mapV, mapE - transform the vertex or edge collection. +- triplets - returns a collection of the form ((i, j), (PV(i), PE(i, j), PV(j))). The operator essentially requires a multiway join between the vertex and edge RDDs. This operation is optimized by shifting the site of the joins to the edges, using the routing table, so that only vertex data needs to be shuffled. +- leftJoin - given a collection of vertices and a graph, returns a new graph which incorporates the properties of matching vertices from the given collection into the given graph without changing the underlying graph structure. +- subgraph - applies predicates to return a subgraph of the original graph by filtering out all the vertices and edges that don't satisfy the vertex and edge predicates respectively. +- mrTriplets (MapReduce triplets) - a logical composition of triplets followed by map and reduceByKey. It is the building block of graph-parallel algorithms. + +## 2 Execution Models +There are many possible implementations of those programming models. In this section, we will discuss a few different execution models, how the above programming interfaces exploit them, and the benefits and limitations of each design. MapReduce, its variants and Spark all use the master/workers model (section 2.1), where the master is responsible for managing data and dynamically scheduling tasks to workers. The master monitors the workers' status, and when a failure happens, the master reschedules the task to another idle worker. Fault tolerance is guaranteed by persistence of data in MapReduce versus lineage (for recomputation) in Spark. + + + +### 2.1 Master/Worker model +The original MapReduce model is implemented and deployed on Google's infrastructure. As described in section 1.1.1, the user program defines the map and reduce functions, and the underlying system manages data partitioning and schedules jobs across different nodes. Figure 2.1.1 shows the overall flow when the user program calls the MapReduce function: +1. Split data. The input files are split into *M* pieces; +2. Copy processes. The user program creates a master process and the workers. The master picks idle workers to do either map or reduce tasks; +3. Map. The map worker reads the corresponding splits and passes them to the map function. The generated intermediate key/value pairs are buffered in memory; +4. Partition. The buffered pairs are periodically written to local disk, partitioned into *R* regions. Then the locations are passed back to the master; +5. Shuffle. The reduce worker reads from the local disks and groups together all occurrences of the same key; +6. Reduce. The reduce worker iterates over the grouped intermediate data and calls the reduce function on each key and its set of values. The worker appends the output to a final output file; +7. Wake up. When all tasks finish, the master wakes up the user program. 
+ +<figure class="fullwidth"> + <img src="{{ site.baseurl }}/resources/img/mapreduce-execution.png" alt="MapReduce Execution Overview" /> +</figure> +<p>Figure 2.1.1 Execution overview<label for="sn-proprietary-monotype-bembo" class="margin-toggle sidenote-number"></label><input type="checkbox" id="sn-proprietary-monotype-bembo" class="margin-toggle"/><span class="sidenote">from original MapReduce paper {%cite dean2008mapreduce --file big-data%}</span></p> + +At step 4 and 5, the intermediate dataset is written to the disk by map worker and then read from the disk by reduce worker. Transferring big data chunks over network is expensive, so the data is stored on local disks of the cluster and the master tries to schedule the map task on the machine that contains the dataset or a nearby machine to minimize the network operation. + +There are some practices in this paper that make the model work very well in Google, one of them is **backup tasks**: when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks ("straggler"). The task is marked as completed whenever either the primary or the backup execution completes. +In the paper, the authors measure the performance of MapReduce on two computations running on a large cluster of machines. One computation *grep* through approximately 1TB of data. The other computation *sort* approximately 1TB of data. Both computations take in the order of a hundred seconds. In addition, the backup tasks do help largely reduce execution time. In the experiment where 200 out of 1746 tasks were intentionally killed, the scheduler was able to recover quickly and finish the whole computation for just a 5% increased time. +Overall, the performance is very good for conceptually unrelated computations. + + +### 2.2 Spark execution model + +<figure class="main-container"> + <img src="./cluster-overview.png" alt="MapReduce Execution Overview" /> +</figure> + +The Spark driver defines SparkContext which is the entry point for any job that defines the environment/configuration and the dependencies of the submitted job. It connects to the cluster manager and requests resources for further execution of the jobs. +The cluster manager manages and allocates the required system resources to the Spark jobs. Furthermore, it coordinates and keeps track of the live/dead nodes in a cluster. It enables the execution of jobs submitted by the driver on the worker nodes (also called Spark workers) and finally tracks and shows the status of various jobs running by the worker nodes. +A Spark worker executes the business logic submitted by the Spark driver. Spark workers are abstracted and are allocated dynamically by the cluster manager to the Spark driver for the execution of submitted jobs. The driver will listen for and accept incoming connections from its executors throughout its lifetime. + +***Job scheduler optimization :*** Spark’s job scheduler tracks the persistent RDD’s saved in memory. When an action (count or collect) is performed on a RDD, the scheduler first analyzes the lineage graph to build a DAG of stages to execute. These stages only contain the transformations having narrow dependencies. Outside these stages are the wider dependencies for which the scheduler has to fetch the missing partitions from other workers in order to build the target RDD. The job scheduler is highly performant. It assigns tasks to machines based on data locality or to the preferred machines in the contained RDD. 
If a task fails, the scheduler re-runs it on another node and also recomputes the stage's parent if it is missing. + +***How is the memory of persistent RDDs managed ?*** + +Persistent RDDs are stored in memory as Java objects (for performance), in memory as serialized data (for less memory usage at the cost of performance) or on disk. If the worker runs out of memory upon creation of a new RDD, an LRU policy is applied to evict the least recently accessed RDD, unless it is the same as the new RDD. In that case, the old RDD is excluded from eviction, given the fact that it may be reused again in the future. Long lineage chains involving wide dependencies are checkpointed to reduce the time needed to recover an RDD. However, since RDDs are read-only, checkpointing is unproblematic: consistency is not a concern, and there is no overhead to manage consistency as is seen in distributed shared memory. + + +### 2.3 Hive execution model + +The Hive execution model is composed of the following important components (shown in the diagram below): + +- Driver : Similar to the drivers of Spark/MapReduce applications, the driver in Hive handles query submission and its flow across the system. It also manages the session and its statistics. + +- Metastore – A Hive metastore stores all information about the tables, their partitions, schemas, columns and their types, etc., making the data format and its storage transparent to the users. It in turn helps in data exploration, query compilation and optimization. The criticality of the metastore for managing the structure of Hadoop files requires it to be updated on a regular basis. + +- Query Compiler – The Hive query compiler is similar to traditional database compilers. It processes the query in three steps : + - Parse : In this phase it uses Antlr (a parser generator tool) to generate the abstract syntax tree (AST) of the query. + - Transformation of the AST to a DAG (directed acyclic graph) : In this phase it generates the logical plan and does compile-time type checking. The logical plan is generated using the metadata (stored in the Metastore) of the required tables. It can flag errors if any issues are found during the type checking. + + - Optimization : Optimization forms the core of any declarative interface. In the case of Hive, optimization happens through chains of transformations of the DAG. A transformation could even include a user-defined optimization, and it applies an action on the DAG only if a rule is satisfied. Every node in the DAG implements a special interface called the Node interface, which makes it easy to manipulate the operator DAG using other interfaces like GraphWalker, Dispatcher, Rule and Processor. Hence, by transformation, we mean walking through the DAG and, for every Node we encounter, performing a Rule satisfiability check. If a Rule is satisfied, a corresponding Processor is invoked. A Dispatcher maintains a list of Rule-to-Processor mappings. + +<figure class="main-container"> + <img src="./Hive-transformation.png" alt="Hive transformation" /> +</figure> +*Figure to depict the transformation flow during optimization, from:* {%cite thusoo2010hive --file big-data %} + + Some of the important transformations are : + + - Column Pruning - Consider only the required columns needed in the query processing for projection. + - Predicate Pushdown - Filter the rows as early as possible by pushing down the predicates. It's important that unnecessary records are filtered first and transformations are applied only on the needed ones. 
Some of the important transformations are:

- Column pruning: only the columns actually needed by the query are projected.
- Predicate pushdown: rows are filtered as early as possible by pushing predicates down, so that unnecessary records are discarded before further transformations are applied to the ones that remain.
- Partition pruning: predicates on partitioned columns are used to prune out the files of partitions that do not satisfy the predicate.
- Map-side joins: the smaller tables in a join can be replicated to all the mappers and joined there, on the map side.
- Join reordering: larger tables are streamed and only the smaller tables are kept in memory in the reducers, reducing the memory needed for reduce-side joins.
- Repartitioning to handle skew in GROUP BY processing: the GROUP BY is performed in two MapReduce stages. In the first stage, data is distributed randomly to the reducers and partially aggregated; in the second stage, these partial aggregates are distributed to reducers by the GROUP BY columns.
- Similar to combiners in MapReduce, hash-based partial aggregation can be performed in the mappers to reduce the amount of data sent to the reducers, which cuts the time spent sorting and merging that data.


Execution Engine: The execution engine finally runs the tasks in the order of their dependencies. A MapReduce task first serializes its part of the plan into a plan.xml file. This file is added to the job cache, and mappers and reducers are spawned to execute the relevant sections of the operator DAG. The final results are stored in a temporary location and then moved to the final destination (in the case of, say, an INSERT INTO query).


<figure class="main-container">
  <img src="./Hive-architecture.png" alt="Hive architecture" />
</figure>
*Hive architecture diagram*

Summarizing the flow: a query is first submitted via the CLI, the web UI, or another interface; it then passes through the compiler phases explained above to form an optimized DAG of MapReduce and HDFS tasks, which the execution engine runs in the correct order on Hadoop.

### 2.4 SparkSQL execution model

The SparkSQL execution model leverages the Catalyst framework to optimize a SQL query before submitting it to the Spark core engine, which schedules the job.
Catalyst is a query optimizer. Query optimizers for MapReduce-style frameworks can greatly improve the performance of the queries developers write and significantly reduce development time. A good query optimizer should optimize user queries, be extensible so that users can supply information about their data, and even allow developer-defined rules to be added dynamically.

Catalyst leverages Scala's functional language features, such as pattern matching and runtime metaprogramming, to let developers concisely specify complex relational optimizations.

Catalyst includes both rule-based and cost-based optimization. It is extensible, so new optimization techniques and features can be added to Spark SQL, and developers can provide data-source-specific rules.
Catalyst applies its rules to its core data type, a tree: a composition of node objects where each node has a node type (a subclass of the Scala TreeNode class) and zero or more children. Node objects are immutable and are manipulated with functional transformations. The transform method of a tree takes a pattern-matching function and applies it to the nodes it matches, which is how an optimization rule selects the subset of all possible input trees that it rewrites.
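The canonical example from the Spark SQL paper {%cite armbrust2015spark --file big-data%} is constant folding expressed as a pattern match over Add and Literal nodes. The sketch below mirrors that idea with standalone, simplified types; they are not the real Catalyst classes, and the transform here is a toy stand-in for TreeNode.transform:

```scala
object TreeSketch {
  sealed trait Expr
  case class Literal(value: Int)          extends Expr
  case class Attribute(name: String)      extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // Rebuild the tree bottom-up, applying the partial function wherever it matches,
  // mirroring the behaviour of Catalyst's transform on immutable trees.
  def transform(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = e match {
      case Add(l, r) => Add(transform(l)(rule), transform(r)(rule))
      case other     => other
    }
    if (rule.isDefinedAt(withNewChildren)) rule(withNewChildren) else withNewChildren
  }

  // Constant folding: Add(Literal(1), Literal(2)) becomes Literal(3).
  val constantFold: PartialFunction[Expr, Expr] = {
    case Add(Literal(a), Literal(b)) => Literal(a + b)
  }

  // x + (1 + 2)  ==>  x + 3
  val folded = transform(Add(Attribute("x"), Add(Literal(1), Literal(2))))(constantFold)
}
```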
Hence, in Spark SQL, the transformation of user queries happens in four phases:

<figure class="main-container">
  <img src="./sparksql-data-flow.jpg" alt="SparkSQL optimization plan Overview" />
</figure>

***Analyzing a logical plan to resolve references:*** In the analysis phase, a relation, coming either from the abstract syntax tree (AST) returned by the SQL parser or from a DataFrame, is turned into a logical plan that is still unresolved (the referenced columns may not exist or may have the wrong data type). The logical plan is resolved using Catalyst's Catalog object (which tracks the tables in all data sources): named attributes are mapped to the given input, relations are looked up by name in the catalog, and types are propagated and coerced through expressions.

***Logical plan optimization:*** In this phase, rules such as constant folding, predicate pushdown, projection pruning, null propagation, and Boolean expression simplification are applied to the logical plan.

***Physical planning:*** In this phase, Spark generates multiple physical plans from the logical plan and chooses among them using a cost model. The physical planner also performs rule-based physical optimizations, such as pipelining projections or filters into a single Spark map operation. In addition, it can push operations from the logical plan into data sources that support predicate or projection pushdown.


***Code generation:*** The final phase generates the Java bytecode that runs on each machine. Catalyst transforms a tree representing a SQL expression into an AST for Scala code that evaluates the expression, then compiles and runs the generated code. A Scala feature called quasiquotes aids in the construction of these ASTs.


## 3. Big Data Ecosystem
***Hadoop Ecosystem***

Apache Hadoop is an open-source framework that supports distributed processing of large datasets. It encompasses a long list of projects, catalogued at https://hadoopecosystemtable.github.io/. For this section, the key pieces to understand are two: the Hadoop Distributed File System (HDFS) and Hadoop, the open-source implementation of the MapReduce model.

<figure class="main-container">
  <img src="./hadoop-ecosystem.jpg" alt="Hadoop Ecosystem" />
</figure>
*Figure from http://thebigdatablog.weebly.com/blog/the-hadoop-ecosystem-overview*


HDFS forms the data management layer: a distributed file system designed to provide reliable, scalable storage across large clusters of unreliable commodity machines. The design was inspired by GFS {%cite ghemawat2003google --file big-data%}. Unlike the closed-source GFS, HDFS is open source and provides libraries and interfaces for other file systems, such as S3, KFS, etc.

To satisfy different needs, big companies like Facebook and Yahoo developed additional tools. Facebook's Hive is a warehouse system that provides a more declarative programming interface and translates queries into Hadoop jobs. Yahoo's Pig is an ad-hoc analysis platform that imposes structure on HDFS objects and supports operations such as grouping, joining, and filtering.


***Spark Ecosystem***

Apache Spark's rich ecosystem consists of third-party resource managers such as Mesos {%cite hindman2011mesos --file big-data%} and YARN {%cite vavilapalli2013apache --file big-data%}, together with several major components that have already been discussed in this article, such as Spark core, Spark SQL, and GraphX.
In this section we discuss the remaining, yet very important, components and libraries that help Spark deliver high performance.

<figure class="main-container">
  <img src="./spark-ecosystem.png" alt="Spark ecosystem" />
</figure>

*Spark Streaming - A Spark component for streaming workloads*

Spark handles fault-tolerant, high-throughput streaming workloads in near real time through the lightweight Spark Streaming API. Spark Streaming is based on the discretized streams model {% cite d-streams --file big-data%}: a streaming workload is processed as a series of small batch jobs, leveraging the fast scheduling of the Spark core and the fault-tolerance properties of RDDs. Each batch of streaming data is represented as an RDD, and transformations are applied to it (a minimal sketch appears at the end of this section). Data sources for Spark Streaming include live streams such as Twitter, Apache Kafka, Akka actors, IoT sensors, Amazon Kinesis, and Apache Flume. Spark Streaming also unifies batch and streaming workloads, so developers can use the same code for both, and it supports integrating streaming data with historical data.


*Apache Mesos*

Apache Mesos {%cite hindman2011mesos --file big-data%} is an open-source, heterogeneous cluster/resource manager developed at the University of California, Berkeley and used by companies such as Twitter, Airbnb, and Netflix for handling workloads in a distributed environment through dynamic resource sharing and isolation. It aids in the deployment and management of applications in large-scale clustered environments. Mesos abstracts node allocation by combining the resources of the machines in a cluster into a single pool, enabling fault-tolerant, elastic distributed systems. A variety of workloads can draw nodes from this single pool, removing the need to dedicate specific machines to particular workloads. Mesos is highly scalable, achieves fault tolerance through Apache ZooKeeper {%cite hunt2010zookeeper --file big-data%}, and is an efficient CPU- and memory-aware resource scheduler.


*Alluxio/Tachyon*

Alluxio/Tachyon {% cite li2014tachyon --file big-data%} is an open-source, memory-centric distributed storage system that provides high-throughput reads and writes, enabling reliable data sharing at memory speed across cluster jobs. Tachyon integrates with different computation frameworks, such as Apache Spark and Apache MapReduce. In the big data ecosystem, Tachyon sits between computation frameworks or jobs, like Spark or MapReduce, and storage systems such as Amazon S3, OpenStack Swift, GlusterFS, HDFS, or Ceph. It caches frequently read datasets in memory, avoiding a trip to disk for every read. In Spark, RDDs can transparently be stored in Tachyon, making Spark more resilient and avoiding GC overhead.
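As promised above, here is a minimal sketch of a Spark Streaming word count. The socket source on localhost:9999 is a placeholder, and the snippet assumes the standard spark-streaming dependency is on the classpath. Each two-second micro-batch is materialized as an RDD, and the familiar RDD-style transformations are applied to it:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, one for processing the micro-batches.
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(2))   // 2-second micro-batches

    // Placeholder source: a socket emitting lines of text on port 9999.
    val lines = ssc.socketTextStream("localhost", 9999)

    // The same transformations used on RDDs apply to each batch of the stream.
    val counts = lines.flatMap(_.split("\\s+"))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)

    counts.print()           // emit the per-batch counts

    ssc.start()              // start receiving and processing data
    ssc.awaitTermination()   // run until stopped
  }
}
```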
+ + + + + +## References +{% bibliography --file big-data %} diff --git a/chapter/8/cluster-overview.png b/chapter/8/cluster-overview.png Binary files differnew file mode 100644 index 0000000..b1b7c1a --- /dev/null +++ b/chapter/8/cluster-overview.png diff --git a/chapter/8/ecosystem.png b/chapter/8/ecosystem.png Binary files differnew file mode 100644 index 0000000..c632ec2 --- /dev/null +++ b/chapter/8/ecosystem.png diff --git a/chapter/8/edge-cuts.png b/chapter/8/edge-cuts.png Binary files differnew file mode 100644 index 0000000..e9475a8 --- /dev/null +++ b/chapter/8/edge-cuts.png diff --git a/chapter/8/hadoop-ecosystem.jpg b/chapter/8/hadoop-ecosystem.jpg Binary files differnew file mode 100644 index 0000000..2ba7aa9 --- /dev/null +++ b/chapter/8/hadoop-ecosystem.jpg diff --git a/chapter/8/spark-ecosystem.png b/chapter/8/spark-ecosystem.png Binary files differnew file mode 100644 index 0000000..d3569fc --- /dev/null +++ b/chapter/8/spark-ecosystem.png diff --git a/chapter/8/spark_pipeline.png b/chapter/8/spark_pipeline.png Binary files differnew file mode 100644 index 0000000..ac8c383 --- /dev/null +++ b/chapter/8/spark_pipeline.png diff --git a/chapter/8/sparksql-data-flow.jpg b/chapter/8/sparksql-data-flow.jpg Binary files differnew file mode 100644 index 0000000..1cf98f5 --- /dev/null +++ b/chapter/8/sparksql-data-flow.jpg diff --git a/chapter/8/sql-vs-dataframes-vs-datasets.png b/chapter/8/sql-vs-dataframes-vs-datasets.png Binary files differnew file mode 100644 index 0000000..600c68b --- /dev/null +++ b/chapter/8/sql-vs-dataframes-vs-datasets.png diff --git a/chapter/8/trash.md b/chapter/8/trash.md new file mode 100644 index 0000000..c9b90fe --- /dev/null +++ b/chapter/8/trash.md @@ -0,0 +1,53 @@ +## Trash + + +## Performance +`TODO: re-organize` There are some practices in this paper that make the model work very well in Google, one of them is **backup tasks**: when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks ("straggler"). The task is marked as completed whenever either the primary or the backup execution completes. +In the paper, the authors measure the performance of MapReduce on two computations running on a large cluster of machines. One computation *grep* through approximately 1TB of data. The other computation *sort* approximately 1TB of data. Both computations take in the order of a hundred seconds. In addition, the backup tasks do help largely reduce execution time. In the experiment where 200 out of 1746 tasks were intentionally killed, the scheduler was able to recover quickly and finish the whole computation for just a 5% increased time. +Overall, the performance is very good for conceptually unrelated computations. + + + +## Outline +- 1. Programming Models + - 1.1. Data parallelism: what is data parallelism and how do the following models relate to each other? + - 1.1.1 MapReduce + - 1.1.2 FlumeJava + - 1.1.3 Dryad + - 1.1.4 Spark + + - 1.2. Querying: we need more declarative interfaces, built on top MR models. + - Sawzall {%cite pike2005interpreting --file big-data %}: first one propose + - Pig {% cite olston2008pig --file big-data %}: on top of Hadoop, independent of execution platform, in theory can compiled into DryadLINQ too; what is the performance gain/lost? Easier to debug? 
+ - Hive {%cite thusoo2009hive --file big-data %} + - DryadLINQ: SQL-like, uses Dryad as execution engine; + `Suggestion: Merge this with Dryad above?` + - Dremel, query natively w/o translating into MR jobs + - Spark SQL {%cite --file big-data %} - Limitations of Relational alone models? how SparkSQL model overcomes it? goals of SparkSQL? how it leverages the Spark programming model? what is a DataFrame and how is it different from a RDD? what are the operations a DataFrame provides? how is in-memory caching different from Spark? + + - 1.3. Large-scale Parallelism on Graphs + - Why a separate graph processing model? what is a BSP? working of BSP? Do not stress more since its not a map reduce world exactly. + - GraphX programming model - discuss disadvantages graph-parallel model to data parallel model for large scale graph processing? how graphX combines the advantages of both the models? representation of a graph in GraphX? discuss the model, vertex cut partitioning and its importance? graph operations ? + + +- 2. Execution Models + - 2.1 MapReduce (intermediate writes to disk): What is the sequence of actions when a MapReduce functions are called? How is write-to-disk good/bad (fault-tolerant/slow)? How does the data are transmitted across clusters efficiently (store locally)? To shorten the total time for MR operations, it uses backup tasks. When MR jobs are pipelined, what optimizations can be performed by FlumeJava? In spite of optimizations and pipelining, what is the inherent limitation (not support iterative algorithm?) + - 2.2 Spark (all in memory): introduce spark architecture, different layers, what happens when a spark job is executed? what is the role of a driver/master/worker, how does a scheduler schedule the tasks and what performance measures are considered while scheduling? how does a scheduler manage node failures and missing partitions? how are the user defined transformations passed to the workers? how are the RDDs stored and memory management measures on workers? do we need checkpointing at all given RDDs leverage lineage for recovery? if so why ? + - 2.3 Graphs : + - Pregel :Overview of Pregel. Its implementation and working. its limitations. Do not stress more since we have a better model GraphX to explain a lot. + - GraphX : Working on this. + - SparkSQL Catalyst & Spark execution model : Discuss Parser, LogicalPlan, Optimizer, PhysicalPlan, Execution Plan. Why catalyst? how catalyst helps in SparkSQL , data flow from sql-core-> catalyst->spark-core + +- 3. Evaluation: Given same algorithm, what is the performance differences between Hadoop, Spark, Dryad? There are no direct comparison for all those models, so we may want to compare separately: + - Hadoop vs. Spark + - Spark vs. SparkSQL from SparkSQL paper + +- 4. Big Data Ecosystem + Everything interoperates with GFS or HDFS, or makes use of stuff like protocol buffers so systems like Pregel and MapReduce and even MillWheel... + - GFS/HDFS for MapReduce/Hadoop: Machines are unreliable, how do they provide fault-tolerance? How does GFS deal with single point of failure (shadow masters)? How does the master manage partition, transmission of data chunks? Which + - Resource Management: Mesos. New frameworks keep emerging and users have to use multiple different frameworks(MR, Spark etc.) in the same clusters, so how should they share access to the large datasets instead of costly replicate across clusters? + - Introducing streaming: what happens when data cannot be complete? 
How does different programming model adapt? windowing `todo: more` + + 2015 NSDI Ousterhout + + latency numbers that every programmer should know diff --git a/chapter/8/vertex-cuts.png b/chapter/8/vertex-cuts.png Binary files differnew file mode 100644 index 0000000..b256630 --- /dev/null +++ b/chapter/8/vertex-cuts.png diff --git a/resources/img/data-parallelism.png b/resources/img/data-parallelism.png Binary files differnew file mode 100644 index 0000000..eea5bf8 --- /dev/null +++ b/resources/img/data-parallelism.png diff --git a/resources/img/mapreduce-execution.png b/resources/img/mapreduce-execution.png Binary files differnew file mode 100644 index 0000000..090878d --- /dev/null +++ b/resources/img/mapreduce-execution.png diff --git a/resources/img/rpc_chapter_1_asyncrpc.jpg b/resources/img/rpc_chapter_1_asyncrpc.jpg Binary files differnew file mode 100644 index 0000000..800c57b --- /dev/null +++ b/resources/img/rpc_chapter_1_asyncrpc.jpg diff --git a/resources/img/rpc_chapter_1_syncrpc.jpg b/resources/img/rpc_chapter_1_syncrpc.jpg Binary files differnew file mode 100644 index 0000000..b497fbb --- /dev/null +++ b/resources/img/rpc_chapter_1_syncrpc.jpg diff --git a/resources/img/rpc_chapter_1_ycog_10_steps.png b/resources/img/rpc_chapter_1_ycog_10_steps.png Binary files differnew file mode 100644 index 0000000..86c7432 --- /dev/null +++ b/resources/img/rpc_chapter_1_ycog_10_steps.png |
