Actors in Scala

Actors in Scala

Philipp Haller
Frank Sommers

Concurrent programming for the multi-core era

Cover · Overview · Contents · Discuss · Suggest · Glossary · Index

PrePrint™ Edition

Thank you for purchasing the PrePrint™ Edition of Actors in Scala. A PrePrint™ is a work-in-progress, a book that has not yet been fully written, reviewed, edited, or formatted. We are publishing this book as a PrePrint™ for two main reasons. First, even though this book is not quite finished, the information contained in its pages can already provide value to many readers. Second, we hope to get reports of errata and suggestions for improvement from those readers while we still have time to incorporate them into the first printing.

As a PrePrint™ customer, you’ll be able to download new PrePrint™ versions from Artima as the book evolves, as well as the final PDF of the book once finished. You’ll have access to the book’s content prior to its print publication, and can participate in its creation by submitting feedback. Please submit by clicking on the Suggest link at the bottom of each page.

Thanks for your participation. We hope you find the book useful and enjoyable.

Bill Venners
President, Artima, Inc.

Actors in Scala, PrePrint™ Edition, Version 2

Philipp Haller is a research assistant in the School of Computer and Communication Sciences at EPFL in Lausanne, Switzerland. Frank Sommers is president of Autospaces, Inc.

Artima Press is an imprint of Artima, Inc.
P.O. Box 305, Walnut Creek, California 94597

Copyright © 2010-2011 Philipp Haller and Frank Sommers. All rights reserved.
PrePrint™ Edition first published 2010. Build date of this impression: March 3, 2011. Produced in the United States of America.

No part of this publication may be reproduced, modified, distributed, stored in a retrieval system, republished, displayed, or performed, for commercial or noncommercial purposes or for compensation of any kind without prior written permission from Artima, Inc.

This PDF PrePrint™ is prepared exclusively for its purchaser. The purchaser of this PrePrint™ Edition may download, view on-screen, and print it for personal, noncommercial use only, provided that all copies include the following notice in a clearly visible position: “Copyright © 2010 Artima, Inc. All rights reserved.” The purchaser may store one electronic copy and one electronic backup, and may print one copy, for personal, noncommercial use only.

All information and materials in this book are provided “as is” and without warranty of any kind. The term “Artima” and the Artima logo are trademarks or registered trademarks of Artima, Inc. All other company and/or product names may be trademarks or registered trademarks of their owners.

To H.M. -P.H.
To Jungwon -F.S.

Overview

1. Concurrency Everywhere
2. Messages All the Way Up
3. Scala’s Language Support for Actors
4. Actor Chat
5. Event-Based Programming
6. Exception Handling, Actor Termination and Shutdown
7. Customizing Actor Execution
8. Remote Actors
9. Using Scala Actors with Java APIs
10. Distributed and Parallel Computing
11. API Overview

Contents

1 Concurrency Everywhere
  1.1 Scaling with concurrency
  1.2 Actors versus threads
  1.3 The indeterministic soda machine
2 Messages All the Way Up
  2.1 Control flow and data flow
  2.2 Actors and messages
  2.3 Actor creation
  2.4 Actor events
  2.5 Asynchronous communication
  2.6 You’ve got mail: indeterminacy and the role of the arbiter
  2.7 Actor lifecycle
3 Scala’s Language Support for Actors
4 Actor Chat
  4.1 Defining message classes
  4.2 Processing messages
  4.3 Sending actor messages
5 Event-Based Programming
  5.1 Events vs. threads
  5.2 Making actors event-based: react
  5.3 Event-based futures
6 Exception Handling, Actor Termination and Shutdown
  6.1 Simple exception handling
  6.2 Monitoring actors
7 Customizing Actor Execution
  7.1 Pluggable schedulers
  7.2 Managed blocking
8 Remote Actors
  8.1 Creating remote actors
  8.2 Remote communication
  8.3 A remote start service
9 Using Scala Actors with Java APIs
10 Distributed and Parallel Computing
  10.1 MapReduce
  10.2 Reliable broadcast
11 API Overview
  11.1 The actor traits Reactor, ReplyReactor, and Actor
  11.2 Control structures
  11.3 Futures
  11.4 Channels
  11.5 Remote Actors
Bibliography
About the Authors
Index

List of Figures

1.1 State transitions in a soda machine.
1.2 Message passing between cores with indeterministic message ordering.
2.1 Components holding shared state require synchronization.
2.2 Data flow and control flow interact in subtle ways: If speed adjustment is needed, data flows from one component to another. There is no data flow otherwise.
2.3 The simplest actor computation: Adding x and y together.
2.4 Actor computation with continuation message passing.
2.5 Every message carries a sender reference.
2.6 CruiseControl actor receiving currentSpeed message.
2.7 A more modular approach to cruise control with further decomposition of responsibilities into actors.
2.8 A continuation actor included in a message affects control flow and accommodates late binding in an actor system.
2.9 An actor can create child actors as part of its message processing, and delegate work to those child actors.
2.10 C’s arrival event is activated by B’s arrival event.
2.11 Event causality in an actor system: an initial event is followed by arrival events, activations, and actor creation events.
2.12 Actor message with integer and reference to calculation.
4.1 Actor chat: Users subscribe to a chat room to receive messages sent to that room. The chat room maintains a session of subscribers.
4.2 All communication between the chat room and users takes place via messages.
10.1 Dataflow in a basic MapReduce implementation.

List of Listings

4.1 Case classes for users and messages
4.2 Defining act
4.3 Incoming message patterns
4.4 Creating and starting an actor with actor
4.5 Representing a user as an actor inside a session
4.6 Using the sender reference
4.7 Using the reply method
4.8 Using message timeouts with receiveWithin
4.9 Processing post messages
5.1 Building a chain of event-based actors
5.2 The main method
5.3 Sequencing react calls using a recursive method
5.4 A sleep method that uses react
5.5 Using andThen to continue after react
5.6 Using loopWhile for iterations with react
5.7 Image renderer using futures
5.8 Using react to wait for futures
5.9 A custom ForEach operator enables react in for-comprehensions
5.10 Implementing the custom ForEach operator
6.1 Defining an actor-global exception handler
6.2 Linking dependent actors
6.3 Receiving a notification because of an unhandled exception
6.4 Monitoring and restarting an actor using link and restart
6.5 Using keepAlive to automatically restart a crashed actor
6.6 Reacting to Exit messages for exception handling
7.1 Incorrect use of ThreadLocal
7.2 Saving and restoring a ThreadLocal
7.3 Executing actors on the Swing event dispatch thread
7.4 Creating daemon-style actors
7.5 Synchronizing the speed of Gear actors
7.6 Blocked actors may lock up the thread pool
7.7 Using managed blocking to prevent locking up the thread pool
8.1 Making the chat room actor remotely accessible
8.2 A server actor implementing a remote start service
8.3 An echo actor that can be started remotely
10.1 A function for building an inverted index
10.2 A basic MapReduce implementation
10.3 Applying the reducing function in parallel
10.4 A MapReduce implementation that tolerates mapper faults
10.5 A MapReduce implementation with coarse-grained worker tasks
10.6 Best-effort broadcasting
10.7 Using the broadcast implementation in user code
10.8 A reliable broadcast actor
10.9 Sending messages with time stamps inside the broadcast method
11.1 Using andThen for sequencing
11.2 Scope-based sharing of channels
11.3 Sharing channels via messages

Chapter 1

Concurrency Everywhere

The actor model of concurrency was born of a practical need: When Carl Hewitt and his team at MIT first described actors in the 1970s, computers were relatively slow.¹ While it was already possible to divide up work among several computers and to compute in parallel, Hewitt’s team wanted a model that would not only simplify building such concurrent systems, but would also let them reason about concurrent programs in general. Such reasoning, it was believed, would allow developers to be more certain that their concurrent programs worked as intended.

¹ Hewitt et al., “A Universal Modular ACTOR Formalism for Artificial Intelligence” [Hew73]

Although actor-based concurrency has been an important concept ever since, it is only now gaining widespread acceptance. That is in part because until recently no widely used programming language offered first-class support for actors. An effective actors implementation places a great burden on a host language, and few mainstream languages were up to the task.
Scala rises to that challenge, and offers a full-featured implementation of actor-based concurrency on the Java virtual machine (JVM).² Because Scala code seamlessly interoperates with code and libraries written in Java, or any other JVM language, Scala actors offer an exciting and practical way to build scalable and reliable concurrent programs.

² Haller and Odersky, “Scala Actors: Unifying thread-based and event-based programming” [Hal09]

Like many powerful concepts, the actor model can be understood and used on several levels. On one level, actor-based programming provides an easy way to exchange messages between independently running threads or processes. On another level, actors make concurrent programming generally simpler, because actors let developers focus on high-level concurrency abstractions and shield programmers from intricacies that can easily lead to errors. On an even broader level, actors are about building reliable programs in a world where concurrency is the norm, not the exception—a world that is fast approaching.

This book aims to explain actor-based programming with Scala on all those levels. Before diving into the details of Scala actors, it helps to take a step back and place actors in the context of other approaches to concurrent programming, some of which may already be familiar to you.

1.1 Scaling with concurrency

The mainstream computing architectures of the past decades focused on making the execution of a single thread of sequential instructions faster.
That led to an application of Moore’s Law to computing performance: Processor performance per unit cost has doubled roughly every eighteen months for the last twenty years, and developers counted on that trend to ensure that their increasingly complex programs performed well.³

³ Sutter, “The free lunch is over: A fundamental turn toward concurrency” [Sut05]

Moore’s Law has been remarkably accurate in predicting processor performance, and it is reasonable to expect processor computing capacity to double every one-and-a-half years for at least another decade. To make that increase practical, however, chip designers had to implement a major shift in their design focus in recent years. Instead of trying to improve the clock cycles dedicated to executing a single thread of instructions, new processor designs make it possible to execute many concurrent instruction threads on a single chip. While the clock speed of each computing core on a chip is expected to improve only marginally over the next few years, processors with dozens of cores are already showing up in commodity servers, and multicore chips are the norm even in inexpensive desktops and notebooks.

This shift in the design of high-volume, commodity processor architectures, such as the Intel x86, has at least two ramifications for developers. First, because individual core clock cycles will increase only modestly, we will need to pay renewed attention to the algorithmic efficiency of sequential code. Second, and more important in the context of actors, we will need to design programs such that they take maximum advantage of available processor cores. In other words, we not only need to write programs that work correctly on concurrent hardware, but also design programs that opportunistically scale to all available processing units or cores.

1.2 Actors versus threads

In a concurrent program, many independently executing threads, or sequential processes, work together to fulfill an application’s requirements. Investigation into concurrent programming has mostly focused on defining how concurrently executing sequential processes can communicate such that a larger process—for example, a program that executed those processes—can proceed predictably.

The two most common ways of communication among concurrent threads are synchronization on shared state, and message passing. Many familiar programming constructs, such as semaphores and monitors, are based on shared-state synchronization. Developers of concurrent programs are familiar with those structures. For example, Java programmers can find these structures in the java.util.concurrent library.⁴ Among the biggest challenges for anyone using shared-state concurrency are avoiding concurrency hazards, such as data races and deadlocks, and achieving scalability.⁵

⁴ Goetz et al., Java Concurrency in Practice [Goe06]
⁵ One of the reasons why scalability is hard to achieve using locks (or Java-style synchronization) is the fact that coarse-grained locking increases the amount of code that is executed sequentially. Moreover, accessing a small number of locks (or, in the extreme case, a single global lock) from several threads may increase the cost of synchronization significantly.

Message passing is an alternative way of communication among cooperating threads. There are two important categories of systems based on message passing. In channel-based systems, messages are sent to channels (or ports) that processes can share. Several processes can then receive messages from the same shared channels. Examples of channel-based systems are MPI⁶ and systems based on the CSP paradigm,⁷ such as the Go language. Systems based on actors (or agents, or Erlang-style processes⁸) are in the second category of message-passing concurrency. In these systems, messages are sent directly to actors; it is not necessary to create intermediary channels between processes.

⁶ Gropp et al., Using MPI: Portable Parallel Programming with the Message-Passing Interface [Gro99]
⁷ Hoare, “Communicating sequential processes” [Hoa78]
⁸ Armstrong et al., Concurrent Programming in Erlang [Arm95]

An important advantage of message passing over shared-state concurrency is that it makes it easier to avoid data races. If processes communicate only by passing messages, and those messages are immutable, then race conditions are avoided by design. Moreover, anecdotal evidence suggests that this approach in practice also reduces the risk of deadlock. A potential disadvantage of message passing is that the communication overhead may be high. To communicate, processes have to create and send messages, and these messages are often buffered in queues before they can be received to support asynchronous communication. In contrast, shared-state concurrency enables direct access to shared memory, as long as it is properly synchronized. To reduce the communication overhead of message passing, large messages should not be transferred by copying the message state; instead, only a reference to the message should be sent.
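To make the message-passing style concrete before we reach Scala’s actors library, here is a minimal sketch in plain Scala: each recipient owns a mailbox queue, and peers communicate only by enqueuing immutable case-class messages. All names here (Msg, Add, Result, addViaMessages) are ours, chosen for illustration; this is not the library API covered later in the book.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Immutable messages are safe to share between threads by construction.
sealed trait Msg
final case class Add(x: Int, y: Int, replyTo: LinkedBlockingQueue[Msg]) extends Msg
final case class Result(value: Int) extends Msg

object MailboxDemo {
  // Ask a separate "process" to add two numbers, communicating only via messages.
  def addViaMessages(x: Int, y: Int): Int = {
    val adderMailbox = new LinkedBlockingQueue[Msg]
    val replyMailbox = new LinkedBlockingQueue[Msg]

    // The "adder" process: take one message from its mailbox, reply by message.
    val adder = new Thread(new Runnable {
      def run(): Unit = adderMailbox.take() match {
        case Add(a, b, replyTo) => replyTo.put(Result(a + b))
        case _                  => ()
      }
    })
    adder.start()

    adderMailbox.put(Add(x, y, replyMailbox)) // asynchronous send: put returns at once
    replyMailbox.take() match {               // block until the reply arrives
      case Result(v) => v
      case _         => sys.error("unexpected reply")
    }
  }
}
```

Because Add and Result carry only immutable data, the two threads never touch shared mutable state, which is exactly how message passing avoids data races by design.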
However, this reintroduces the risk of data races when several processes have access to the same mutable (message) data. It is an ongoing research effort to provide static checkers (for instance, the Scala compiler plug-in for uniqueness types⁹) that can verify that programs passing mutable messages by reference do not contain data races.

⁹ Haller and Odersky, “Capabilities for Uniqueness and Borrowing” [Hal10]

Let’s take a step back, and look at actor-based programming from a higher-level perspective. To appreciate the difference, and the relationship, between more traditional concurrency constructs and actors, it helps to pay a brief visit to the local railroad yard.

Imagine yourself standing on a bridge overlooking the multitude of individual tracks entering the rail yard. You can observe many seemingly independent activities taking place, such as trains arriving and leaving, cars being loaded and unloaded, and so on. Suppose, then, that your job was to design such a railroad yard. Thinking in terms of threads, locks, monitors, and so on, is similar to the problem of figuring out how to make sure that trains running on parallel tracks don’t collide. It is a very important requirement; without it, the rail yard would be a dangerous place, indeed. To accomplish that task, you would employ specialized artifacts, such as semaphores, monitors, and switches.

Actors illuminate the same rail yard from the higher perspective of ensuring that all the concurrent activities taking place at the rail yard progress smoothly: that all the delivery vehicles find their ways to train cars, all the trains can make their progress through the tracks, and all the activities are properly coordinated.
You will need both perspectives when designing a rail yard: Thinking from the relatively low-level perspective of individual tracks ensures that trains don’t inadvertently cross paths; thinking from the perspective of the entire facility helps ensure that your design facilitates smooth overall operation, and that your rail yard can scale, if needed, to accommodate increased traffic. Simply adding new rail tracks only goes so far: you need some overall design principles to ensure that the whole rail yard can grow to handle increased traffic, and that greater traffic can scale to the full capacity of the tracks.

Working on the relatively low-level details of individual tracks (or problems associated with interleaving threads), on the one hand, and the higher-level perspective of the entire facility (actors), on the other, require somewhat different skills and experience. An actor-based system is often implemented in terms of threads, locks, monitors, and the like, but actors hide those low-level details, and allow you to think of concurrent programs from a higher vantage point.

1.3 The indeterministic soda machine

In addition to allowing you to focus on the scalability aspect of concurrent applications, actors’ higher-level perspective on concurrency is helpful because it provides a more realistic abstraction for understanding how concurrent applications work. Specifically, concurrent programs exhibit two characteristics that, while also present in sequential applications, are especially pronounced when a program is designed from the ground up to take advantage of concurrency. To see what these are, we need only to stop by the office soda machine.¹⁰

A soda machine is convenient not only to quench our thirst on a hot summer day, but also because it’s a good metaphor for a kind of program that moves from one well-defined state to another.
To start out, a soda machine awaits input from the user, perhaps prompting the user to insert some coins. Inserting those coins causes the soda machine to enter a state where it can now ask the user to make a selection of the desired drink. As soon as the user makes that selection, the soda machine dispenses a can and moves back into its initial state. On occasion, it may also run out of soda cans—that would place it in an “out of service” state.

¹⁰ Hoare, “Communicating sequential processes” [Hoa78]

Figure 1.1 · State transitions in a soda machine.

At any point in time, a soda machine is aware of only one state. That state is also global to the machine: Each component—the coin input device, the display unit, the selection entry keypad, the can dispenser, and so on—must consult that global state to determine what action to take next: For instance, if the machine is in the state where the user has already made his selection, the can dispenser component is allowed to release a soda can into the output tray.

In addition to always being in a well-defined state, our simple abstraction suggests two further characteristics of a soda machine: First, that the number of possible states the machine can enter is finite and, second, that given any one of those possible states, we can determine in advance what the next state will be. For instance, if you inserted a sufficient amount of coins, you would expect to be prompted for the choice of drink. And having made that choice, you expect the selected drink to be dispensed.

Of course, you’ve probably had occasions to experience soda machines that did not exactly behave in such a predictable, deterministic, way.
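Before turning to that indeterminism, note that the deterministic transitions of Figure 1.1 can be captured as a pure function. A sketch in Scala, where the state and event names are our own labels for the figure:

```scala
// States and events of the soda machine in Figure 1.1 (illustrative names).
sealed trait State
case object ExpectCoins     extends State
case object ExpectSelection extends State
case object DispenseDrink   extends State
case object OutOfOrder      extends State

sealed trait Event
case object CoinsInserted   extends Event
case object SelectionMade   extends Event
case object DrinkDispensed  extends Event
case object ProblemOccurred extends Event

object SodaMachine {
  // Deterministic: the next state is fully determined by the current
  // state and the event -- exactly the property described in the text.
  def next(state: State, event: Event): State = (state, event) match {
    case (_, ProblemOccurred)             => OutOfOrder
    case (ExpectCoins, CoinsInserted)     => ExpectSelection
    case (ExpectSelection, SelectionMade) => DispenseDrink
    case (DispenseDrink, DrinkDispensed)  => ExpectCoins
    case (s, _)                           => s // other events leave the state unchanged
  }
}
```

Given a state and an event, next always returns the same successor state; it is this property that the indeterministic machine below violates.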
You may have inserted plenty of coins, but instead of being prompted for your choice, you were presented with an unwelcoming “OUT OF ORDER” message. Or you may not have received any message at all—but also did not receive your frosty refreshment, no matter how hard you pounded the machine. Real-world experience teaches us that soda machines, like most physical objects, are not entirely deterministic. Most of the time they move from one well-defined state to another in an expected, predetermined fashion; but on occasion they move from one state to another—to an error state, for instance—in a way that could not be predicted in advance.

A more realistic model of a soda machine, therefore, should include the property of some indeterminism: A model that readily admits a soda machine’s ability to shift from one state to another in a way that could not be determined in advance with certainty.

Although we are generally adept at dealing with such indeterminism in physical objects—as well as when dealing with people—when we encounter such indeterminism in software, we tend to consider that behavior a bug. Examining such “bugs” may reveal that they crept into our code because some aspect of our program was not sufficiently specified.

Naturally, as developers we desire to create programs that are well-specified and, therefore, behave as expected—programs that act exactly in accord with detailed and exhaustive specifications. Indeed, one way to provide more or less exact specifications for code is by writing test cases for it. Concurrent programs, however, are a bit more like soda machines than deterministic sequential code. That’s because concurrent programs gain many of their benefits due to some aspects of a concurrent system intentionally being left unspecified.
The reason for that is easy to understand intuitively when considering a processor with four cores: Suppose that code running on the first core sends messages to code running on the three other cores, and then awaits replies back from each. Upon receiving a reply, the first core performs further processing on the response message. In practice, the order in which cores 2, 3, and 4 send back their replies is determined by the order in which the three cores finish their computations. If that reply order is left unspecified, then core 1 can start processing a reply as soon as it receives one: it does not have to wait for the slowest core to finish its work.

Figure 1.2 · Message passing between cores with indeterministic message ordering.

In this example, leaving the reply order from cores 2, 3, and 4 unspecified helps to best utilize the available computing resources. At the same time, your program can no longer rely on any specific message ordering. Instead, your application must function deterministically even though its component computations, or how those components interact, may not be fully specified.

One application of building deterministic systems out of indeterministic component computations is data centers constructed of commodity, off-the-shelf (COTS) components. Many well-known Web services companies have proven the economic advantages of using COTS hardware as basic building blocks for highly reliable data centers. Such an environment becomes practical when infrastructure software alleviates the need for developers to concern themselves with the intricacies of how such a data center partitions work between the various hardware components.
Instead, application developers can focus on higher-level concerns, such as specifying the algorithms to use when servicing an incoming request.

A popular example of an infrastructure that makes programming on COTS clusters easier is MapReduce.¹¹ With MapReduce, a user provides some data, as well as some algorithms to operate on that data, and submits that as a request to the MapReduce infrastructure software. The MapReduce software, in turn, distributes the workload required to compute the specified request across available cluster nodes and returns a result to the user.

¹¹ Dean and Ghemawat, “MapReduce: Simplified data processing on large clusters” [Dea08]

An important aspect of MapReduce is that, upon submitting a job, a user can reasonably expect some result back. For instance, should a node executing parts of a MapReduce job fail to return results within a specified time period, the MapReduce software restarts that component job on another node. Because of its guarantee of returning a result, MapReduce not only allows an infrastructure to scale a compute-intensive job to a cluster of nodes, but more significantly, MapReduce lends reliability guarantees to the computation. It is that reliability aspect that makes MapReduce suitable for COTS-based compute clusters.

While a developer using MapReduce can expect to receive a result back, exactly when the result will arrive cannot be known prior to submitting the job: The user knows only that a result will be received, but he cannot, in advance, know when that will be. More generally, the system provides a guarantee that at some point a computation is brought to completion, but a developer using the system cannot, in advance, put a time bound on the length of time a computation would run.
Intuitively, it is easy to understand the reason for that: As the infrastructure software partitions the computation, it must communicate with other system components—it must send messages and await replies from individual cluster nodes, for instance. Such communication can incur various latencies, and those communication latencies impact the time it takes to return a result. You can’t tell, in advance of submitting a job, how large those latencies will be.

Although some MapReduce implementations aim to ensure that a job returns some results—albeit perhaps incomplete results—in a specified amount of time, the actor model of concurrent computation is more general: It acknowledges that we may not know in advance just how long a concurrent computation will take. Put another way, you cannot place a time bound in advance on the length of time a concurrent computation will run. That’s in contrast to traditional, sequential algorithms that model computations with well-defined execution times on a given input.

By acknowledging the property of unbounded computational times, actors aim to provide a more realistic model of concurrent computing. While varying communication latencies are easy to grasp in the case of distributed systems or clusters, it is also not possible in a four-core processor to tell in advance how long it will be before cores 2, 3, and 4 send their replies back to core 1. All we can say is that the replies will eventually arrive.

At the same time, unboundedness does not imply infinite times: While infinity is an intriguing concept, it is of limited usefulness in realistically modeling computations. The actor model, indeed, requires that a concurrent computation terminate in finite time, but it also acknowledges that it may not be possible to tell, in advance, just how long that time will be.
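The four-core exchange from Figure 1.2 can be sketched in plain Scala, with threads standing in for cores: core 1 folds replies together in whatever order they happen to arrive, and the computation terminates once all replies are eventually in, without assuming when that will be. The names here are ours, for illustration only:

```scala
import java.util.concurrent.LinkedBlockingQueue

object ReplyOrder {
  final case class Reply(worker: Int, value: Int)

  // "Core 1" sends one task per worker, then consumes replies in arrival
  // order -- it never waits for a specific worker, only for *a* reply.
  def sumOfSquares(inputs: Seq[Int]): Int = {
    val replies = new LinkedBlockingQueue[Reply]
    inputs.zipWithIndex.foreach { case (x, i) =>
      new Thread(new Runnable {
        def run(): Unit = replies.put(Reply(i, x * x)) // worker i finishes eventually
      }).start()
    }
    // Unbounded but finite wait: take() blocks until the next reply arrives.
    (1 to inputs.size).foldLeft(0)((acc, _) => acc + replies.take().value)
  }
}
```

Because addition is commutative, the result is deterministic even though the arrival order of the replies is not: a small instance of building a deterministic system from indeterministic components.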
In the actor model, unboundedness and indeterminism, or unbounded indeterminism, are key attributes of concurrent computing. While also present in primarily sequential systems, these are pervasive attributes of concurrent programs. Acknowledging these attributes of concurrency and providing a model that allows a developer to reason about a concurrent program in the face of those attributes are the prime goals of actors. The actor model accomplishes that by providing a surprisingly simple abstraction that can express program control structures you are familiar with from sequential programs, such as if, while, for, and so on, and make those control structures work predictably in a system that can opportunistically scale in a concurrent environment.

Chapter 2

Messages All the Way Up

Actor-based programming aims to model the complex world of pervasive concurrency with a handful of simple abstractions. Before diving into Scala's actors library, it is helpful to review briefly the most common actor programming constructs. Scala's actors library implements many of these features. At the same time, like many Scala APIs, the actors API is constantly evolving, and future versions will likely provide even more capabilities. In the following bird's-eye view of the actor programming model we refer to features that Scala actors already implement and, when relevant, point out differences between Scala actors and the more general model.

2.1 Control flow and data flow

The designers of the actor programming model started out by defining suitable abstractions for program control flow in concurrent systems. Informally, control flow in a program refers to the choice a program makes about which instructions to execute next. Branching, switch and case statements, as well as decisions about what to return from a method invocation are all examples of control flow.
All but the most trivial programs include some form of control flow. Developers of sequential programs would not consider control flow a problematic task: After all, we routinely write if, while, and for expressions without thinking too much about the implications of those basic programming constructs. Concurrency, however, can make control flow more difficult to reason about. That's because control flow often depends on some logic, data, or state embedded in the program.

Figure 2.1 · Components holding shared state require synchronization.

In small programs, data and the control structures using that data may be defined close to each other, even in the same object. As a program grows in size, however, control flow decisions will need to consult bits of data, or program state, defined in other parts of the program. For example, the following expression requires access to the currentSpeed and desiredSpeed variables:

  if (currentSpeed != desiredSpeed)
    changeSpeed(desiredSpeed - currentSpeed)
  else
    maintainSpeed()

The currentSpeed and desiredSpeed values are defined outside the if-else control structure, perhaps even outside the method or object containing the control flow expression. Similarly, the code implementing the changeSpeed and maintainSpeed methods may access, as well as alter, program state defined elsewhere in the program. Therefore, such methods require access to program state that other objects expose and share. In the above example, for instance, a SpeedMonitor object may have a public currentSpeed accessor method that any other object in the program can invoke. currentSpeed then becomes part of a globally visible program state.
Figure 2.2 · Data flow and control flow interact in subtle ways: If speed adjustment is needed, data flows from one component to another. There is no data flow otherwise.

Globally visible program state defined across various objects is not a problem as long as only a single thread accesses that state. If many concurrently executing threads need to access globally visible state, however, a developer must carefully synchronize access to the objects holding that state.

Synchronized access to shared data is not only difficult to get right, but can also reduce the amount of concurrency available in a system. To see why, consider that there are conceptually two different kinds of changes taking place as a concurrent program executes: First, various threads of execution, starting from the beginning of the program, wind their ways through possible paths based on program control flow. Those threads, in turn, can alter the values of variables holding the program's state. You can think of those state changes as defining the program's data flow. A developer must carefully identify every point in the program's data flow that can be altered by, and in turn affect, other execution threads, and guard against undesired side effects.

A proven way to guard against unwanted conflicts between data flow and control flow is to serialize the program's data flow across concurrent threads of execution. Using special serialization constructs, such as locks, monitors, and semaphores, a developer specifies that threads must affect data flow in a strict order.
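Such serialization is available directly in Scala: every object carries a monitor lock, entered with synchronized. The sketch below (the class and object names are our own, not from the book's examples) guards a shared speed variable so that ten concurrent writers cannot lose updates:

```scala
// A minimal sketch of lock-based serialization (hypothetical names): shared
// speed state guarded by a monitor lock so concurrent threads see and modify
// it in a strict order.
class SpeedMonitor {
  private var currentSpeed: Int = 0   // globally visible, mutable state

  // Only one thread at a time may read or write currentSpeed.
  def speed: Int = synchronized { currentSpeed }
  def speed_=(s: Int): Unit = synchronized { currentSpeed = s }
}

object SharedStateDemo {
  def main(args: Array[String]): Unit = {
    val monitor = new SpeedMonitor
    // Ten threads each bump the speed. The whole read-modify-write sequence
    // is wrapped in one synchronized block; otherwise two threads could read
    // the same old value and one update would be lost.
    val threads = (1 to 10).map { _ =>
      new Thread(() => monitor.synchronized { monitor.speed = monitor.speed + 1 })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(monitor.speed)   // always 10, because access is serialized
  }
}
```

Note that the serialization has a cost: while one thread holds the lock, all others wait, which is exactly the loss of concurrency discussed next.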
Defining such serialization in Java or Scala has become much easier with the introduction of the java.util.concurrent package.

While serializing access to globally visible program state helps define correct program behavior, it may reduce some of the benefits of concurrent execution: As we mentioned in the previous chapter, the benefits of concurrency come about as a result of having few requirements about the order in which threads wind their way through a program and access program state. In effect, synchronization turns parts of a program into sequential code because only one thread at a time can access the global, or shared, state. Indeed, if control structures throughout a program rely on globally visible state, there is no way around serialized access to that state without risking incorrect behavior.

A key contribution of the actor model is to define control structures in a way that minimizes reliance on global program state. Instead, all the state, or knowledge, needed to make control flow decisions is co-located with the objects that make those decisions. Such objects, in turn, direct control flow only, or mostly, based on data locally visible to them. That principle of locality renders data flow and control flow in a program inseparable, reducing the requirement for synchronization. That, in turn, maximizes the potential for concurrency.

Although actor-based systems consider global state to be evil, in practice some control structures still need access to globally visible state. Recent additions to Scala's actors library make it easier to reason about such shared state in the context of actors, and we will highlight those features later in this book.
2.2 Actors and messages

The main mechanism for unifying control flow and data flow is a special abstraction, the actor, and the message-based communication that takes place between actors. An actor is any object with the capability to exchange messages with other actors. In the actor programming model, actors communicate solely by passing messages to each other.

In a pure actor system, every object is an actor. For instance, in Erlang, another language that defines an actor programming model, even atomic objects, such as Ints and Strings, are actors. Scala's actors library, by contrast, allows you to easily turn any Scala object into an actor, but does not require that all objects be actors.

Figure 2.3 · The simplest actor computation: Adding x and y together.

Actors have a uniform public interface: An actor can, in general, accept any kind of message. When an actor receives a message from another actor, the receiving actor examines, or evaluates, the incoming message. Based on the contents and type of that message, the receiving actor may find the message interesting; otherwise, it simply discards the message. When an actor is interested in an incoming message, it may perform some action in response to that message. The action depends on the actor's internal script or program, as well as the actor's current state. The ability to perform actions in response to incoming messages is what makes an object an actor.

An actor's response to an incoming message can take different forms. The simplest response is to merely evaluate the message's content. Performing addition of integers x and y in an actor-based system, for instance, would consist of a message containing x and y sent to an actor capable of adding the integers together.
In that case, the arithmetic actor would simply evaluate the sum of x and y. Of course, merely adding two numbers together is of little use if the result is not visible outside the actor performing the evaluation. Thus, a more useful actor message would contain the address of another actor interested in receiving the result.

A reference to another actor in a message is the receiving actor's continuation: Upon evaluating a message according to an internal script, an actor can send the results of that evaluation to its continuation. Including references to a continuation in an actor's message means that the actor programming model implicitly supports continuation-passing style (CPS), but generalized to concurrent programming.¹

Figure 2.4 · Actor computation with continuation message passing.

Figure 2.5 · Every message carries a sender reference.

The simplest kind of continuation is a reference to the sending actor. Having access to a message's sender is so convenient that the Scala actors library implicitly includes a reference to the sending actor in messages.

An actor's continuation is a key element of control flow in actor-based programming: Program control flows from one actor to another as continuations are passed between actors. At the same time, the actor message that sends possible continuations may also include the data required by the actor to determine control flow. The actor model unifies control flow and data flow in the sense that data as well as an actor's continuation can be passed inside the same message. That unified view makes it easier to design actor-based programs.
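The message flow of Figure 2.4 can be modeled in a few lines of plain Scala. The ToyActor class below is our own illustrative stand-in (a thread plus a queue), not the Scala actors API introduced later in the book; it shows an arithmetic actor evaluating x + y and sending z to the continuation c named in the message:

```scala
import java.util.concurrent.LinkedBlockingQueue

// A toy model of actors with continuations (hypothetical names, not the
// Scala actors library): each actor owns a mailbox and a thread that
// evaluates one message at a time.
class ToyActor(behavior: (Any, ToyActor) => Unit) {
  private val mailbox = new LinkedBlockingQueue[Any]()
  def !(msg: Any): Unit = mailbox.put(msg)         // asynchronous send
  private val thread = new Thread(() => {
    while (true) behavior(mailbox.take(), this)    // process messages forever
  })
  thread.setDaemon(true)
  thread.start()
}

object ContinuationDemo {
  // Message carrying the operands x and y plus the continuation actor c.
  case class Add(x: Int, y: Int, c: ToyActor)

  def main(args: Array[String]): Unit = {
    val results = new LinkedBlockingQueue[Int]()
    // The continuation: records the value it receives.
    val printer = new ToyActor((msg, _) => msg match {
      case z: Int => results.put(z)
      case _      => ()                            // discard uninteresting messages
    })
    // The arithmetic actor: evaluates x + y, sends z to the continuation.
    val arith = new ToyActor((msg, _) => msg match {
      case Add(x, y, c) => c ! (x + y)
      case _            => ()
    })
    arith ! Add(3, 4, printer)
    println(results.take())   // 7
  }
}
```

The arithmetic actor never learns what happens to its result; it simply forwards z to whatever continuation the message named.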
¹ Agha, "Concurrent Object-Oriented Programming" [Agh90]

Figure 2.6 · CruiseControl actor receiving currentSpeed message.

When designing a program with actors, it is helpful to first determine the kinds of control flow your code requires. Those control decisions would be made by actors. Thus, you would next define what data those control flow decisions require, and send that data to the appropriate actors inside messages.

The speed maintenance control structure in the example above, for instance, requires a decision about whether to maintain or change the current speed. That decision needs just the current and desired speed values. The simplest implementation merely evaluates the values in the incoming message and takes appropriate action based on those values. Note that the message sender does not have to be an actor.

A more modular approach would define an actor responsible for deciding the required speed adjustment, and would then send the result to a continuation, as shown in Figure 2.7. One advantage of the actor-based approach is that it allows the continuation of CruiseControl, ThrottleControl, to be defined after CruiseControl is already defined, and even after an instance of CruiseControl is already initialized and loaded into memory: ThrottleControl is simply an actor with the uniform actor interface to receive messages. Thus all CruiseControl needs is a reference to the continuation actor, such as that actor's address. The ability to perform such extreme late binding of a continuation allows developers to incrementally add knowledge, such as control flow, to an actor-based system. Indeed, actors grew out of the desire to create large knowledge-based systems in an incremental fashion.
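The late-binding idea can be sketched concretely. In the sketch below (our own names, modeled with plain threads and queues rather than the actors library), the deciding component computes a speed adjustment and sends it to whatever continuation the incoming message names; that continuation can be created after the decider already exists:

```scala
import java.util.concurrent.LinkedBlockingQueue

// A sketch of the design in Figure 2.7 (hypothetical names): CruiseControl
// decides on a speed adjustment and forwards the result to the continuation
// carried inside the message. The continuation is bound late, at send time.
object LateBindingSketch {
  case class SpeedReport(current: Int, desired: Int,
                         continuation: LinkedBlockingQueue[Int])

  def main(args: Array[String]): Unit = {
    val cruiseInbox = new LinkedBlockingQueue[SpeedReport]()
    // The CruiseControl "actor": computes the adjustment, forwards it on.
    new Thread(() => {
      val SpeedReport(current, desired, k) = cruiseInbox.take()
      k.put(desired - current)        // send adjustment to the continuation
    }).start()

    // The ThrottleControl "mailbox" is created only now, after CruiseControl
    // is already running; CruiseControl needs nothing but its address.
    val throttleInbox = new LinkedBlockingQueue[Int]()
    cruiseInbox.put(SpeedReport(current = 50, desired = 65,
                                continuation = throttleInbox))
    println(throttleInbox.take())     // 15: the requested throttle adjustment
  }
}
```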
Late binding in actor control flow is also an important tool in lending robustness to an actor-based system. For instance, an actor may be redundantly defined, allowing a message sender to send replicated messages.

Figure 2.7 · A more modular approach to cruise control with further decomposition of responsibilities into actors.

Figure 2.8 · A continuation actor included in a message affects control flow and accommodates late binding in an actor system.

If actors interacting via messages sounds similar to how objects communicate in an object-oriented system, that likeness is no mere coincidence. Indeed, the actor model was developed at the same time the first object-oriented languages were designed, and was, in turn, influenced by object-oriented concepts. Alan Kay, an inventor of object-oriented programming, noted that message passing between objects is more central to object-oriented programming than objects themselves are. In a note to a Smalltalk discussion group, Kay wrote:²

    The big idea is "messaging" – that is what the kernel of Smalltalk/Squeak is all about (and it's something that was never quite completed in our Xerox PARC phase). The Japanese have a small word – ma – for "that which is in between" – perhaps the nearest English equivalent is "interstitial." The key in making great and growable systems is much more to design how its modules communicate rather than what their internal properties and behaviors should be. ...

² Kay, an email on messaging in Smalltalk/Squeak. [Kay98]
The actor model can be viewed as a special case of object-oriented programming where all communication between objects takes place via message passing, and where an object's internal state changes only in response to messages.

2.3 Actor creation

An actor can send a message only to its acquaintances: other actors whose addresses it knows. Continuation passing is one way in which an actor can learn the addresses of other actors. Another way is for an actor to create other actors as part of evaluating a message. Such newly created actors, child actors, can have an independent lifetime from that of the creating actor. Having created new actors, the creating actor can send messages to the new actors, possibly passing its own address as part of those messages.

An actor's ability to create other actors makes it easy to implement fork-join parallelism, for instance: Upon receiving a message, an actor may decide to divide up a potentially compute-intensive job and create child actors for the purpose of processing parts of that larger computation. A creator actor would divvy up work among its child actors, and wait for the children to complete their work and send their results back to the parent. Once all the results have been collected, the parent actor can summarize those results, possibly sending the results to yet another actor, or continuation. We will provide several examples of fork-join parallelism in later chapters.

2.4 Actor events

Although we have so far focused on an actor's ability to send messages to other actors, all the "action" in an actor takes place at the time a message is received. Receiving messages and creating other actors are two examples of events in an actor system.
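The fork-join pattern just described can be sketched with standard threads standing in for parent and child actors (the names are ours; later chapters show the same pattern with real Scala actors):

```scala
import java.util.concurrent.LinkedBlockingQueue

// A sketch of fork-join parallelism (hypothetical names): the parent divides
// the data, forks one child per chunk, and a queue stands in for the
// parent's mailbox into which children send their partial results.
object ForkJoinSketch {
  def parallelSum(data: Vector[Int], workers: Int): Int = {
    val inbox = new LinkedBlockingQueue[Int]()   // the parent's "mailbox"
    // Fork: create one child per chunk; each sends its partial sum back.
    val chunks = data.grouped(math.max(1, data.size / workers)).toVector
    chunks.foreach { chunk =>
      new Thread(() => inbox.put(chunk.sum)).start()
    }
    // Join: the parent collects one reply per child and summarizes.
    (1 to chunks.size).map(_ => inbox.take()).sum
  }

  def main(args: Array[String]): Unit = {
    println(parallelSum(Vector.range(1, 101), 4))   // 5050
  }
}
```

Because addition is commutative, the parent does not care in which order the children's replies arrive, a point the next sections return to.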
Figure 2.9 · An actor can create child actors as part of its message processing, and delegate work to those child actors.

Figure 2.10 · C's arrival event is activated by B's arrival event.

Events and their relationships illuminate how physical phenomena inspired the actor programming model. For instance, when actor B receives a message from actor A, B can send a message to C as a result, defining an order of arrival events. In this example, the message sent to B caused, or activated, C's arrival event.

In their seminal paper on the "Laws for Communicating Parallel Processes," Carl Hewitt and Henry Baker noted that:³

    Activation is the actor notion of causality. . . A crude analogy from physics may make activation more clear. A photon (message) is received by an atom (target) which puts it into an excited state. After a while, the atom gives off one or more photons and returns to its ground state. These emitted photons may be received by other atoms, and these secondary events are said to be activated by the first event.

³ Hewitt and Baker, "Laws for Communicating Parallel Processes" [HB77]

Figure 2.11 · Event causality in an actor system: an initial event is followed by arrival events, activations, and actor creation events.
In addition to arrival events and actor creation events, an actor-based system also includes some "initial event" that gets the ball rolling, so to speak. Causality extends to all three types of events: The initial event must precede all other events and may include a set of initial actors. Those actors can process activation events from each other in any order. Finally, and obviously, an actor's creation event must always precede activation events targeting that actor.

We can see that arrival and activation events nicely line up in a time-ordered sequence, one event always occurring before the other. Indeed, an actor-based computation can be described as a linear ordering of combined arrival and activation events: A computation starts with some event, which is then followed by a finite number of other events, and is finally terminated by the computation's last event. The order of events is strict in the sense that an event can only be influenced by other events that preceded it.

When we say one event occurs before, or after, another event, we intuitively refer to some notion of time. In a sequential computation, when the entire program state is shared globally, the sequence of events that make up the computation refers to global time: time shared by all objects participating in the computation. An actor-based system, by contrast, splits the global program state into many local states held by each actor. Those actors interact merely by passing messages, and have no reference to a shared notion of time. Instead, arrival orderings of events in an actor-based system refer to the time local to an actor; there is no requirement to have a notion of global time.
Viewing computations as a partial order of actor events occurring in time local to an actor turns out to be a powerful abstraction. The designers of the actor programming model demonstrated that you can implement any control structure as a sequence of actor events. Because actor-based programming is designed with concurrency as its basic assumption, it is theoretically possible, therefore, to implement any sequential program in a concurrent manner with actor messaging.

2.5 Asynchronous communication

The reason actors ignore message sending as an event, and emphasize message arrival instead, is that message transmission between actors may incur some delay. For instance, actor A may send a message to B, and include C as a continuation. Although C's message is activated by A, there may be some delay between A sending the message and C receiving a message. The delay may be due to some processing time incurred at B as well as to communication latency. Considering message delay as an integral part of a computation is another way actor communication differs from simple object invocation in object-oriented programming.

Practical actor-based systems deal with message delay by offering the ability to asynchronously pass messages between actors: Once an actor dispatches a message to another actor, the sending actor can resume its work immediately. The sending actor does not need to wait for a reply. Indeed, some actor messages will never produce a reply. When replies are expected, those will also be sent asynchronously. As Carl Hewitt noted, actors communicate via messages following the principle of "send it and forget it."

You might already be familiar with the concept of asynchronous message passing from modern Web programming models, such as AJAX.
AJAX is based on asynchronous messages exchanged between a Web browser and a remote server. AJAX has proven practical in Web applications because an unknown amount of latency may be incurred both in the network communication as well as in the server processing an incoming message. A Web client can simply send a message to the server, register a listener for future replies from the server, and immediately return control to the user interface, keeping the application responsive to user interaction.

Similarly, asynchronous messaging in actor communication means that the actor model works equally well across networks as it does in a single address space. Indeed, the Scala actors library defines both "local" as well as "remote" actors. Switching between local and remote actors is surprisingly simple because asynchronous messaging works well in either case. In addition to asynchronous messaging primitives, the Scala actors API provides for synchronous message sending as well.

The "send it and forget it" principle assumes that all messages sent are eventually received by the target actor. Although in many systems the notion of "lost" messages is real (for instance, the server hosting a target actor may crash, resulting in the target never receiving the message), the actor model assumes that infrastructure components ensure reliable message transmission. In other words, the actor model assumes a finite, although initially unknown or unbounded, amount of time between message sending and message transmission. How to achieve reliable message transmission is no more a part of the actor-based programming model than, say, the problem of high availability for database management systems is a part of relational algebra and SQL programming.
The actor programming model nevertheless makes the implementation of highly reliable and available systems much easier: Reliability is often achieved through redundancy and replication, and actors' natural propensity to work well in distributed, concurrent systems serves those needs well. We will provide throughout this book examples and best practices for achieving reliable actor communication.

2.6 You've got mail: indeterminacy and the role of the arbiter

Although the actor model doesn't prescribe a mechanism for reliable message delivery, it acknowledges that many messages may be sent to a single actor in quick succession. Rapidly arriving messages could result in a sort of denial-of-service for the actor, rendering the actor incapable of processing the incoming message flow. To alleviate that problem, the actor model requires that a special object be provided by each actor for the purpose of receiving messages and holding those messages until the actor is able to process them. Such an arbiter is often called a mailbox, since it provides a function similar to, say, an email account: Messages can arrive in the mailbox at any time and will be held there until the recipient is ready to process them. Email clients give you complete freedom in choosing the order in which you read new messages. In a similar way, an actor's mailbox may provide the actor with messages in any order. The only requirement is that an actor process one message at a time from its mailbox.
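The mailbox discipline can be made concrete with a small sketch (our own names; a plain thread and a blocking queue stand in for an actor and its mailbox). The adder below keeps actor-local state, processes one message at a time, and, because its operation is commutative, produces the same reply regardless of the order in which operands arrive:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// A sketch of a stateful adder "actor" (hypothetical names): each message
// carries one operand plus a tag identifying the calculation. The actor
// remembers the first operand per tag; when the second arrives, it adds the
// two and replies. Arrival order does not matter.
object StatefulAdder {
  case class Operand(tag: String, value: Int)

  // Runs the adder on the given messages and returns its first reply.
  def firstReply(messages: Seq[Operand]): Int = {
    val mailbox = new LinkedBlockingQueue[Operand]()   // the actor's mailbox
    val replies = new LinkedBlockingQueue[Int]()       // stands in for the continuation
    val pending = mutable.Map.empty[String, Int]       // actor-local state
    val adder = new Thread(() => {
      while (true) {
        val Operand(tag, v) = mailbox.take()           // one message at a time
        pending.remove(tag) match {
          case Some(first) => replies.put(first + v)   // second operand: reply
          case None        => pending(tag) = v         // first operand: remember it
        }
      }
    })
    adder.setDaemon(true)
    adder.start()
    messages.foreach(mailbox.put)
    replies.take()
  }

  def main(args: Array[String]): Unit = {
    // Either arrival order produces the same sum.
    println(firstReply(Seq(Operand("calc-1", 3), Operand("calc-1", 4))))   // 7
    println(firstReply(Seq(Operand("calc-1", 4), Operand("calc-1", 3))))   // 7
  }
}
```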
Because the order in which messages are processed by an actor cannot be determined in advance (the order of message delivery is indeterminate), a developer must ensure that the correctness of an actor-based program does not depend on any specific message order.⁴

The actor model makes such programming practices easy, however, because any sort of data can be contained in an actor message, and also because an actor is able to maintain its own state. Consider, for instance, an actor that sums up two integers and sends the result to a third actor. In the simplest implementation, a single actor message would contain the two integers as well as the continuation. That implementation assumes that both integers were known to the message sender. Another implementation may process integers from separate senders, expecting a message with a single integer, in addition to a tag that uniquely identifies the calculation. Since addition is commutative, the order of message transmission does not matter: The actor saves away the initial value received for a calculation and, upon receiving the second integer for the calculation, performs the arithmetic operation and sends the reply.

⁴ In Scala actors, the guarantees of message delivery are a bit stronger than the full indeterminacy of the pure actor programming model. If an actor sends several messages to the same receiver, those messages arrive in the receiving actor's mailbox in the order in which they have been sent.

Figure 2.12 · Actor message with an integer and a reference to the calculation.

Consider, however, a version of the arithmetic actor designed to add a set of integers. One problematic approach would be to have each integer message include a lastElement flag indicating whether it is the terminal element of the series.
As soon as the actor receives the last element in the series, it could send the result to the continuation. But since message delivery order is not guaranteed, the last element may be received in any order, possibly resulting in a premature sending of the result.

Reliance on message ordering can often be alleviated by refactoring the actor communication, that is, reworking the contents of the messages. For instance, the above message could include, instead of the lastElement flag, the number of elements in the series. Throughout this book, we will include tips and techniques to design actor communication that does not rely on message order.

Indeterminacy in the actor model results because an actor's mailbox, or arbiter, can receive and provide messages to the actor in any order. The order of message arrival can't be guaranteed, or even specified, due to the inevitable latencies in message transmission between actors: While a message is guaranteed to eventually arrive, the message's transmission time is unbounded. As we mentioned in the previous chapter, a programming model based on unbounded indeterminism powerfully captures the nature of concurrent computation. In the actor model, concurrency is the norm, while sequential computation is a special case.

2.7 Actor lifecycle

Because of their readiness to process incoming messages, actors can be imagined as "live objects," or objects with a lifecycle. Unlike the lives of movie actors, the life of an actor object is rather boring: Once an actor is created, it typically starts processing incoming messages. Once an actor has exceeded its useful life, it can be stopped and destroyed, either of its own accord, or as a result of some "poison pill" message. Creating and starting an actor are separate, although closely related, tasks.
In Scala, actors are plain old Scala objects, and can, therefore, be created via their constructors. An actor starts processing incoming messages after it has been started, which is similar to starting a Java thread.

In practice, it is useful for actors to be able to monitor each other's lifecycle events. In the fork-join example, for instance, a child actor may decide to terminate upon sending its response to the parent, in order to free up memory. It could at that point send a message to its parent actor indicating its exit. In Scala actors, lifecycle monitoring is supported through actor links. Actor linking is explained in detail in Section 6.2 in Chapter 6.

Chapter 3

Scala's Language Support for Actors

This chapter will explain how Scala's focus on scalable language features makes it possible to define the actor API as a domain-specific language embedded in the Scala language. This chapter will also cover the Scala features you need to understand in order to follow the examples later in the book.

Chapter 4

Actor Chat

The previous chapters illustrate the actor programming model's focus on message passing. Not surprisingly, much of Scala's actors library defines a rich set of programming constructs for sending and receiving messages. These constructs appear to the developer as an internal domain-specific language, or DSL. This chapter illustrates the key elements of Scala's actor DSL with a quintessential messaging application: a chat program.

A chat program allows users to communicate with each other by exchanging messages about various topics. Each topic is represented by a chat room. Users interested in following a discussion about a topic can subscribe to a chat room.
Once subscribed, a user may send messages to the chat room and, in turn, receive messages from other chat room subscribers. Figure 4.1 provides an overview of the chat application developed in this chapter.

Figure 4.1 · Actor chat: Users subscribe to a chat room to receive messages sent to that room. The chat room maintains a session of subscribers.

4.1 Defining message classes

The chat room application's communication centers around messages: a user sends a Subscribe message to indicate a desire to send and receive chat room messages, an Unsubscribe message to remove a user from the chat room's session, and a UserPost message to forward to the chat room's subscribers. When a chat room receives a user's UserPost message, it forwards the contents of that message to each of its subscribers inside a Post message. The chat room also makes sure not to send a message back to the user posting that message, lest an unfriendly "echo" effect appear.

A typical first step in developing an actor-based program is to define the message classes that represent the application's communication patterns. Scala's case classes come in handy for defining actor messages. As we'll shortly see, case classes are especially useful in the context of pattern matching, a key technique in actor message processing. Listing 4.1 shows how to define the message classes for our chat application.

case class User(name: String)
case class Subscribe(user: User)
case class Unsubscribe(user: User)
case class Post(msg: String)
case class UserPost(user: User, post: Post)

Listing 4.1 · Case classes for Users and messages

4.2 Processing messages

In addition to the messages, a key abstraction in the chat application is the ChatRoom.
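Before turning to ChatRoom, it is worth seeing how message classes like those in Listing 4.1 cooperate with pattern matching. The following is a minimal plain-Scala sketch that runs without the actors library; the describe helper is hypothetical and exists only for this illustration:

```scala
// Case classes give us pattern-matchable messages for free.
case class User(name: String)
case class Subscribe(user: User)
case class Unsubscribe(user: User)
case class Post(msg: String)
case class UserPost(user: User, post: Post)

// Hypothetical helper: deconstructs a message the same way an actor's
// receive block would, matching on the case class constructors.
def describe(msg: Any): String = msg match {
  case Subscribe(User(name))            => name + " subscribed"
  case Unsubscribe(User(name))          => name + " unsubscribed"
  case UserPost(User(name), Post(text)) => name + " posted: " + text
  case _                                => "unknown message"
}
```

For example, describe(Subscribe(User("Bob"))) yields "Bob subscribed", while any message the patterns don't cover falls through to the wildcard case, just as an unmatched message would be left in an actor's mailbox.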
ChatRoom's main responsibilities include keeping a session of actively logged-in users, receiving messages from users, and transmitting a user's message to other interested users, as shown in Figure 4.2.

Figure 4.2 · All communication between the chat room and users takes place via messages.

The subscribers of a chat room are managed as private state of a ChatRoom. A ChatRoom modifies that state upon receiving a Subscribe or Unsubscribe message, illustrating an important concept of actor-based programming: some messages sent to an actor alter the actor's internal state that, in turn, affects the actor's subsequent behavior. For instance, a new Subscribe message causes a ChatRoom to forward subsequent Post messages to the newly registered user, affecting the application's message flow.

ChatRoom's message handling responsibilities are implemented by extending the scala.actors.Actor trait. Extending Actor means that a ChatRoom benefits from the trait's message handling infrastructure, such as the mailbox. All message handling in an actor takes place inside the act method; Listing 4.2 shows how to define it.

import scala.actors.Actor

class ChatRoom extends Actor {
  def act() {
    // the actor's behavior
  }
}

Listing 4.2 · Defining act

Message processing inside act starts when you invoke start on the actor:

val chatRoom = new ChatRoom
chatRoom.start()

A key task in actor message processing is to obtain the next available message from the actor's mailbox.
Actor's receive method accomplishes that by removing a message from the mailbox and making that message available to a series of pattern matching cases that you pass as a parameter to receive. The example in Listing 4.3 defines a pattern for each of the three types of messages a ChatRoom is expected to receive.

class ChatRoom extends Actor {
  def act() {
    while (true) {
      receive {
        case Subscribe(user) => // handle subscriptions
        case Unsubscribe(user) => // handle unsubscriptions
        case UserPost(user, post) => // handle user posts
      }
    }
  }
}

Listing 4.3 · Incoming message patterns

Each invocation of receive obtains the next available message from the actor's mailbox, and passes that message to a list of pattern matching cases. Patterns are evaluated on a message starting from the first pattern and moving down the list of patterns; if a pattern matches, the matching message is removed from the mailbox, and subsequent patterns are not evaluated on the message. If no match is found, the message is left in the mailbox.

In this example, ChatRoom expects either a Subscribe, an Unsubscribe, or a UserPost message. Upon receiving any such message, ChatRoom evaluates the expression on the right side of the pattern's =>.

The Scala actors library also provides a shorthand for defining and starting an actor in a single step without extending the Actor trait. Listing 4.4 shows how to write the above code using the shorthand notation.

Handling subscription messages

Upon receiving a Subscribe message, a ChatRoom must add the user to its subscribers session.
At first, it may seem convenient to keep chat room subscribers in a list of Users.

val chatRoom = actor {
  while (true) {
    receive {
      case Subscribe(user) => // ...
      case Unsubscribe(user) => // ...
      case UserPost(user, post) => // ...
    }
  }
}

Listing 4.4 · Creating and starting an actor with actor

Note, however, that subscribers must be able to receive messages from the chat room. In our current design, when a UserPost arrives, ChatRoom iterates through its subscribers session, and sends the message's content to all subscribing users, except to the user sending the message.

So that a user can accept messages from ChatRoom, we can represent a user as an actor inside the subscriber session. When ChatRoom receives a Subscribe message, it creates a new actor representing the user, and associates the user with the newly created actor. That actor, in turn, will process Post messages sent to it from the chat room; this is shown in Listing 4.5.

var session = Map.empty[User, Actor]
while (true) {
  receive {
    case Subscribe(user) =>
      val sessionUser = actor {
        while (true) {
          self.receive {
            case Post(msg) => // Send message to sender
          }
        }
      }
      session = session + (user -> sessionUser)
    // handle UserPost message
    // handle Unsubscribe message
  }
}

Listing 4.5 · Representing a user as an actor inside a session

4.3 Sending actor messages

At this point, ChatRoom is ready to process subscription messages. Scala's actors library supports both asynchronous and synchronous message sending.

Asynchronous message sending

You send a message asynchronously to an actor with the ! method. In using ! to denote message sending, Scala follows the tradition of Erlang actors:

val chatRoom = new ChatRoom
chatRoom ! Subscribe(User("Bob"))

! sends a message to chatRoom and returns immediately: It does not wait for any confirmation or reply from the target actor. In addition to the message, ! also sends an implicit reference to the sender to the target actor. That reference is always available inside the target actor via the sender variable. Listing 4.6 illustrates how the target actor uses the sender reference to process a Post message:

var session = Map.empty[User, Actor]
while (true) {
  receive {
    case Subscribe(user) =>
      val subscriber = sender
      val sessionUser = actor {
        while (true) {
          self.receive {
            case Post(msg) => subscriber ! Post(msg)
          }
        }
      }
      session = session + (user -> sessionUser)
    // handle UserPost message
    // handle Unsubscribe message
  }
}

Listing 4.6 · Using the sender reference

Note that there are two actors in the above example: ChatRoom and the actor representing the user inside the chat room, sessionUser. When the chat room actor receives a Subscribe message, it assigns the sender of that message to the subscriber variable. The closure passed to the actor method, in turn, captures that variable and makes it possible for sessionUser to receive and process Post messages. Once sessionUser is initialized to the actor representing the user, it is saved away in the session map.

Synchronous messages

Scala also supports synchronous message sending via the !? operator:

val chatRoom = new ChatRoom
chatRoom !? Subscribe(User("Bob"))

Unlike with asynchronous message sending, !? blocks the calling thread until the message is sent and a reply received. Listing 4.7 shows how ChatRoom might return an acknowledgement when handling a subscription message with the reply method:

var session = Map.empty[User, Actor]
while (true) {
  receive {
    case Subscribe(user) =>
      val subscriber = sender
      val sessionUser = actor {
        while (true) {
          self.receive {
            case Post(msg) => subscriber ! Post(msg)
          }
        }
      }
      session = session + (user -> sessionUser)
      reply("Subscribed " + user)
  }
}

Listing 4.7 · Using the reply method

The client can capture and process the reply:

chatRoom !? Subscribe(User("Bob")) match {
  case response: String => println(response)
}

Futures

In some cases, you want the calling thread to return immediately after sending a message, but you may also need access to the target actor's reply at a later time. For instance, you may want to quickly return from sending a subscription message, but also record the chatroom's acknowledgment message in the future. Scala actors provide the concept of futures for such a scenario: Future-bearing messages are sent with the !! method, which returns a Future without blocking the calling thread. A future's value may or may not be evaluated by the caller; if it is, and if the future value is not yet available, the calling thread blocks:

val future = chatRoom !! Subscribe(User("Bob"))
// Do useful work
println(future()) // Wait for the future

Message timeouts

In the examples so far, receive blocks the actor's main thread until a matching message is found. In some cases, you may want to wait for suitable messages only for a certain period of time.
The receiveWithin method allows you to specify a message timeout, and to be notified if no message was received within that time.

You can use the receiveWithin method to automatically unsubscribe a user if the user hasn't received a post message within a specified amount of time. In the following example, TIMEOUT will match if no Post message is received within 30 minutes (1800 * 1000 milliseconds); the user is then unsubscribed from the chatroom:

val sessionUser = actor {
  while (true) {
    self.receiveWithin(1800 * 1000) {
      case Post(msg) => subscriber ! Post(msg)
      case TIMEOUT =>
        room ! Unsubscribe(user)
        self.exit()
    }
  }
}

Listing 4.8 · Using message timeouts with receiveWithin

Processing user posts

All that remains for our chat room to be fully functional is to implement the processing of a user's post:

var session = Map.empty[User, Actor]

def act() {
  while (true) {
    receive {
      case PostFrom(user, msg) =>
        for (key <- session.keys; if key != user) {
          session(key) ! msg
        }
      // Handle Subscribe message
      // Handle Unsubscribe message
    }
  }
}

Listing 4.9 · Processing post messages

Chapter 5

Event-Based Programming

The constructs we introduced in Chapter 2 tie each actor to a JVM thread: each actor needs its own dedicated Java thread. The thread-per-actor approach works well if your program requires relatively few actors.

If you anticipate many actors, however, or if the number of actors in your program varies depending on input, defining one thread per actor incurs significant overhead: Not only does each JVM thread require memory for its execution stack (which is usually pre-allocated), each JVM thread may also be mapped to an underlying operating system process.
Depending on the platform, context-switching between those processes involves switching between kernel and user modes, an expensive operation.

To allow many actors in a JVM, you can make your actors event-based. Event-based actors are implemented as event handlers instead of as threads, and are therefore more lightweight than their thread-based cousins. Since event-based actors are not tied to Java threads, they can execute on a pool of a small number of worker threads. Typically, such a pool should contain as many worker threads as there are processor cores in the system. That maximizes parallelism while keeping the overhead of pool threads (memory consumption and context-switching) to a minimum.

5.1 Events vs. threads

Making an actor event-based is not entirely transparent to the programmer. That is because event-based programming follows a different paradigm from programming with threads. A typical actor spends some time waiting for incoming messages, and a key difference between event-based and thread-based actors can be illustrated by an actor's waiting strategy.

A thread-based actor waits by invoking wait on an object for which its thread holds the associated lock. That thread resumes whenever another thread invokes notify (or notifyAll) on the same object.1 An event-based actor, by contrast, registers an event-handler with the actor runtime. After that registration, the actor's computation usually finishes, and the thread initially running the computation is free to execute other tasks, or go to sleep if there is nothing else to do. Later, when an event of interest is fired (when a message of interest to the actor is received, for instance), the actor runtime schedules the actor's event-handler for execution on a thread pool, and the actor's computation resumes. In that manner, event-based actors are decoupled from underlying JVM threads.

1 In practice, waiting is slightly more complicated, because threads may be interrupted during waiting.

5.2 Making actors event-based: react

Because event-based actors differ from thread-based actors in their waiting strategies, turning a thread-based actor into an event-based one is straightforward. The thread-based actors we have seen so far used receive to wait for a matching message to arrive in their mailbox. To make an actor event-based, replace all uses of receive with invocations of the react method. As with receive, react expects a block of message patterns that are associated with actions to process a matching message.

Although replacing receive with react is a simple code change, receive and react are used quite differently in programs. The following examples explore these differences.

Using react to wait for messages

Listing 5.1 shows the definition of a method that (recursively) builds a chain of actors and returns the first actor. Each actor in the chain uses react to wait for a 'Die message. When it receives such a message, the actor checks to see whether it is last in the chain (in this case, next == null). The last actor in the chain simply responds with 'Ack to the sender of the 'Die message and terminates.

def buildChain(size: Int, next: Actor): Actor = {
  val a = actor {
    react {
      case 'Die =>
        val from = sender
        if (next != null) {
          next ! 'Die
          react {
            case 'Ack => from ! 'Ack
          }
        } else from ! 'Ack
    }
  }
  if (size > 0) buildChain(size - 1, a) else a
}

Listing 5.1 · Building a chain of event-based actors.

If the current actor is not the last in the chain, it sends a 'Die message to the next actor, and waits for an 'Ack message. When the 'Ack arrives, it notifies the original sender of the 'Die and terminates.
Note that we store the sender of the original 'Die message in the local variable from, so that we can refer to this actor inside the nested react. Inside the nested react, sender refers to the next actor in the chain, whereas the current actor should send its 'Ack to the previous actor in the chain, which is stored in from.

Let's use the buildChain method by putting it into an object with the main method shown in Listing 5.2. We store the first command-line argument in the numActors variable to control the size of the actor chain. Just for fun, we take the time to see how long it takes to build and terminate a single chain of size numActors.

After building the chain using buildChain, we immediately send a 'Die message to the first actor in the chain. What happens is that each actor sends 'Die to the next actor, waiting for an 'Ack message. When the 'Ack is received, the actor propagates it to the previous actor and terminates; the first actor is the last one to receive its 'Ack. When the receive operation in the main method starts processing, all actors in the chain have terminated.

def main(args: Array[String]) {
  val numActors = args(0).toInt
  val start = System.currentTimeMillis
  buildChain(numActors, null) ! 'Die
  receive {
    case 'Ack =>
      val end = System.currentTimeMillis
      println("Took " + (end - start) + " ms")
  }
}

Listing 5.2 · The main method.

How many actors are too many?

Actors that use react for receiving messages are very lightweight compared to normal JVM threads. Let's find out just how lightweight they actually are by creating chains of actors of ever increasing size until the JVM runs out of memory. Moreover, we can compare that chain with thread-based actors by replacing the two reacts with receives. But first, how many event-based actors can we create? And how much time does it take to create them?
On a test system, a chain of one thousand actors is built and terminated in about 115 ms, while creating and destroying a chain of ten thousand actors takes about 540 ms. Building a chain with five hundred thousand actors takes 6,232 ms, but one with a million actors takes a little longer: about 26 seconds without increasing the default heap size of the JVM (Java HotSpot(TM) Server VM 1.6.0).

Let's try this now with thread-based actors. Since we are going to create lots of threads, we should configure the actor runtime to avoid unreasonable overheads.

Configuring the actor runtime's thread pool

Since we intend to use lots of threads with thread-bound actors, it is more efficient to create those threads in advance. Moreover, we can adjust the size of the actor runtime's internal thread pool to optimize actor execution. Scala's actor runtime allows its thread pool to resize itself according to the number of actors blocked in receive (each of those actors needs its own thread), but that resizing may take a long time, since the thread pool is not optimized to handle massive resizing efficiently.

The internal thread pool is configured using two JVM properties, actors.corePoolSize and actors.maxPoolSize. The first property sets the number of pool threads that are started when the thread pool is initialized. The latter property specifies an upper bound on the total number of threads the thread pool will ever use (the default is 256). To minimize the time it takes to resize the thread pool, we set both properties close to the actual number of threads that our application needs. For example, when running our chain example with one thousand thread-based actors, setting actors.corePoolSize to one thousand and actors.maxPoolSize to, say, one thousand and ten keeps the pool resizing overhead low.
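A launch command might look like the following sketch; the main object name ChainApp and its argument are assumptions for illustration, not part of the chain example's code shown earlier:

```shell
# Hypothetical invocation: the pool sizes are passed to the JVM as system
# properties when launching the program (ChainApp is a made-up class name).
scala -Dactors.corePoolSize=1000 -Dactors.maxPoolSize=1010 ChainApp 1000
```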
With these settings in place, it takes about 12 seconds to create and destroy a chain of one thousand thread-based actors. A chain of two thousand threaded actors already takes over 97 seconds. With a chain of three thousand actors, the test JVM crashes with a java.lang.OutOfMemoryError.

As this simple test demonstrates, event-based actors are much more lightweight than thread-based actors. The following sections explore how to program with event-based actors effectively.

Using react effectively

As we mentioned above, with react an actor waits for a message in an event-based manner: Under the hood, instead of blocking the underlying worker thread, react's block of pattern-action pairs is registered as an event-handler. That event-handler is then invoked by the actor runtime when a matching message arrives.

The event-handler is all that is retained before the actor goes to sleep. In particular, the call stack, as it is maintained by the current thread, is discarded when the actor suspends. That allows the runtime system to release the underlying worker thread so that thread can be reused to execute other actors. By running a large number of event-based actors on a small number of threads, the context-switching overhead and the resource consumption of thread-bound actors are reduced dramatically.

That the current thread's call stack is discarded when an event-based actor suspends has an important consequence for the event-based actor programming model: a call to react never returns normally. react, like any Scala or Java method, could return normally only if its full call stack were available when it executed. But that isn't the case with event-based actors. In fact, a call to react does not return at all.

That react never returns means that no code can follow a react method invocation: Since react doesn't return, code following react would never execute. Thus, invoking react must always be the last thing an event-based actor does before it terminates.

Since an actor's main job is to handle interesting messages, and since react defines an event-based actor's message handling mechanism, you might think that react will always be the last, and even the only, thing an actor needs to do. However, it is sometimes convenient to perform several react invocations in succession. In those situations, you could nest react invocations in sequence, as we saw in Listing 5.1.

Alternatively, you could define a recursive method that runs several reacts in sequence. For instance, we can extend our simple chain example so that an actor waits for a specified number of 'Die messages before it terminates. We can do this by replacing the body of the chain actors with a call to the waitFor method shown in Listing 5.3. The waitFor method tests up-front whether the current actor should terminate (if n == 0) or continue waiting for messages. The protocol logic is the same as before. The only difference is that after each of the message sends to from, we added a recursive call to waitFor.

def waitFor(n: Int): Unit =
  if (n > 0) {
    react {
      case 'Die =>
        val from = sender
        if (next != null) {
          next ! 'Die
          react {
            case 'Ack => from ! 'Ack; waitFor(n - 1)
          }
        } else { from ! 'Ack; waitFor(n - 1) }
    }
  }

Listing 5.3 · Sequencing react calls using a recursive method.

Recursive methods with react

Looking at the example in Listing 5.3, you might be concerned that calling a recursive method in this way could quickly lead to a stack overflow.
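The general idea of resuming each step on a fresh, nearly empty stack can be sketched in plain Scala as a trampoline. This is an analogy, not the actors implementation: each step returns the next step as a value instead of calling it directly, and a flat loop runs the steps one at a time, so the call stack never grows no matter how many steps there are.

```scala
// A toy trampoline: Step values stand in for suspended computations,
// the way react's registered event-handler stands in for the actor's
// continuation.
sealed trait Step
case class Continue(next: () => Step) extends Step
case object Done extends Step

// Looks recursive, but each "call" merely returns a Continue value.
def countdown(n: Int): Step =
  if (n == 0) Done else Continue(() => countdown(n - 1))

// The flat driver loop: runs one step, gets the next, repeats.
def run(start: Step): Int = {
  var step = start
  var executed = 0
  var done = false
  while (!done) {
    step match {
      case Continue(next) =>
        executed += 1
        step = next() // each step begins on the same shallow stack
      case Done =>
        done = true
    }
  }
  executed
}
```

Calling run(countdown(1000000)) completes without a stack overflow, because the driver's stack depth is constant regardless of the number of steps.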
The good news, however, is that react plays extremely well with recursive methods: Whenever an invocation of react resumes due to a matching message in the actor's mailbox, a task item is created and submitted to the actor runtime's internal thread pool for execution. The thread that executes that task item doesn't have much else on its call stack, apart from the basic logic of being a pool worker thread. As a result, every invocation of react executes on a call stack that is as good as empty. The call stack of a recursive method like waitFor in Listing 5.3, therefore, doesn't grow at all thanks to the invocations of react.

Composing react-based code with combinators

Sometimes it is difficult or impossible to use recursive methods for sequencing multiple reacts. That is the case when reusing classes and methods that make use of react: By their very nature, reusable components cannot be changed after they have been built. In particular, we cannot simply perform invasive changes such as when we added iteration through a recursive method in the example in Listing 5.3. This section illustrates several ways in which react-based code can be reused.

def sleep(delay: Long) {
  register(timer, delay, self)
  react {
    case 'Awake => /* OK, continue */
  }
}

Listing 5.4 · A sleep method that uses react.

For example, suppose our project contains the sleep method shown in Listing 5.4. It registers the current actor, self, with a timer service (not shown) to be woken up after the specified delay that is provided as a parameter. The timer notifies the registered actor using an 'Awake message. For efficiency, sleep uses react to wait for the 'Awake so that the sleeping actor does not require the resources of a JVM thread while it is sleeping.

Using the above sleep method invariably requires executing something after its react invocation. However, since we want to reuse the method as is, we cannot simply go ahead and insert something in the body of its react. Instead, we need a way to combine the sleep method with the code that should run after the 'Awake message has been received, without changing the implementation of sleep.

That is where the control-flow combinators of the Actor object come into play. These combinators allow expressing common communication patterns in a relatively simple and concise way. The most basic combinator is andThen. The andThen combinator combines two code blocks to run after each other even if the first one invokes react. Listing 5.5 shows how you can use andThen to execute code that runs after invoking the sleep method.

actor {
  val period = 1000
  { // code before going to sleep
    sleep(period)
  } andThen {
    // code after waking up
  }
}

Listing 5.5 · Using andThen to continue after react.

andThen is used as an operator, written infix between two blocks of code. The first block of code invokes sleep as its last action, which, in turn, invokes react. Note that the period parameter of sleep is declared outside the code blocks that andThen operates on. This is possible because the two code blocks are actually closures that may capture variables in their environment.

The second block of code is run when the react of the first code block (the one inside sleep) is finished. However, note that the second code block is really the last thing executed by the actor. The use of andThen does not change the fact that invocations of react do not return. andThen merely allows one to combine two pieces of code in sequence.

Another useful combinator is loopWhile. As its name suggests, it loops, running a provided closure while some condition holds.
Thanks to Scala's flexible syntax, loopWhile feels almost like a native language primitive. Listing 5.6 shows a variation of our actor chain example that uses loopWhile to wait for multiple 'Die messages. Again, we make use of the fact that the two code block parameters of loopWhile, the condition (n > 0) and the body, are closures, since both code blocks access the local variable n. Note that the top-level react in the body of loopWhile is unchanged from the very first example that did not support iteration. The body might as well be extracted to a method; loopWhile works in either case.

def buildChain(size: Int, next: Actor, waitNum: Int): Actor = {
  val a = actor {
    var n = waitNum
    loopWhile (n > 0) {
      n -= 1
      react {
        case 'Die =>
          val from = sender
          if (next != null) {
            next ! 'Die
            react {
              case 'Ack => from ! 'Ack
            }
          } else from ! 'Ack
      }
    }
  }
  if (size > 0) buildChain(size - 1, a, waitNum) else a
}

Listing 5.6 · Using loopWhile for iterations with react.

5.3 Event-based futures

In Chapter 4, we illustrated how to use futures for result-bearing messages. Some of the methods to wait for the result of a future rely on the thread-based receive under the hood. While waiting for the result, those methods monopolize the underlying worker thread. It is also possible to wait for a future in an event-based way with react.

For example, suppose we would like to render a summary of all images linked from a web page at a given URL. It is possible to render each image individually once the image has finished downloading. To increase the throughput of the application, each image is downloaded by its own actor. Since each downloading actor performs a result-bearing task, it is convenient to use futures to keep track of the expected results. Listing 5.7 shows the code for rendering images in this way.

def renderImages(url: String) {
  val imageInfos = scanForImageInfo(url)
  val dataFutures = for (info <- imageInfos) yield {
    val loader = actor {
      react {
        case Download(info) => reply(info.downloadImage())
      }
    }
    loader !! Download(info)
  }
  for (i <- 0 until imageInfos.size) {
    dataFutures(i)() match {
      case data @ ImageData(_) => renderImage(data)
    }
  }
  println("OK, all images rendered.")
}

Listing 5.7 · Image renderer using futures.

First, the URL provided as a parameter is scanned for image information. For each image, we start a new actor that downloads the image and replies with the image data. We obtain a future using the !! message send variant. Once all the futures have been collected in dataFutures, the current actor waits for each of the futures in turn by invoking the future's apply method (with an empty parameter list).

Example: image renderer with react and futures

The implementation that we just described blocks the underlying thread while waiting for a future. However, it is also possible to wait for a future in a non-blocking, event-based way using react. The key for this to work is the InputChannel associated with each Future instance. This channel is used to transmit the result of the future to the actor that created the future.

def renderImages(url: String) {
  val imageInfos = scanForImageInfo(url)
  val dataFutures = for (info <- imageInfos) yield {
    val loader = actor {
      react {
        case Download(info) => reply(info.downloadImage())
      }
    }
    loader !! Download(info)
  }
  var i = 0
  loopWhile (i < imageInfos.size) {
    i += 1
    dataFutures(i - 1).inputChannel.react {
      case data @ ImageData(_) => renderImage(data)
    }
  } andThen {
    println("OK, all images rendered.")
  }
}

Listing 5.8 · Using react to wait for futures.
Invoking a future’s apply method waits to receive the result on that channel, using the thread-based receive. However, we can also wait for the results in an event-based way using react on the future’s InputChannel.

Listing 5.8 shows an implementation that does just that. Since it is necessary to invoke react several times in sequence, you have to use one of the control-flow combinators of Section 5.2. In this example we use loopWhile to emulate the indexing scheme of the previous version in Listing 5.7. The main difference is that in this implementation the index variable i is declared and incremented explicitly, and that the generator in the for-comprehension has been replaced with a termination condition.

You can also build custom control-flow combinators that allow you to use react inside for-comprehensions. In the following section we explain how this can be done.

  def renderImages(url: String) {
    val imageInfos = scanForImageInfo(url)
    val dataFutures = for (info <- imageInfos) yield {
      val loader = actor {
        react {
          case Download(info) =>
            reply(info.downloadImage())
        }
      }
      loader !! Download(info)
    }
    (for (ft <- ForEach(dataFutures))
      ft.inputChannel.react {
        case data @ ImageData(_) => renderImage(data)
      }) andThen {
      println("OK, all images rendered.")
    }
  }

Listing 5.9 · A custom ForEach operator enables react in for-comprehensions.

  case class ForEach[T](iter: Iterable[T]) {
    def foreach(fun: T => Unit): Unit = {
      val it = iter.elements
      loopWhile (it.hasNext) {
        fun(it.next)
      }
    }
  }

Listing 5.10 · Implementing the custom ForEach operator.

Building custom control-flow operators

Sometimes the existing control-flow combinators provided by the Actor object are not well-suited for the task at hand.
In such cases building custom control-flow operators can help. In this section you will learn how the control-flow combinators provided by the Actor object can be used to build custom operators that enable the use of react (and methods using react) inside for-comprehensions.

Listing 5.9 shows the usage of a custom ForEach operator that allows iterating over a list, while invoking react for each element in the list. In this case, we want to iterate over the futures in dataFutures. We use ForEach to convert the plain dataFutures list into an object that acts as a generator in for-comprehensions. It generates the same values as the dataFutures list, namely all of the list’s elements. However, it does so in a way that allows continuing the iteration even after an invocation of react in the body of the for-comprehension.

Listing 5.10 shows the implementation of ForEach. Making ForEach a case class allows omitting new when creating new instances. The constructor takes a parameter of type Iterable[T]—the collection that generates the elements for our iteration.

The ForEach class has a single method foreach that takes a parameter of function type T => Unit. Implementing the foreach method enables instances of the ForEach class to be used as generators in simple for-comprehensions like the one in Listing 5.9. The variable that is bound to the generated elements in the for-comprehension corresponds to the parameter of the function fun. The body of the for-comprehension corresponds to the body of fun.

Inside foreach we first obtain an iterator it from the Iterable. Then, we iterate over the collection using it and the loopWhile combinator of Section 5.2. In each iteration we apply the parameter function fun to the current element of the collection. Since we are using loopWhile, it is safe to invoke react inside fun.
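The same technique extends to generators other than collections. The following sketch (the Times name and its formulation are ours, not part of the library) builds a combinator in the style of the ForEach operator above: because its foreach is implemented with loopWhile, it is safe to invoke react in the body.

```scala
// Hypothetical Times combinator, written in the same style as ForEach:
// its foreach uses loopWhile, so react may be invoked in the body of the
// for-comprehension without losing the remaining iterations.
case class Times(n: Int) {
  def foreach(fun: Int => Unit): Unit = {
    var i = 0
    loopWhile (i < n) {
      i += 1
      fun(i - 1)  // pass the current index to the body
    }
  }
}

// Usage sketch: receive exactly n messages, reacting to each in turn
// for (i <- Times(3))
//   react { case msg => println("message " + i + ": " + msg) }
```

As with ForEach, only foreach is defined, so the combinator supports simple for-comprehensions without yield.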
Chapter 6

Exception Handling, Actor Termination and Shutdown

In this chapter we will take a look at how to handle errors in concurrent, actor-based programs. Actors provide several additional ways to handle exceptions compared to sequential Scala code. In particular, we will show how an actor can handle exceptions that are thrown but not handled by other actors. More generally, we will look at ways in which an actor can monitor other actors to detect whether they terminated normally or abnormally (for instance, through an unhandled exception). Finally, we introduce a number of concepts and techniques that can simplify termination management of actor-based programs.

6.1 Simple exception handling

An actor terminates automatically when an exception is thrown that is not handled inside the actor’s body. One possible symptom of such a situation is that other actors wait indefinitely for messages from the dead actor. Since, by default, terminating actors do not generate any feedback, it can be quite time-consuming to find out what happened and why.

The simplest way of guarding against actors that silently terminate because of unhandled exceptions is to provide a global exception handler that is invoked whenever an exception propagates out of the actor’s body. This is done by subclassing Actor (or Reactor) and overriding its exceptionHandler member. It is defined in Reactor as follows (omitting the modifiers):

  def exceptionHandler: PartialFunction[Exception, Unit]

As you can see, it is a parameterless method that returns a partial function that can be applied to instances of java.lang.Exception. Whenever an exception is thrown inside the body of an actor that would normally cause the actor to terminate, the run-time system checks whether the actor’s exceptionHandler matches the given exception. If so, the exceptionHandler partial function is applied to the exception. After that the actor terminates normally. Listing 6.1 shows how to override the exceptionHandler method so that it returns a custom partial function.

  object A extends Actor {
    def act() {
      react {
        case 'hello =>
          throw new Exception("Error!")
      }
    }
    override def exceptionHandler = {
      case e: Exception =>
        println(e.getMessage())
    }
  }

Listing 6.1 · Defining an actor-global exception handler.

Let’s interact with the A actor using Scala’s interpreter shell:

  scala> A.start()
  res0: scala.actors.Actor = A$@1ea414e

  scala> A ! 'hello
  Error!

As expected, A’s overridden exceptionHandler method runs, thereby printing the message string attached to the thrown exception, which is just "Error!".

This form of exception handling using exceptionHandler works well together with control-flow combinators, such as loop. The combinators can be used to resume the normal execution of an actor after handling an exception. For example, let’s modify the act method of the A actor in Listing 6.1 as follows:

  def act() {
    var lastMsg: Option[Symbol] = None
    loopWhile (lastMsg.isEmpty || lastMsg.get != 'stop) {
      react {
        case 'hello =>
          throw new Exception("Error!")
        case any: Symbol =>
          println("your message: " + any)
          lastMsg = Some(any)
      }
    }
  }

The invocation of react is now wrapped inside a loopWhile that tests whether the last received message is equal to 'stop, in which case the actor terminates. Now, if the actor receives a 'hello message, it throws the exception, which is handled as before.
However, instead of terminating, the actor simply resumes its execution by continuing with the next loop iteration. This means that the actor is ready to receive more messages after the exception has been handled. Let’s try this out in the interpreter:

  scala> A.start()
  res0: scala.actors.Actor = A$@1cb048e

  scala> A ! 'hello
  Error!

  scala> A.getState
  res2: scala.actors.Actor.State.Value = Suspended

  scala> A ! 'hi
  your message: 'hi

  scala> A ! 'stop
  your message: 'stop

  scala> A.getState
  res5: scala.actors.Actor.State.Value = Terminated

Note that after sending 'hello the actor eventually suspends waiting for the next message. The getState method can be used to query an actor’s execution state. It returns values of the Actor.State enumeration, which is defined in the Actor object. The Suspended state indicates that the actor has invoked react and is now waiting for a matching message. Therefore, we can continue to interact with the actor by sending it a 'hi message. After the actor receives a 'stop message its loopWhile loop finishes and the actor terminates normally. The final state value is Terminated.

6.2 Monitoring actors

There are a number of scenarios where it is necessary to monitor the life cycle of (a group of) actors. In particular, error handling and fault tolerance in a concurrent system can be significantly simplified through monitoring. Here are some examples:

• Scenario A. We want to be notified when an actor terminates normally or abnormally. For instance, we might want to replace an actor that terminated because of an unhandled exception. Or we might want to rethrow the exception in a different actor that can handle it.

• Scenario B. We want to express that an actor depends on some other actor in the sense that the former cannot function without the latter. For instance, in a typical master-slave architecture the work done by a slave is useless if the master has crashed. In this case it would be desirable if all slaves would terminate automatically whenever the master crashes to avoid needless consumption of resources, such as memory.

Both of the above scenarios require monitoring the life cycle of an actor. In particular, they require us to be notified when an actor terminates, normally or abnormally. The actors package provides special support for managing such notifications. However, before diving into those monitoring constructs it is helpful to take a look at the ways in which actors can terminate.

Actor termination

There are three reasons why an actor terminates:

1. The actor finishes executing the body of its act method;
2. The actor invokes exit;
3. An exception propagates out of the actor’s body.
The first reason is really a special case of the second one: after executing an actor’s body, the run-time system invokes exit implicitly on the terminating actor. The exit method can be invoked with or without passing an argument. The Actor trait contains the following definitions (omitting the modifiers):

  def exit(): Nothing
  def exit(reason: AnyRef): Nothing

Both methods have result type Nothing, which means that invocations do not return normally, because an exception is thrown in all cases. In this case, the particular instance of Throwable should never be caught inside the actor, since it is used for internal life-cycle control. Invoking exit (with or without argument) terminates the execution of the current actor. The reason parameter is supposed to indicate the reason for terminating the actor. Invoking exit without an argument is equivalent to passing the Symbol 'normal to exit; it indicates that the actor terminated normally. Examples for arguments that indicate abnormal termination are:

• Exceptions that the actor cannot handle;
• Message objects that the actor cannot process;
• Invalid user input.

Exceptions that propagate out of an actor’s body lead to abnormal termination of that actor. In the following section you will learn how actors can react to the termination of other actors. We will show the difference between normal and abnormal termination as seen from an outside actor. Importantly, we will see how to obtain the exit reason of another actor that terminated abnormally.

Linking actors

An actor that wants to receive notifications when another actor terminates must link itself to the other actor. Actors that are linked together implicitly monitor each other.
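As a small illustration of the exit variants described above, consider the following hedged sketch; the worker actor and the 'cannotProcess reason are invented for the example and are not from the library.

```scala
// Hedged sketch: signaling abnormal termination with an explicit reason.
// The worker actor and the 'cannotProcess reason are made up for illustration.
val worker = actor {
  react {
    case msg: String =>
      reply("done: " + msg)   // normal work; falling off the body exits normally
    case other =>
      // terminate abnormally; any linked actor can observe this reason
      exit('cannotProcess)
  }
}
```

Sending worker a String lets it finish its body (an implicit exit with reason 'normal); sending anything else makes it terminate with the custom reason.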
For example, Listing 6.2 shows a slave actor, which is supposed to do work on behalf of a master actor.

  object Master extends Actor {
    def act() {
      Slave ! 'doWork
      react {
        case 'done =>
          throw new Exception("Master crashed")
      }
    }
  }

  object Slave extends Actor {
    def act() {
      link(Master)
      loop {
        react {
          case 'doWork =>
            println("Done")
            reply('done)
        }
      }
    }
  }

Listing 6.2 · Linking dependent actors.

The work done by the slave is useless without the master, since the master manages all results produced by the slave—the slave depends on its master. This means that whenever the master crashes its dependent slave should terminate, since otherwise it would only needlessly consume resources.

This is where links come into play. Using the link method the slave actor links itself to the master actor to express the fact that it depends on it. As a result, the slave is notified whenever its master terminates. By default, termination notifications are not delivered as messages to the mailbox of the notified actor. Instead, they have the following effect:

• If the exit reason of the terminating actor is 'normal, no action is taken;
• If the exit reason of the terminating actor is different from 'normal, the notified actor automatically terminates with the same exit reason.

In our master-slave example this means that the termination of the master actor caused by the unhandled exception results in the termination of the slave actor; the exit reason of the slave actor is the same as for the master actor, namely an instance of UncaughtException. The purpose of class UncaughtException is to provide information about the context in which the exception was thrown, such as the actor, the last message processed by that actor, and the sender of that message.
The next section shows how to use that information effectively. Let’s use the interpreter shell to interact with the two actors:

  scala> Slave.start()
  res0: scala.actors.Actor = Slave$@190c99

  scala> Slave.getState
  res1: scala.actors.Actor.State.Value = Suspended

  scala> Master.start()
  Done
  res2: scala.actors.Actor = Master$@395aaf

  scala> Master.getState
  res3: scala.actors.Actor.State.Value = Terminated

  scala> Slave.getState
  res4: scala.actors.Actor.State.Value = Terminated

Right after starting the Slave its state is Suspended. When the Master starts it sends a 'doWork request to its Slave, which prints Done to the console and replies to the Master with 'done. Once the Master receives 'done, it throws an unhandled exception causing it to terminate abnormally. Because of the link between Slave and Master, this causes the Slave to terminate automatically. Therefore, at the end both actors are in state Terminated.

Trapping termination notifications. In some cases, it is useful to receive termination notifications as messages in the mailbox of a monitoring actor. For example, a monitoring actor may want to rethrow an exception that is not handled by some linked actor. Or, a monitoring actor may want to react to normal termination, which is not possible by default.

Actors can be configured to receive all termination notifications as normal messages in their mailbox using the Boolean trapExit flag. In the following example actor b links itself to actor a:

  val a = actor { ... }

  val b = actor {
    self.trapExit = true
    link(a)
    ...
  }

Note that before actor b invokes link it sets its trapExit member to true; this means that whenever a linked actor terminates (normally or abnormally) it receives a message of type Exit (see below).
Therefore, actor b is going to be notified whenever actor a terminates (assuming that actor a did not terminate before b’s invocation of link). Listing 6.3 makes this more concrete by having actor a throw an exception. The exception causes a to terminate, resulting in an Exit message to actor b. Running it produces the following output:

  Actor 'a' terminated because of UncaughtException(...)

  val a = actor {
    react {
      case 'start =>
        val somethingBadHappened = true
        if (somethingBadHappened)
          throw new Exception("Error!")
        println("Nothing bad happened")
    }
  }

  val b = actor {
    self.trapExit = true
    link(a)
    a ! 'start
    react {
      case Exit(from, reason) if from == a =>
        println("Actor 'a' terminated because of " + reason)
    }
  }

Listing 6.3 · Receiving a notification because of an unhandled exception.

Exit is a case class with the following parameters:

  case class Exit(from: AbstractActor, reason: AnyRef)

The first parameter tells us which actor has terminated. In Listing 6.3 actor b uses a guard in the message pattern to only react to Exit messages indicating that actor a has terminated. The second parameter of the Exit case class indicates the reason why actor from has terminated.
The termination of a linked actor caused by some unhandled exception results in an Exit message where reason is equal to an instance of UncaughtException; it is a case class with the following fields:

• actor: Actor: the actor that threw the uncaught exception;
• message: Option[Any]: the (optional) message the actor was processing; None if the actor did not receive a message;
• sender: Option[OutputChannel[Any]]: the (optional) sender of the most recently processed message;
• cause: Throwable: the uncaught exception that caused the actor to terminate.

Since UncaughtException is a case class, it can be matched against when receiving an Exit message. For instance, in Listing 6.3 we can extract the exception that caused actor a to terminate directly from the Exit message:

  react {
    case Exit(from, UncaughtException(_, _, _, _, cause)) if from == a =>
      println("Actor 'a' terminated because of " + cause)
  }

Running Listing 6.3 with the above change results in the following output:

  Actor 'a' terminated because of java.lang.Exception: Error!

When the trapExit member of an actor is true, the actor is also notified when a linked actor terminates normally, for instance, when it finishes the execution of its body. In this case, the Exit message’s reason field is the Symbol 'normal.1 You can try this out yourself by changing the local variable somethingBadHappened to false.
The output of running the code should then look like this:

  Nothing bad happened
  Actor 'a' terminated because of 'normal

1 In Scala, Symbols are similar to strings, except that they are always interned, which makes equality checks fast. Also, the syntax for creating Symbols is slightly more lightweight compared to strings.

Restarting crashed actors

In some cases it is useful to restart an actor that has terminated because of an unhandled exception. By resetting (parts of) the state of a crashed actor, chances are that the actor can successfully process outstanding messages in its mailbox. Alternatively, upon restart the outstanding messages could be retrieved from the crashed actor’s mailbox and forwarded to a healthy actor.

  // assumes `self` linked to `patient` and `self.trapExit == true`
  def keepAlive(patient: Actor): Nothing = {
    react {
      case Exit(from, reason) if from == patient =>
        if (reason != 'normal) {
          link(patient)
          patient.restart()
          keepAlive(patient)
        }
    }
  }

Listing 6.4 · Monitoring and restarting an actor using link and restart.

Listing 6.4 shows how to create a keep-alive actor that monitors another actor, restarting it whenever it crashes. The idea is that the keep-alive actor first links itself to the monitored actor (the patient), and then invokes keepAlive. The keepAlive method works as follows. When receiving an Exit message indicating the abnormal termination of patient (in this case, reason != 'normal), we re-link self to patient and restart it. Finally, keepAlive invokes itself recursively to continue monitoring the patient.

You may wonder why we link self to the patient actor before restarting it. After all, keepAlive assumes that this link already exists. The reason is that self automatically unlinks itself when receiving an Exit message from patient.
This is done to avoid leaking memory through links that are never removed. Since in most cases terminated actors are not restarted, this behavior is a good default.

Listing 6.5 shows how to use our keepAlive method to automatically restart an actor whenever it crashes. Actor crasher is the actor that we want to monitor and restart. It maintains a counter such that whenever the counter is even, handling a 'request message results in an exception being thrown.

  val crasher = actor {
    println("I'm (re-)born")
    var cnt = 0
    loop {
      cnt += 1
      react {
        case 'request =>
          println("I try to service a request")
          if (cnt % 2 == 0) {
            println("sometimes I crash...")
            throw new Exception
          }
        case 'stop =>
          exit()
      }
    }
  }

  val client = actor {
    react {
      case 'start =>
        for (_ <- 1 to 6) { crasher ! 'request }
        crasher ! 'stop
    }
  }

  actor {
    self.trapExit = true
    link(crasher)
    client ! 'start
    keepAlive(crasher)
  }

Listing 6.5 · Using keepAlive to automatically restart a crashed actor.

Since the exception is not handled, it causes the actor to crash. We can also tell the crasher to stop, thereby terminating it normally. The client actor waits for a 'start message, and then sends a number of requests to crasher, some of which cause crashes. The last actor, the keep-alive actor, links itself to the crasher with trapExit set to true. It is important that the keep-alive actor links itself to the crasher before the client starts. Otherwise, the client could cause the crasher to terminate without sending an Exit message to the keep-alive actor; since the Exit message would never be received, the crasher actor would not be restarted.
Running the code in Listing 6.5 produces the following output:

  I'm (re-)born
  I try to service a request
  I try to service a request
  sometimes I crash...
  I'm (re-)born
  I try to service a request
  I try to service a request
  sometimes I crash...
  I'm (re-)born
  I try to service a request
  I try to service a request
  sometimes I crash...
  I'm (re-)born

As you can see, the crasher actor processes six 'request messages. Every second message results in a crash, causing the keep-alive actor to restart it. Restarting the crasher re-runs its body, producing a rebirth message.

Exception handling using futures

One advantage of futures over simple asynchronous messages is that they make it easy to identify to which request they correspond. Basically, each future is a representation of the asynchronous request that created the future. We can leverage this property of futures for exception handling. Let’s revisit the image downloader example of Chapter 5. In the following we will show how you can extend Listing 5.8 to handle exceptions that may be thrown during the retrieval of the images (for instance, IOExceptions).

  def renderImages(url: String) {
    val imageInfos = scanForImageInfo(url)
    self.trapExit = true
    val dataFutures = for (info <- imageInfos) yield {
      val loader = link {
        react {
          case Download(info) =>
            throw new Exception("no connection")
            reply(info.downloadImage())
        }: Unit
      }
      loader !! Download(info)
    }
    var i = 0
    loopWhile (i < imageInfos.size) {
      i += 1
      val Input = dataFutures(i-1).inputChannel
      react {
        case Input ! (data @ ImageData(_)) =>
          renderImage(data)
        case Exit(from, UncaughtException(_, Some(Download(info)), _, _, cause)) =>
          println("Couldn't download image " + info +
                  " because of " + cause)
      }
    }
  }

Listing 6.6 · Reacting to Exit messages for exception handling.
Listing 6.6 shows the renderImages method extended with code to handle uncaught exceptions in the downloader actors. The idea is as follows. First, the actor that renders the images sets its trapExit member to true, which enables it to receive termination notifications from linked actors. Second, the renderer actor links itself to each downloader actor. For this, we use one of the link methods defined in the Actor object. The variant we use takes a code block (more precisely, a by-name parameter of type => Unit) as an argument, creates a new actor to execute that block, and links the caller to the newly created actor. Importantly, linking and starting the new actor is done in a single, atomic operation to avoid a subtle race condition: between starting the new actor and linking to it, the newly created actor could already have died, which would result in a lost Exit message. This is the main reason why the Actor object provides a link method that takes a code block as an argument.

Note that you have to add an explicit type annotation to the react expression. The reason is that the return type of react is Nothing, which is compatible with both link methods, since Nothing is a subtype of every other type. By adding the : Unit type ascription, we force the compiler to select the link method that takes a code block.

After the renderer actor has sent out all download requests, it loops trying to receive ImageData objects from each future’s input channel. To handle uncaught exceptions in the downloader actors, the renderer also reacts to Exit messages: whenever an Exit message that has an UncaughtException as its reason is received, we extract the message that the terminated actor was processing using a nested pattern match.
This enables us to easily get access to the corresponding ImageInfo, since it was passed as part of the Download message.

Chapter 7

Customizing Actor Execution

Actors are executed using a run-time system which is backed by an efficient task execution framework. You have seen that such a run-time system enables concurrent programs to scale to a large number of fairly lightweight actors. In this chapter you will learn how to customize the run-time system, to improve the integration with threads and thread-local data, to simplify testing, and more (Section 7.1).

In Chapter 5 you have seen that event-based actors require only a small number of worker threads for their execution. However, when actors use blocking operations, the number of worker threads often must be increased to avoid locking up the thread pool. Managed blocking provides a way to automatically adjust the thread pool size depending on the blocking behavior of operations. In this chapter you will learn how to use managed blocking to enable a safe use of existing blocking concurrency classes (Section 7.2).

7.1 Pluggable schedulers

In some cases the way in which actors are executed must be customized. There are a number of examples where this is the case:

• Maintaining thread-bound properties such as ThreadLocals;
• Interfacing with existing event dispatch threads;
• Daemon-style actors;
• Deterministic execution of message sends/receives for reproducible testing;
• Fine-grained control over resources consumed by the underlying thread pool.

In all these cases it is necessary to customize the way in which actors are executed. The part of the runtime system that is responsible for executing an actor’s behavior is called scheduler.
Each actor is associated with a scheduler object that executes the actor’s actions, that is, its body as well as its reactions to received messages. By default, a global scheduler executes all actors on a single thread pool. However, in principle each actor may be executed by its own scheduler.

To customize an actor’s scheduler, it suffices to override the scheduler method inherited from the Reactor trait. The method returns an instance of IScheduler, which is used to execute the actor’s actions. By returning a custom IScheduler instance the default execution mechanism can be overridden. In the following we are going to show how this is done in each of the above cases.

Maintaining thread-bound properties

When an application is run on the JVM, certain properties are maintained on a by-thread basis. Examples for such properties are the context class loader, the access control context, and programmer-defined ThreadLocals. In applications that use actors instead of threads, these properties are still useful or maybe even necessary to interoperate with JVM-based libraries and frameworks.

Using ThreadLocals or other thread-bound properties is unchanged compared to threads when using thread-based actors. For event-based actors, the situation is slightly more complicated, since the underlying thread that is executing a single event-based actor may change over time. Remember that each time an actor suspends in a react, the underlying thread is released. When this actor is resumed it may be executed by a different (pool) thread. Thus, without some additional logic, ThreadLocals could change unexpectedly during the execution of an event-based actor, which would be very confusing. In this section we show you how to correctly maintain thread-bound properties, such as ThreadLocals, over the lifetime of event-based actors.
Example: thread-local variables

Listing 7.1 shows an attempt to use a ThreadLocal to keep track of a name string that is associated with the current actor. The name is stored in a ThreadLocal[String] called tname. In Java, ThreadLocals are typically declared as static class members, since they hold data that is not specific to a class instance, but to an entire thread. In Scala, there are no static class members. Instead, data that is not specific to a class instance is held in singleton objects. Object members are translated to static class members in the JVM bytecode. Therefore, we declare the thread-local tname as a member of the application's object, as opposed to a member of a class or trait. We override the initialValue method to provide the initial value "john".

The joeActor responds to the first 'YourName request by first setting its name to "joe jr.", and then sending it back in a reply to the sender. Upon the second 'YourName request the thread-local name is sent back unchanged as a reply. The other actor simply sends two requests and prints their responses. We expect the program to produce the following output:

  your name: joe jr.
  your name: joe jr.

However, surprisingly, some executions produce the following output:

  your name: joe jr.
  your name: john

Apparently, in this execution the second 'YourName request returns the initial value of the ThreadLocal, even though it has been set previously by the actor. As already mentioned, the underlying problem is that parts of an event-based actor are not always executed by the same underlying thread. After resuming the second react, it is possible that the actor is executed by a thread that is different from the thread that executed the reaction to the first request.
In that case, the ThreadLocal has not been updated yet and still contains the initial value.

The above problem can be avoided as follows. First, we create a subclass of the Actor trait that stores a copy of the thread-local variable. The idea is to restore the actual ThreadLocal using this copy whenever the actor resumes. Conversely, we save the current value of the ThreadLocal to the actor's copy whenever the actor suspends. This way, we make sure that the ThreadLocal holds the correct value while the actor executes, namely the value associated with the current actor.

  object ActorWithThreadLocalWrong extends Application {
    val tname = new ThreadLocal[String] {
      override protected def initialValue() = "john"
    }
    val joeActor = actor {
      react {
        case 'YourName =>
          tname set "joe jr."
          sender ! tname.get
          react {
            case 'YourName => sender ! tname.get
          }
      }
    }
    actor {
      println("your name: " + (joeActor !? 'YourName))
      println("your name: " + (joeActor !? 'YourName))
    }
  }

Listing 7.1 · Incorrect use of ThreadLocal.

  abstract class ActorWithThreadLocal(private var name: String)
      extends Actor {
    override val scheduler = new SchedulerAdapter {
      def execute(block: => Unit): Unit =
        ActorWithThreadLocal.super.scheduler.execute {
          tname set name
          block
          name = tname.get
        }
    }
  }

Listing 7.2 · Saving and restoring a ThreadLocal.

The solution that we just outlined requires us to run custom code upon an actor's suspension and resumption. We can achieve this by overriding the scheduler that is used to execute the actor.
However, we only want to override a specific method of the scheduler, namely the method that receives the code to be executed after an actor resumes and before it suspends. Since this is the most common use case when overriding an actor's scheduler, there is a helper trait SchedulerAdapter, which allows us to override only this required method. This implementation is shown in Listing 7.2.

The abstract ActorWithThreadLocal class overrides the scheduler member with a new instance of a SchedulerAdapter subclass. This subclass provides an implementation of the execute method that receives the code block, which is executed after this actor resumes and before it suspends. To insert the required code we invoke the execute method of the inherited scheduler, passing a closure that surrounds the evaluation of the by-name block argument with additional code. Before the actor resumes, we restore the thread-local tname with the value of the actor's copy in its private name member. Normally, after running the code block, the actor would suspend. In our extended closure we additionally save the current value of tname in the actor's name member before we suspend. By making joeActor in Listing 7.1 an instance of ActorWithThreadLocal its thread-local state is managed correctly.

  abstract class SwingActor extends Actor {
    override val scheduler = new SchedulerAdapter {
      def execute(block: => Unit): Unit =
        java.awt.EventQueue.invokeLater(new Runnable() {
          def run() = block
        })
    }
  }

Listing 7.3 · Executing actors on the Swing event dispatch thread.

Interfacing with event dispatch threads

Some frameworks restrict certain actions to special threads that are managed by the framework.
For example, the event queue of Java's Swing class library is managed by a special event dispatch thread. For thread safety, Swing UI components may only be accessed inside event handlers that are executed by this dispatch thread. Therefore, an actor that wants to interact with Swing components must run on the event dispatch thread. This is just one example where it is necessary to "bind" actors to specific threads that are provided by some framework. Another example is a library that interacts with native code through JNI, where all accesses must be done from a single JVM thread.

By overriding an actor's scheduler we can ensure that its actions are executed on a specific thread, instead of an arbitrary worker thread of the actor runtime system. For this, we can again use the SchedulerAdapter trait, which we have already seen in the previous section. Listing 7.3 shows the implementation of a subclass of Actor that executes its instances only on the Swing event dispatch thread. For this, we override the scheduler member with a new instance of a SchedulerAdapter that executes the actor's actions by submitting Runnables to the Swing event dispatch thread. This is done using the invokeLater method of java.awt.EventQueue.

Daemon-style actors

In many cases, we don't have to care about the termination of an actor-based program: when all actors have finished their execution, the program terminates. However, when actors are long-running or react to messages inside an infinite loop, orderly termination of actors and the underlying thread pool can become challenging. Some applications use actors that are always ready to accept requests for work to be processed in the background. To simplify termination in such cases, it can help to make those actors daemons: the existence of active daemon actors does not prevent the main program from terminating.
This means that as soon as all non-daemon actors have terminated, the application terminates. Listing 7.4 shows how to create actors with daemon-style semantics. It suffices to override the actor's scheduler method to return the DaemonScheduler object. DaemonScheduler uses the exact same configuration as the default Scheduler object, except that the actors that it manages do not prevent the application from terminating.

  import scala.actors.Actor
  import scala.actors.scheduler.DaemonScheduler

  object DaemonActors {
    class MyDaemon extends Actor {
      override def scheduler = DaemonScheduler
      def act() {
        loop {
          react {
            case num: Int => reply(num + 1)
          }
        }
      }
    }
    def main(args: Array[String]) {
      val d = (new MyDaemon).start()
      println(d !? 41)
    }
  }

Listing 7.4 · Creating daemon-style actors.

In Listing 7.4 the d actor is again waiting for a message after the synchronous send has been served. However, since the main thread finishes, the DaemonScheduler also terminates, and with it the d actor.

Deterministic actor execution

By default, the execution of concurrent actors is not deterministic. This means that two actors that are ready to react to a received message may be executed in any order, or, depending on the number of available processor cores, in parallel. Since actors do not share state,¹ you do not have to worry about data races even if the actual execution order is not known in advance. In fact, for best performance and scalability we would like to have as many actors as possible executed in parallel!

¹Currently this is a mere convention; however, efforts exist to have actor isolation checked using an annotation checker plug-in for the Scala compiler.
However, in some cases it can be very helpful to execute actors deterministically. The main reason is that a deterministic execution enables reproducing program executions that are not influenced by timing-dependent variations in thread scheduling, which make multi-threaded programs extremely hard to test.

  class Gear(val id: Int, var speed: Int, val controller: Actor)
      extends Actor {
    def act() {
      loop {
        react {
          case SyncGear(targetSpeed: Int) =>
            println("[Gear " + id + "] synchronize from current speed " +
              speed + " to target speed " + targetSpeed)
            adjustSpeedTo(targetSpeed)
        }
      }
    }
    def adjustSpeedTo(targetSpeed: Int) {
      if (targetSpeed > speed) {
        speed += 1
        self ! SyncGear(targetSpeed)
      } else if (targetSpeed < speed) {
        speed -= 1
        self ! SyncGear(targetSpeed)
      } else if (targetSpeed == speed) {
        println("[Gear " + id + "] has target speed")
        controller ! SyncDone(this)
        exit()
      }
    }
  }

Listing 7.5 · Synchronizing the speed of Gear actors.

For example, consider a concurrent application simulating mechanical gears and motors. Assume that the speed of each gear is adjusted using a controller. To model the concurrency of the real world, each gear as well as the controller is represented as an actor. Listing 7.5 shows the implementation of a gear as an actor. The Gear actor responds to SyncGear messages (not shown for brevity) that cause the gear to adjust its speed in a step-wise manner. For instance, a gear with current speed 7 units requires two steps to adjust its speed to 5 units. Each step is initiated by a SyncGear message. To process each message, the gear adjusts its speed by a fixed amount and sends itself another SyncGear message if it has not reached its target speed.
Otherwise, the gear reports to its controller that it has reached the target speed using a SyncDone message. Let's write a little driver to test the Gear actor:

  object NonDeterministicGears {
    def main(args: Array[String]) {
      actor {
        val g1 = (new Gear(1, 7, self)).start()
        val g2 = (new Gear(2, 1, self)).start()
        g1 ! SyncGear(5)
        g2 ! SyncGear(5)
        react {
          case SyncDone(_) =>
            react {
              case SyncDone(_) =>
            }
        }
      }
    }
  }

The above driver creates two gears that, initially, are running at speeds 7 and 1, respectively. Afterwards the controller actor instructs the gears to adjust their speed to 5 by sending asynchronous SyncGear messages. Finally, it waits until both gears have synchronized their speeds. Running the driver produces output such as the following:

  [Gear 1] synchronize from current speed 7 to target speed 5
  [Gear 2] synchronize from current speed 1 to target speed 5
  [Gear 2] synchronize from current speed 2 to target speed 5
  [Gear 1] synchronize from current speed 6 to target speed 5
  [Gear 2] synchronize from current speed 3 to target speed 5
  [Gear 1] synchronize from current speed 5 to target speed 5
  [Gear 1] has target speed
  [Gear 2] synchronize from current speed 4 to target speed 5
  [Gear 2] synchronize from current speed 5 to target speed 5
  [Gear 2] has target speed

As you can see, the speed-adjusting steps of the different gears are interleaved, since the gear actors are running concurrently. A subsequent program run may produce an entirely different interleaving of the steps. However, this means that you could not use the above driver for a unit test that compares the actual output to some expected output.

Using the SingleThreadedScheduler (in package scala.actors.scheduler) it is possible to make the execution of concurrent actors deterministic. As its name suggests, this scheduler runs the behavior of all actors on a single thread.
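Applied to the gear example, a deterministic test driver could be sketched as follows. This is our sketch, not the book's code: it assumes that SingleThreadedScheduler can be instantiated directly and shared by all participating actors, and it reuses the Gear, SyncGear, and SyncDone definitions of Listing 7.5; the DetGear name is ours.

```scala
import scala.actors.Actor
import scala.actors.scheduler.SingleThreadedScheduler

object DeterministicGears {
  // One shared single-threaded scheduler for every actor in the test
  // (assumption: direct instantiation is permitted).
  val sched = new SingleThreadedScheduler

  // A Gear variant that runs on the shared deterministic scheduler.
  class DetGear(id: Int, speed: Int, ctrl: Actor)
      extends Gear(id, speed, ctrl) {
    override def scheduler = sched
  }

  def main(args: Array[String]) {
    val controller = new Actor {
      override def scheduler = sched
      def act() {
        val g1 = (new DetGear(1, 7, this)).start()
        val g2 = (new DetGear(2, 1, this)).start()
        g1 ! SyncGear(5)
        g2 ! SyncGear(5)
        react { case SyncDone(_) => react { case SyncDone(_) => } }
      }
    }
    controller.start()
    sched.shutdown() // drain reactions still queued, then stop
  }
}
```

Because every send and reaction now runs on a single thread, repeated runs should print the synchronization steps in the same order, so the output can be compared against a fixed expected transcript in a unit test.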
Since the execution of that thread is deterministic, the entire actor system executes deterministically. In particular, any side effects that actors might perform as part of their reactions to messages, such as I/O, are performed in the same order in all program runs.

This scheduler works by running the behavior of actors on a single thread. Usually, this means that the reaction of an actor receiving a message is immediately executed on the same thread that has been executing the sender of that message. Since it is a valid pattern to have an actor sending messages to itself in a loop, sometimes the scheduler must delay the processing of a message to avoid a stack overflow. For this, the scheduler maintains a queue of reactions that are executed when there is nothing else left to be done. However, it is possible that some reactions remain in the scheduler's queue just before the application should terminate. Therefore, to make sure that the scheduler processes all tasks, one has to invoke shutdown explicitly.

7.2 Managed blocking

The actor run-time system uses a thread pool, which is initialized to use a relatively small number of worker threads. By default, the number of workers used is twice the number of processor cores available to the JVM. In many cases this configuration allows executing actors with a maximum degree of parallelism while consuming only little system resources for the thread pool. In particular, actor programs that use only event-based operations such as react can always be executed using a fixed number of worker threads.

However, in some cases actors use a mix of event-based code and thread-based code. For instance, some methods like receive are implemented using thread-blocking operations.
Moreover, actor-based code may have to interoperate with code using the Java concurrency utilities (the java.util.concurrent package). In both cases, operations that may block the underlying thread have to be used with care, so as to avoid locking up the entire thread pool.

  import scala.actors.Actor._
  import java.util.concurrent.CountDownLatch

  object PoolLockup {
    def main(args: Array[String]) {
      val numCores = Runtime.getRuntime().availableProcessors()
      println("available cores: " + numCores)
      val latch = new CountDownLatch(1)
      for (i <- 1 to (numCores * 2)) actor {
        latch.await()
        println("actor " + i + " done")
      }
      actor { latch.countDown() }
    }
  }

Listing 7.6 · Blocked actors may lock up the thread pool.

For example, Listing 7.6 shows what happens if too many actors are blocked simultaneously. To simplify the demonstration we use the CountDownLatch class in the java.util.concurrent package. Note that even though the actual code example may not be very useful in and of itself, there are probably places in your actor-based program where the Java concurrency utility classes come in handy. Therefore, the following discussion should be useful to anyone who wants to reuse blocking concurrency code in her actor code. Basically, we use a CountDownLatch to notify a bunch of actors once the "main actor" reaches a certain point. To do this, we initialize the latch to one, and tell our actors to wait until the latch becomes zero. Once the main actor sets the latch to zero, the other actors can continue and print a message before terminating. Now, the problem is that if too many actors wait for the latch to become zero, it is possible that all worker threads in the underlying thread pool are blocked, so that there is no thread left to execute the main actor.
As a result, the blocked actors wait indefinitely, the thread pool is locked up, and the program fails to terminate. Note that in the example we took care to start twice as many blocking actors as there are processor cores available to the VM. This corresponds exactly to the number of pool threads created by default. Therefore, starting fewer blocking actors will not cause problems, since there will be a pool thread left to execute the main actor, which releases the blocked actors.

There are several ways to prevent the thread pool from locking up our actor-based program:

• Configure the thread pool to create more worker threads on start-up;
• Use managed blocking to dynamically resize the thread pool before invoking a blocking operation.

The first alternative can be implemented either (1) using the actors.corePoolSize and actors.maxPoolSize JVM properties (see Chapter 5), or (2) using a customized scheduler (see Section 7.1). However, pre-configuring the thread pool size can be fragile if the number of blocking actors is hard to predict. Moreover, overprovisioning of thread pool resources is likely to negatively impact the performance of your application.

The second alternative is a much more efficient way of dealing with blocking operations, since the thread pool grows only on demand, and (usually) only for a short period of time. It also avoids the problem of having to predict the maximum number of actors that may be blocked simultaneously. The basic idea of managed blocking is to invoke blocking operations indirectly through an interface that allows the thread pool to resize itself before blocking. Additionally, the interface allows the pool to query a wrapped blocking operation to check whether it no longer needs to block. This enables shrinking the pool back to the size it had before growing to accommodate the blocking operation.
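To illustrate the first alternative: the pool-size properties are ordinary JVM system properties, so they can be set on the command line when launching the program of Listing 7.6. The concrete sizes below are arbitrary examples, not recommendations:

```
scala -Dactors.corePoolSize=32 -Dactors.maxPoolSize=64 PoolLockup
```

With 32 core workers this run would leave spare threads on a machine with up to 15 cores, but a larger machine would again start numCores * 2 blocking actors and lock up, which is exactly the fragility of pre-configuration noted in the text.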
Listing 7.7 shows how you can use the ManagedBlocker interface to avoid locking up the thread pool. Managed blocking requires the use of methods that are not accessible when defining actors inline using actor {...}. Therefore, you have to create your blocking actors by subclassing the Actor trait. Note that inside the body of act we replaced the invocation of latch.await() with a call to managedBlock, a method declared in the IScheduler trait.

  import scala.actors.Actor
  import scala.actors.Actor._
  import scala.concurrent.ManagedBlocker
  import java.util.concurrent.CountDownLatch

  object ManagedBlocking {
    class BlockingActor(i: Int, latch: CountDownLatch) extends Actor {
      val blocker = new ManagedBlocker {
        def block() = { latch.await(); true }
        def isReleasable = { latch.getCount() == 0 }
      }
      def act() {
        scheduler.managedBlock(blocker)
        println("actor " + i + " done")
      }
    }
    def main(args: Array[String]) {
      val numCores = Runtime.getRuntime().availableProcessors()
      println("available cores: " + numCores)
      val latch = new CountDownLatch(1)
      for (i <- 1 to (numCores * 2))
        (new BlockingActor(i, latch)).start()
      actor { latch.countDown() }
    }
  }

Listing 7.7 · Using managed blocking to prevent locking up the thread pool.

managedBlock is invoked on the scheduler instance that is used to execute the current actor (this). managedBlock takes an instance of ManagedBlocker as an argument. The ManagedBlocker trait is used to wrap blocking operations in a way that allows the underlying thread pool to choose when and how to invoke that operation. The trait contains the following two methods:

• def block(): Boolean
• def isReleasable: Boolean

The two methods are supposed to be implemented in the following way.
The block method invokes a method that possibly blocks the current thread. The underlying thread pool makes sure to invoke block only in a context where blocking is safe; for instance, if there are no idle worker threads left, it first creates an additional thread that can process submitted tasks in case all other workers are blocked. The Boolean result indicates whether the current thread might still have to block even after the invocation of block has returned. In most cases it is sufficient to just return true, which indicates that no additional blocking is necessary.

The isReleasable method, like block, indicates whether additional blocking is necessary. Unlike block, it should not invoke possibly blocking operations itself. Moreover, it can (and should) return true even if a previous invocation of block returned false, but blocking is no longer necessary.

The implementations of block and isReleasable in Listing 7.7 are straightforward. The block method simply invokes latch.await and returns true after that; clearly, once await has returned no additional blocking is necessary. In isReleasable we use the getCount method of CountDownLatch to determine whether the call to await has already unblocked the thread or not. Running the program extended with managed blocking in this way shows that the pool no longer locks up.

Managed blocking and receive

The receive method allows actors to receive messages in a thread-based way. This means that receive can be used just like any other possibly blocking operation. This is unlike the react method, which is more lightweight, but also more restricted. (Chapter 5 shows how to do event-based programming using react.) Since receive uses standard JVM monitors under the hood, it has the same potential problems as any other blocking code when invoked from within actors.
However, since all variants of receive are implemented in objects and types in the scala.actors package, receive uses managed blocking internally to avoid thread pool lock-ups. Consequently, there is no need to wrap invocations of receive in ManagedBlockers in user code.

Chapter 8

Remote Actors

Scala actors can communicate with each other not only within the same Java Virtual Machine address space, but also across virtual machines, and even across network nodes. To explain the constructs involved in using remote actors, in this chapter we revisit the chat example application of Chapter 4. You are going to learn how to create remote actors, and how to address remote actors and communicate with them.

The chat application of Chapter 4 creates an actor that is responsible for managing a chat room. Clients send various types of messages to the chat room actor, such as Subscribe and Unsubscribe messages. By making the chat room actor remotely accessible, the chat service can be used across a network.

8.1 Creating remote actors

Listing 8.1 shows how to turn the chat room actor into a remote actor. First, the actor runtime system needs to be informed that the actor wants to engage in remote communication with other actors. This is done by invoking the alive method of the RemoteActor object. It requires specifying a port number which is used to listen for incoming TCP connections. Actors running on different machines in the network use this port number to obtain a remote reference to the chat room actor. The port number is not enough to uniquely identify the actor, though; several remote actors may be accessible via the same port. Therefore, remote actors must be registered under a name which is unique for a given port number.
This is done using the register method of RemoteActor:

  import scala.actors.Actor
  import Actor._
  import scala.actors.remote.RemoteActor.{alive, register}

  class ChatRoom extends Actor {
    def act() {
      alive(9000)
      register('chatroom, self)
      while (true) {
        receive {
          case Subscribe(user) =>      // handle subscriptions
          case Unsubscribe(user) =>    // handle unsubscriptions
          case UserPost(user, post) => // handle user posts
        }
      }
    }
  }

Listing 8.1 · Making the chat room actor remotely accessible.

  def register(name: Symbol, a: Actor): Unit

The method expects two arguments: the first argument is the name under which the actor should be registered. Note that names are Symbols, which are similar to strings that are always interned, but with a more lightweight syntax. The second argument is the actor that should be registered, in the example in Listing 8.1 simply self. Subsequently, it is possible to obtain a remote reference to the chat room actor using the port number, the IP address of the machine that the actor is running on, and the name under which it is registered on that machine.

Note that it is possible to change the name under which an actor is registered by repeatedly invoking register, passing different symbols. However, at any point in time an actor is registered under a single name only. The most recent invocation of register "wins."

Messages for remote communication

To communicate with the chat room, messages must be serialized and sent over the network. Therefore, it is necessary that the message classes are serializable. Fortunately, the message classes defined in Listing 4.1 are case classes, which are serializable by default.
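For reference, the message types matched in Listing 8.1 are ordinary case classes. The following definitions are a reconstruction from the way the messages are used, not a copy of Listing 4.1; the field names are our assumptions:

```scala
// Reconstructed message types for the chat example (hypothetical field
// names). As case classes they are serializable by default, so instances
// can be sent across the network without extra work.
case class User(name: String)
case class Subscribe(user: User)
case class Unsubscribe(user: User)
case class UserPost(user: User, post: String)
```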
8.2 Remote communication

The chat room actor can now receive messages from actors running on different nodes on the network. However, its clients first have to obtain a remote reference to it. This is done using the select method of the RemoteActor object:

  def select(node: Node, sym: Symbol): AbstractActor

The first argument is a Node instance which specifies the IP address and port number of the target node. The second argument is the name of the target actor. Invoking select returns an object of type AbstractActor. You can think of AbstractActor as a trait that contains all the functionality of Actor except for methods that are not supported by remote actors, such as start, restart, and getState. Node is a case class defined as follows:

  case class Node(address: String, port: Int)

For example, let's select the chat room actor running on the local node on port 9000:

  val chatRoom = select(Node("127.0.0.1", 9000), 'chatroom)

The AbstractActor reference returned by select can then be used to communicate with the chat room using the usual message send operations:

  chatRoom ! Subscribe(User("Alice"))
  chatRoom !? Subscribe(User("Bob"))
  val future = chatRoom !! Subscribe(User("Charly"))
  ...

Just like with local actors, in all the above cases the remote actor implicitly receives a reference to the sending actor. As before, the remote actor can access the reference via the sender method of the Actor object. This means that the chat room actor's code to process incoming messages does not have to change.

Selecting an actor using a name that has no actor registered on the target node will not cause the select invocation to fail (by throwing an exception, say). Instead, the AbstractActor reference returned by select is lazy (or delayed) in the sense that no attempt to communicate with the remote node is made at this point.
The select method merely creates a proxy object which forwards all messages it receives to the remote actor. Sending a message to a remote actor (or rather, its proxy) will result in a lost message if the symbol passed to select is not registered with an existing actor on the target node.

Linking to remote actors

An actor can link itself to a remote actor just as to a local actor. It does not matter whether the receiver of an invocation of link is local or (also) remote. Reacting to the termination of linked remote actors is unchanged compared to the non-remote case as discussed in Chapter 6.

8.3 A remote start service

In a distributed application it is often useful to have a parent actor manage several child actors that run on different nodes on the network. Moreover, the number of child actors usually is not fixed when the application (or the parent actor) starts; instead, the parent actor must be able to start child actors dynamically (depending on input data, the state of the application, etc.). This design typically requires a service that allows actors to be started remotely. Using such a remote start service a parent actor can start new child actors on nodes different from its own node. In the following we are going to explain how to implement such a remote start service.

We can model our remote start service as a remote actor which responds to the following two types of messages:

  case class Start(clazz: Class[_ <: Actor])
  object Stop

An instance of the Start case class should instruct the remote start service to create and start an actor of the type specified by the clazz argument. Instances of type Class[_ <: Actor] are run-time representations of classes that extend the Actor trait. In general, an object of type Class[A] can be used to create instances of class A.¹ This means that we can use the argument

¹Java's reflection framework adds the constraint that the instantiated class type must define a no-argument constructor.
of a Start message to create Actor instances which can then be started on the remote node. The Stop case object is a message instructing the remote start service to terminate itself.

  class Server extends Actor {
    var numStarted = 0
    def act() {
      alive(19000)
      register('server, this)
      println("remote start server running...")
      loop {
        react {
          case Start(clazz) =>
            val a: Actor = clazz.newInstance()
            a.start()
            numStarted += 1
            reply()
          case Stop =>
            println("remote start server started " +
              numStarted + " remote actors")
            exit()
        }
      }
    }
  }

Listing 8.2 · A server actor implementing a remote start service.

Listing 8.2 shows a Server actor which implements a basic remote start service. Inside the act method we use alive and register to make the actor remotely accessible on port 19000 under the name 'server. After that the actor loops, reacting to the above Start and Stop messages. When the next message matches the Start(clazz) pattern, we create a new instance of the Actor subclass that clazz represents by invoking newInstance. The object returned by newInstance has type Actor, since the type parameter in the type of clazz is constrained to be a subtype of Actor. After starting the new actor, the remote start actor replies to the sender of the Start message. The reason is that Start messages are supposed to be sent synchronously; this ensures that the new actor has been started when the synchronous message send completes.

  class EchoActor extends Actor {
    def act() {
      alive(19000)
      register('echo, this)
      react {
        case any => reply("echo: " + any)
      }
    }
  }

Listing 8.3 · An echo actor that can be started remotely.

Listing 8.3 shows an EchoActor class that is suitable for remote starting.
To be able to interact with new EchoActor instances remotely, we use the alive and register methods as in previous examples. Note that calling alive twice on the same node with the same port number has no effect. This is important, since the alive invocation of a new EchoActor instance will be made while running on the same node as the Server actor.

Let's use our remote start service to start a new EchoActor:

  val server = select(Node("localhost", 19000), 'server)
  server !? Start(classOf[EchoActor])
  val echo = select(Node("localhost", 19000), 'echo)
  val resp = echo !? "hello"
  println("remote start client received " + resp)

First, we obtain the server remote reference to the remote start service using select. To start a new instance of EchoActor on the node that runs the remote start service, we send a Start(classOf[EchoActor]) message to the server. After that, the remotely started actor can be referenced via select, using the name under which the EchoActor registered itself on the remote node. The echo reference is a normal remote actor reference; as expected, running the above code results in the following message printed to the console:

  remote start client received echo: hello

Chapter 9

Using Scala Actors with Java APIs

One of Scala's promises to developers is that Scala code can seamlessly invoke Java APIs. That allows you to combine the best of both worlds: make use of thousands of existing Java APIs, but also take advantage of Scala's advanced language features. This chapter will describe, and illustrate with examples, how to use Scala actors with existing Java APIs, including Java EE 6.

Chapter 10

Distributed and Parallel Computing

The Scala Actors API puts at your disposal a powerful, yet simple, parallel computing framework built on top of the JVM.
This chapter illustrates how to accomplish some common parallel and distributed computing tasks with actors. In particular, we focus on two patterns that are useful in many applications: MAPREDUCE and reliable broadcasting. MAPREDUCE is a paradigm for parallel and distributed programming which has been established as a de facto standard for a wide variety of tasks, such as (hypertext) document processing, machine learning, and data mining. Reliable broadcasting, on the other hand, is often necessary in distributed applications where machines in a cluster can fail due to hardware outages or communication delays.

10.1 MapReduce

MAPREDUCE is a parallel computing framework originally developed at Google to simplify programming large-scale distributed computations while providing fault tolerance and excellent scalability.¹ MAPREDUCE simplifies parallel programming, since the programmer does not have to manage parallelism explicitly. Instead, the MAPREDUCE framework takes care of creating parallel tasks, synchronizing them, and distributing the work load. Moreover, a MAPREDUCE implementation typically also provides fault tolerance. This means that it is possible to successfully complete a MAPREDUCE computation even if some of the machines in the cluster fail to compute or to communicate their results.

¹ Dean and Ghemawat, "MapReduce: simplified data processing on large clusters" [Dea08]

MapReduce history

Why was MAPREDUCE invented at Google? Jeffrey Dean and Sanjay Ghemawat, the Google engineers who invented MAPREDUCE, recount that the abstraction emerged after they had written hundreds of special-purpose computations to process large amounts of raw data, such as crawled web pages, web server logs, etc.
While the computations performed on the data were simple, the input data was so large that the computation had to be performed in parallel if it was to finish within a reasonable amount of time.

Google's data centers do not consist of large, expensive supercomputers. Instead, they are populated with large clusters of inexpensive commodity hardware, typically Linux desktop machines connected via an ethernet network. Computations thus needed to be parallelized for a distributed environment in which network bandwidth is scarce and machine failures are very common. As a result, the simplicity of the computations was lost in the complexity of recurring issues such as how to distribute the data, how to parallelize the computation across machines, how to deal with load imbalance, machine failures, and so on. Inspired by the map and reduce higher-order functions from functional programming, Dean and Ghemawat identified a way to separate out the computation-specific parts into higher-order functions. The programmer supplies just these functions, and the MAPREDUCE framework calls them on an appropriate machine, with part of the input data, hiding most of the complexity of the parallel and distributed computing environment. MAPREDUCE truly is a success story based on the principles of higher-order functional programming.

Let's take a look at a concrete example which is amenable to parallel processing. Consider the task of building an inverted index for a collection of text files. An inverted index can be used to quickly look up the (list of) files in which a given word occurs. To create such an index, the mapping from files to their contents must be "inverted" – hence the name "inverted index." Our strategy for building it is as follows. For each file f we will create a list that contains pairs (word, f), where word is a word occurring in f.
This means that each of these lists contains pairs that all have the same second component – the file in which the word occurs. In the next step, we will go through all of the lists and fill a Map that maps words to lists of files in which they occur; the file lists should not contain duplicates. The reason why we first create lists of word/file pairs (instead of directly building the Map) is that this allows us to parallelize the task: each file can be processed in parallel to create the intermediate lists.

  def invertedIndex(input: List[(File, List[String])]) = {
    val master = self
    val workers = for ((file, words) <- input) yield actor {
      val wordsAndFiles = for (word <- words) yield (word, file)
      master ! Intermediate(wordsAndFiles)
    }
    var intermediates = List[(String, File)]()
    for (_ <- 1 to input.length) receive {
      case Intermediate(list) => intermediates :::= list
    }
    var dict = Map[String, List[File]]() withDefault (k => List())
    for ((word, file) <- intermediates)
      dict += (word -> (file :: dict(word)))
    var result = Map[String, List[File]]()
    for ((word, files) <- dict)
      result += (word -> files.distinct)
    result
  }

Listing 10.1 · A function for building an inverted index.

Listing 10.1 shows a parallel implementation using actors. For each file in the input list, we create a worker actor. A worker generates the list of word/file pairs and sends them in an Intermediate message back to the master actor (master is the actor invoking the invertedIndex method.)
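To see past the message plumbing in Listing 10.1, it may help to compare it with a purely sequential version of the same computation. This sketch (not from the book) builds the identical Map using plain collection operations:

```scala
import java.io.File

// Sequential counterpart of invertedIndex, for comparison.
def invertedIndexSeq(input: List[(File, List[String])]): Map[String, List[File]] = {
  // the "mapping" step: one (word, file) pair per occurrence
  val pairs = for ((file, words) <- input; word <- words) yield (word, file)
  // the grouping and "reducing" steps: group pairs by word,
  // keep each file only once
  pairs.groupBy(_._1).map { case (word, wfs) =>
    (word, wfs.map(_._2).distinct)
  }
}
```

The parallel version distributes exactly the first step – producing the (word, file) pairs – across worker actors, while the master performs the grouping and deduplication.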
The Intermediate class is defined as follows:

  case class Intermediate(list: List[(String, File)])

The master actor concatenates all the intermediate results it receives, which yields a list of type List[(String, File)]. The word/file pairs in this list are inserted into a Map, so that each word gets mapped to a list of files in which it occurs. Finally, we remove duplicates from these file lists, yielding the result map which represents our inverted index (the inferred result type of the invertedIndex method is Map[String, List[File]].)

The parallel construction of our inverted index follows a certain pattern which – as it turns out – is useful for many applications. Let's walk through that pattern step by step. In the first step, a function, let's call it the mapping function, is applied to each pair of the input list in order to produce another list of pairs. In the example, we generate for each input pair of type (File, List[String]) a list of pairs of type (String, File) – each pair associates a word with the file in which it occurs. We can encapsulate just this computation in the following function:

  def mapIndex(file: File, words: List[String]) =
    for (word <- words) yield (word, file)

(The inferred result type of mapIndex is List[(String, File)].)

The mapping function (mapIndex or some other function) is applied in parallel by creating a worker actor for each input pair:

  val workers = for ((key, value) <- input) yield actor {
    master ! Intermediate(mapping(key, value))
  }

Note that we changed the code to call the (generic) mapping function to produce the list of intermediate pairs. Moreover, we renamed the components of an input pair, replacing (file, words) by (key, value).
The reason is that besides factoring out the mapping function, this "mapping stage" can be generalized further to operate on arbitrary inputs of type List[(K, V)]. The type of the intermediate output pairs can be generic, too. Instead of always producing a List[(String, File)], a mapping function may produce a List[(K2, V2)]. Taken together, the generic mapping function has type (K, V) => List[(K2, V2)].

The next step is to collect the intermediate results sent to the master actor in Intermediate messages:

  var intermediates = List[(K2, V2)]()
  for (_ <- 1 to input.length) receive {
    case Intermediate(list) => intermediates :::= list
  }

After the mapping function has been applied to each input pair (a file paired with its contents), we group the intermediate output pairs by their first component, inserting them into a Map:

  var dict = Map[K2, List[V2]]() withDefault (k => List())
  for ((key, value) <- intermediates)
    dict += (key -> (value :: dict(key)))

Subsequently, the intermediate results which have been grouped by their key are further processed using a function – let's call it the reducing function – to yield the final result.

In the example, the list of files corresponding to each word is reduced by removing duplicates. Again, it is easy to define a function which encapsulates just this reduction step:

  def reduceIndex(key: String, files: List[File]) =
    files.distinct

The reducing function is applied to each entry in the dict map of intermediate results, yielding the result map:

  var result = Map[K2, List[V2]]()
  for ((key, value) <- dict)
    result += (key -> reducing(key, value))

According to its use in the above reduction step, the reducing function has type (K2, List[V2]) => List[V2].

Listing 10.2 shows a generic function, called mapreduce, which implements the parallel programming pattern that we just described.
As its name suggests, it is a basic (in-memory) MAPREDUCE implementation.

  def mapreduceBasic[K, V, K2, V2](
      input: List[(K, V)],
      mapping: (K, V) => List[(K2, V2)],
      reducing: (K2, List[V2]) => List[V2]
    ): Map[K2, List[V2]] = {

    case class Intermediate(list: List[(K2, V2)])

    val master = self
    val workers = for ((key, value) <- input) yield actor {
      master ! Intermediate(mapping(key, value))
    }
    var intermediates = List[(K2, V2)]()
    for (_ <- 1 to input.length) receive {
      case Intermediate(list) => intermediates :::= list
    }
    var dict = Map[K2, List[V2]]() withDefault (k => List())
    for ((key, value) <- intermediates)
      dict += (key -> (value :: dict(key)))
    var result = Map[K2, List[V2]]()
    for ((key, value) <- dict)
      result += (key -> reducing(key, value))
    result
  }

Listing 10.2 · A basic MAPREDUCE implementation.

Note that we have moved the declaration of the Intermediate case class into the method body. The reason is that this allows us to recover more type information when pattern matching on Intermediate messages. Let's see what happens if Intermediate is defined outside the mapreduceBasic method, like this:

  case class Intermediate[K2, V2](list: List[(K2, V2)])

Now, when pattern matching against an instance of Intermediate, an instantiation of the type parameters has to be found. When matching against Intermediate messages received inside mapreduceBasic, K2 and V2 are both instantiated to Any.
As a result, we get the following type error:

  ...: error: type mismatch;
   found   : List[(Any, Any)]
   required: List[(K2, V2)]
        case Intermediate(list) => intermediates :::= list
  one error found

By moving the definition of the Intermediate class into the method body, the type of its list argument is List[(K2, V2)] where K2 and V2 are no longer generic, but fixed to the type arguments of the enclosing method. Then, in the pattern match, list has type List[(K2, V2)], which makes it compatible with the type of the intermediates list.

Parallel reductions

The basic MAPREDUCE implementation shown above can be improved by parallelizing the reduction stage in addition to the mapping stage. Listing 10.3 shows how to modify our previous implementation to apply the reducing function in parallel. Similar to the mapping stage, we create an actor for each key in the dict map. The actor applies the reducing function to the key and the values with which the key is associated in dict. The result is sent to the master actor in a message of type Reduced. The master collects these messages like the Intermediate messages before, this time yielding the final result map.

Now that we have parallelized the reduction stage, let us summarize the basic execution model of MAPREDUCE:

• The MAPREDUCE computation is supervised by a single actor, called the master.

• Input data is represented as a list of "records", represented here as pairs of type (K, V). The master partitions the input data across a set of "mapper" workers.

  def mapreduce[K, V, K2, V2](
      input: List[(K, V)],
      mapping: (K, V) => List[(K2, V2)],
      reducing: (K2, List[V2]) => List[V2]
    ): Map[K2, List[V2]] = {

    case class Intermediate(list: List[(K2, V2)])
    case class Reduced(key: K2, values: List[V2])

    // ...
    val reducers = for ((key, values) <- dict) yield actor {
      master ! Reduced(key, reducing(key, values))
    }
    var result = Map[K2, List[V2]]()
    for (_ <- 1 to dict.size) receive {
      case Reduced(key, values) => result += (key -> values)
    }
    result
  }

Listing 10.3 · Applying the reducing function in parallel.

• Each mapper worker is a separate actor that applies the mapping function to a different part of the input data in parallel. For each input pair (k, v), a mapper generates a new list of pairs, which may be of a different type (K2, V2).

• The master collects the intermediate results of the mapping stage and then sorts this data according to the intermediate key type K2.

• For each such key, the master asks a "reducer" worker to reduce the list of values of type V2. Again, each reducer is a separate actor that can perform this step in parallel. Reducers may return a List[V2], but it is not uncommon for this list to contain a single, reduced, value.

• The master collects the reduced V2 values and simply combines them in a result Map.

Figure 10.1 illustrates the flow of data between master, mapper, and reducer actors for the inverted index example.

Figure 10.1 · Dataflow in a basic MAPREDUCE implementation.

Fault-tolerance

Our basic MAPREDUCE implementation is not fault-tolerant, that is: it cannot tolerate failures of either the master actor or any of the mapper or reducer worker actors. If a worker crashes, the master will block waiting for a reply indefinitely. In our example implementation, since we are assuming a shared-memory environment, chances are that a system failure will bring all of the actors to a halt. However, in a distributed MAPREDUCE implementation, the master and workers execute on different machines. In such an environment, partial failure, e.g.
a single machine failure, can be common. Let us therefore extend our sample implementation to at least tolerate worker failures.

We will use the mechanisms described in Section 6.2 to implement fault tolerance: the master actor will link itself to all of the workers it spawns, and is configured to trap exits (using self.trapExit = true). This causes the worker actors to send a special Exit message to the master when they terminate, allowing the master to identify crashed workers.

Listing 10.4 extends the basic MAPREDUCE implementation of Listing 10.2 with this fault-tolerance mechanism. Notice that the mapper actors are spawned using link instead of actor so that they are automatically linked to the master actor.

  def mapreduce[K, V, K2, V2](
      input: List[(K, V)],
      mapping: (K, V) => List[(K2, V2)],
      reducing: (K2, List[V2]) => List[V2]
    ): Map[K2, List[V2]] = {

    case class Intermediate(list: List[(K2, V2)])
    case class Reduced(key: K2, values: List[V2])

    val master = self
    self.trapExit = true
    var assignedMappers = Map[Actor, (K, V)]()

    def spawnMapper(key: K, value: V) = {
      val mapper = link {
        master ! Intermediate(mapping(key, value))
      }
      assignedMappers += (mapper -> (key, value))
      mapper
    }

    for ((key, value) <- input)
      spawnMapper(key, value)

    var intermediates = List[(K2, V2)]()
    var nleft = input.length
    while (nleft > 0) receive {
      case Intermediate(list) => intermediates :::= list
      case Exit(from, 'normal) => nleft -= 1
      case Exit(from, reason) =>
        // retrieve assigned work
        val (key, value) = assignedMappers(from)
        // spawn a new worker to re-execute the work
        spawnMapper(key, value)
    }
    // ...

Listing 10.4 · A MAPREDUCE implementation that tolerates mapper faults.
For each spawned worker, the master also keeps track of which key-value pair it assigned to that worker. When a worker terminates with an abnormal reason, the master looks up which pair it assigned to the terminated worker and spawns a new worker to process the same input. When a worker terminates with a 'normal reason, the master decrements a count identifying the number of outstanding jobs. When that number becomes zero, the master knows that all workers (either the original or restarted ones) have finished.

Our technique of simply re-executing a failed mapping is sound as long as the mapping function really is a function, i.e., if it has no side effects. Otherwise, failures may affect the outcome of a MAPREDUCE computation. As Scala provides the right building blocks to program in a functional style, restricting oneself to a purely functional subset to implement the mapping and reducing functions is usually not a problem.

Another point worth mentioning is that this simple re-execution strategy does not cope with deterministic errors. Say, for example, that there is a bug in the mapping function that only manifests itself for particular values of type K. The bug may cause an exception, terminating the worker. In this case, simply starting a new worker to process the same input will cause the exception again, leading to endless re-execution and no progress for the master. Actual MAPREDUCE implementations deal with such cases by skipping over such "bad" input data, giving up after a few retries. This makes sense, since MAPREDUCE is typically used for workloads where the loss of a little input data can still lead to a useful answer (e.g., indexing, search, data mining, ...).
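The purity requirement can be made concrete with a small, hypothetical counterexample (not from the book). A mapping function that updates shared state produces different observable results depending on how often it is re-executed:

```scala
// Hypothetical impure "mapping function": it has a side effect
// on shared mutable state.
var invocations = 0

def impureMapping(file: String, words: List[String]): List[(String, String)] = {
  invocations += 1  // side effect: visible outside the function
  for (word <- words) yield (word, file)
}

// First attempt "fails" and is retried; the returned pairs are
// identical, but the side effect has now happened twice.
impureMapping("a.txt", List("foo"))
impureMapping("a.txt", List("foo"))
// invocations is now 2, although only one input pair was processed
```

With a pure mapping function there is no such externally visible state, so re-executing it after a failure is indistinguishable from executing it once.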
Note that even in this extended implementation, the master is still a single point of failure: if it crashes, the entire MAPREDUCE computation is brought to a halt. If the chance of a master failure is small and the MAPREDUCE computation is not too large, it may not be worth dealing with this case. Otherwise, one has to either replicate the master, or periodically checkpoint the state of the master to persistent storage, but these techniques are beyond what we can address in this chapter.

Coarse-grained worker tasks

In our previous implementations of MAPREDUCE, a new mapper actor is spawned for each input key K and a new reducer actor is spawned for each intermediate key K2. When the input data is large (e.g., when indexing thousands of files each containing thousands of words), this simple strategy may introduce too much overhead. To reduce this overhead, we can make the tasks assigned to the mapper and reducer workers more "coarse-grained" by having each of them process multiple key-value pairs.

Listing 10.5 builds on the MAPREDUCE implementation from Listing 10.3 and adds support for coarse-grained tasks. The coarse-grained implementation takes two extra arguments: numMappers, the number of parallel mapper workers to spawn, and numReducers, the number of parallel reducer workers to spawn.

The magic of this implementation is hidden inside a useful function from the Scala Seq API: grouped. Calling grouped(n) on a sequence returns an iterator over chunks of the original sequence, each of size n. To create more coarse-grained tasks, we split the input and the intermediate data into groups of an appropriate size, and then spawn a worker actor per group. Each such worker processes all key-value pairs in its assigned group.
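The behavior of grouped is easy to check on the REPL; since it returns an iterator, we force it into a list here:

```scala
val xs = List(1, 2, 3, 4, 5, 6, 7)

// Three groups: the last one is smaller because 7 is not
// divisible by 3.
val groups = xs.grouped(3).toList
// groups == List(List(1, 2, 3), List(4, 5, 6), List(7))
```

The undersized final group is exactly the case discussed next: the last worker simply gets assigned less work.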
To ensure that we spawn only numMappers mappers, it suffices to group the input data into groups of size input.length / numMappers. For example, if numMappers is 10 and we need to process 5000 files, then each mapper will need to process 500 pairs. Similarly, we partition the dictionary dict containing the sorted intermediate data into groups of size dict.size / numReducers. If the size of the data is not exactly divisible by the required number of workers, the last group returned by grouped will contain fewer elements, so the last worker will get assigned less work.

Our simple strategy of dividing work equally among the workers is fine as long as the amount of processing to be done is approximately the same for all keys. Actual MAPREDUCE implementations often employ more elaborate data distribution techniques to balance the load between the workers at run time if the work is not evenly distributed.

10.2 Reliable broadcast

When building a distributed application it is often necessary that more than two actors operate in a coordinated manner. For example, you may want to ensure that all of your remote actors carry out some action, or none of them, while providing a way to handle the error case. For example, if a set of remote actors should save their internal state to a database, then typically you want all of them to do it, or none, so that there is always a consistent

  def coarseMapReduce[K, V, K2, V2](
      input: List[(K, V)],
      mapping: (K, V) => List[(K2, V2)],
      reducing: (K2, List[V2]) => List[V2],
      numMappers: Int,
      numReducers: Int): Map[K2, List[V2]] = {

    case class Intermediate(list: List[(K2, V2)])
    case class Reduced(key: K2, values: List[V2])

    val master = self
    for (group <- input.grouped(input.length / numMappers)) actor {
      for ((key, value) <- group)
        master !
          Intermediate(mapping(key, value))
    }
    var intermediates = ...
    var dict = Map[K2, List[V2]]() withDefault (k => List())
    for ((key, value) <- intermediates)
      dict += (key -> (value :: dict(key)))
    for (group <- dict.grouped(dict.size / numReducers)) actor {
      for ((key, values) <- group)
        master ! Reduced(key, reducing(key, values))
    }
    var result = Map[K2, List[V2]]()
    for (_ <- 1 to dict.size) receive {
      case Reduced(key, values) => result += (key -> values)
    }
    result
  }

Listing 10.5 · A MAPREDUCE implementation with coarse-grained worker tasks.

view persisted to the database.

  abstract class BroadcastActor extends Actor {
    // can be set by an external actor, therefore @volatile
    @volatile var isBroken = false
    private var canRun = true
    private var counter = 0L

    protected def broadcast(m: BSend) =
      if (!isBroken) {
        for (a <- m.recipients)
          a ! BDeliver(m.data)
      } else if (canRun) {
        canRun = false // simulate it being broken
        for (a <- m.recipients.take(2))
          a ! BDeliver(m.data)
        println("error at " + this)
      }

    // to be overridden in subtraits
    protected def reaction: PartialFunction[Any, Unit] = {
      case BCast(msg, receivers) =>
        counter += 1
        broadcast(BSend(msg, receivers, counter))
      case 'stop => exit()
    }

    def act() = loopWhile (canRun) {
      react(reaction)
    }
  }

Listing 10.6 · Best-effort broadcasting.

Basic broadcasting

Instructing a number of actors to carry out some action can be done by broadcasting a message to these actors. Listing 10.6 shows a simple broadcast implementation. First, we define a BroadcastActor which implements act in a way that allows reacting to messages of type BCast and the special 'stop message which causes the actor to terminate.
The BCast case class is defined as follows:

  case class BCast(data: Any, recipients: Set[Actor])

A BCast message tells the actor to send some data to a set of actors specified in the message. Note how the message handlers are defined using the reaction member, which is a partial function that is passed to react inside the act method. This has the advantage that subclasses can override reaction to handle additional message patterns while inheriting some of the message handling logic from the supertrait.

The broadcast method implements the actual message sending. broadcast is invoked passing a BSend message which contains the data, the set of recipients, and a time stamp (initially, we will not make use of the time stamp, though):

  case class BSend(data: Any, recipients: Set[Actor],
                   timestamp: Long)

To make things more interesting, we allow an actor to be "broken", which is expressed using the volatile isBroken field (the field is volatile to safely allow changing its value from a different actor.) A broken actor fails to send the message to all of the recipients.

The actual data is wrapped in a message of type BDeliver, which is defined as follows:

  case class BDeliver(data: Any)

A BDeliver message indicates to the recipient that the data was delivered using a broadcast.

  class MyActor extends BroadcastActor {
    override def reaction = super.reaction orElse {
      case BDeliver(data) =>
        println("Received broadcast message: " + data +
                " at " + this)
    }
  }

Listing 10.7 · Using the broadcast implementation in user code.

To use the broadcast implementation in actual user code, we extend the BroadcastActor and override its reaction member as shown in Listing 10.7. The message handling logic of BroadcastActor must be enabled alongside the new handler for BDeliver messages.
To do this, we combine super.reaction with the new handler using orElse. As a result, MyActor instances respond to BCast and 'stop messages as defined in BroadcastActor, in addition to BDeliver messages.

Let's try out this basic broadcast implementation:

  val a1 = new MyActor; a1.start()
  val a2 = new MyActor; a2.start()
  val a3 = new MyActor; a3.start()
  val a4 = new MyActor; a4.start()
  a1 ! BCast("Hello!", Set(a1, a2, a3, a4))

As expected, running the above code will produce output like the following:

  Received broadcast message: Hello! at MyActor@3c3c9217
  Received broadcast message: Hello! at MyActor@15af33d6
  Received broadcast message: Hello! at MyActor@54520eb6
  Received broadcast message: Hello! at MyActor@2c9b42e6

However, let's set actor a1's isBroken field to true and initiate another broadcast:

  a1.isBroken = true
  a1 ! BCast("Hello again!", Set(a1, a2, a3, a4))

Then, the output will look different:

  error at MyActor@15af33d6
  Received broadcast message: Hello again! at MyActor@2c9b42e6

In the above run, only one other actor besides a1 itself received the "Hello again!" message, because a1 failed before sending out more messages. In an actual distributed application the reason could be a machine failure or a network link which is down. To guarantee that all recipients receive the broadcast message or none, it is necessary to implement a reliable broadcast.

Reliable broadcasting

To make the message broadcasting reliable we will extend our code to implement an algorithm known as "eager reliable broadcast." The idea of this algorithm is that every recipient of a broadcast message should forward that message to every other recipient. Since the forwarding should be done regardless of any failure, broadcasting is "eager."

We add this layer of reliability as follows.
First, we extend the BroadcastActor as shown in Listing 10.8.

  class RbActor extends BroadcastActor {
    var delivered = Set[BSend]()
    override def reaction = super.reaction orElse {
      case m @ BSend(data, _, _) =>
        if (!delivered.contains(m)) {
          delivered += m
          broadcast(m)
          this ! BDeliver(data)
        }
    }
  }

Listing 10.8 · A reliable broadcast actor.

Using the delivered set, an RbActor keeps track of the BSend messages that it has received. Whenever it receives one, the actor checks whether it has already processed the message. This is the case if the condition delivered.contains(m) is true, since after processing a BSend message, it is added to the delivered set. However, if the actor receives a fresh BSend message, it'll invoke broadcast to send it to all recipient actors. Moreover, it'll send itself a BDeliver message, indicating that it received the data via a (reliable) broadcast.

It is crucial here that each BSend message contains a time stamp. The time stamp is set in the BroadcastActor when creating a new BSend message as a reaction to receiving a (broadcast-initiating) BCast message. The time stamp lets us identify to which broadcast a particular BSend message belongs. The messages forwarded by each RbActor do not change that time stamp. This way, each actor knows when it has already received a broadcast message, in which case it does not forward it further.

However, for the RbActor to work properly we need to slightly change the implementation of the BroadcastActor. Basically, it is no longer sufficient to send a BDeliver message inside the broadcast method. The reason is that the RbActor needs to receive BSend messages, since only those contain time stamps. Therefore, we have to change the broadcast method accordingly; this is shown in Listing 10.9.
As you can see, the BroadcastActor now simply sends the BSend messages to the recipients (a ! m), instead of sending a BDeliver message which only contains the data (a ! BDeliver(m.data)).

protected def broadcast(m: BSend) =
  if (!isBroken) {
    for (a <- m.recipients)
      a ! m
  } else if (canRun) {
    canRun = false // simulate it being broken
    for (a <- m.recipients.take(2))
      a ! m
    println("error at " + this)
  }

Listing 10.9 · Sending messages with time stamps inside the broadcast method.

Having made these changes, let's re-run our client code. Before we can do that, however, we first have to change MyActor to extend RbActor instead of BroadcastActor:

class MyActor extends RbActor {
  override def reaction = super.reaction orElse {
    case BDeliver(data) =>
      println("Received broadcast message: " + data + " at " + this)
  }
}

Running our short test code from above (setting a1.isBroken to true) should now produce output like the following:

error at MyActor@28e70e30
Received broadcast message: Hello again! at MyActor@5954864a
Received broadcast message: Hello again! at MyActor@3c3c9217
Received broadcast message: Hello again! at MyActor@1ff82982

As you can see, even though actor a1 failed after sending the broadcast message to itself and actor a2, actors a3 and a4 also received the message. The reason is that a2 sent the BSend message it received from a1 to all its recipients, which includes a3 and a4. In fact, it is possible to prove mathematically that the strategy of eager reliable broadcast will deliver a message either to all recipients or to none, provided the network communication between actors is based on a reliable transport protocol such as TCP, which remote actors use by default.
Chapter 11

API Overview

This chapter provides a detailed overview of the API of the scala.actors package of Scala 2.8. The organization follows groups of types that logically belong together, as well as the trait hierarchy. The focus is on the run-time behavior of the various methods that these traits define, thereby complementing the existing Scaladoc-based API documentation.

11.1 The actor traits Reactor, ReplyReactor, and Actor

Actors can be created based on several traits which form a simple hierarchy: Actor <: ReplyReactor <: Reactor (read "<:" as "extends"). There are two main reasons to prefer using a simpler trait (that is, a trait further to the right in the hierarchy) instead of a subtrait:

Types  For example, the Reactor trait has a type parameter which restricts the type of messages that can be received by instances of the trait.

Scalability  A Reactor (or ReplyReactor) maintains fewer instance variables than a ReplyReactor (or Actor, respectively). This means that an application scales to a larger number of Reactors than Actors, say.

Efficiency  The communication primitives provided by a trait are more efficient than those provided by its subtraits (if any). For example, message sends and reacts are faster between Reactors than Actors.

The Reactor trait

Reactor is the super trait of all actor traits. It has a type parameter Msg which indicates the type of messages that the actor can receive. Extending the Reactor trait allows defining actors with basic capabilities to send and receive messages. The behavior of a Reactor is defined by implementing its act method. The act method is executed once the Reactor is started by invoking start, which also returns the Reactor.
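The pieces described so far can be put together in a small sketch (the Logger class and its log-line protocol are made up for illustration; this assumes the scala.actors library of Scala 2.8):

```scala
import scala.actors.Reactor

// A Reactor parameterized on its message type: only Strings are accepted.
class Logger extends Reactor[String] {
  def act() {
    react {
      case line => println("log: " + line)
    }
  }
}

val logger = new Logger
logger.start()      // runs act(); start also returns the reactor
logger ! "started"  // asynchronous send; the sender does not block
```

Because of the type parameter, sending a non-String message to logger is rejected at compile time.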
The start method is idempotent, which means that invoking it on an actor that has already been started has no effect.

Invoking the Reactor's ! method sends a message to the receiver. Sending a message using ! is asynchronous, which means that the sending actor does not wait until the message is received; its execution continues immediately. For example, a ! msg sends msg to a. All actors have a mailbox which buffers incoming messages until they are processed.

The Reactor trait also defines a forward method. This method is inherited from OutputChannel. It has the same effect as the ! method. Subtraits of Reactor, in particular the ReplyReactor trait, override this method to enable implicit reply destinations (see below).

A Reactor receives messages using the react method.1 react expects an argument of type PartialFunction[Msg, Unit] which defines how messages of type Msg are handled once they arrive in the actor's mailbox. In the following example, the current actor waits to receive the string "Hello", and then prints a greeting:

react {
  case "Hello" => println("Hi there")
}

Invoking react never returns. Therefore, any code that should run after a message has been received must be contained inside the partial function that is passed to react. For example, two messages can be received in sequence by nesting two invocations of react:

react {
  case Get(from) =>
    react {
      case Put(x) => from ! x
    }
}

The Reactor trait also provides control structures (see Section 11.2) which simplify programming with react.

1 By default, this method does not show up in the Scaladoc API documentation, because its visibility is protected[actors]. This can be changed by selecting visibility "All" on the Scaladoc page.
Termination and execution states

The execution of a Reactor terminates when the body of its act method has run to completion. A Reactor can also terminate itself explicitly using the exit method. The return type of exit is Nothing, because exit always throws an exception. This exception is only used internally, and should never be caught.

A terminated Reactor can be restarted by invoking its restart method. Invoking restart on a Reactor that has not terminated yet throws an IllegalStateException. Restarting a terminated actor causes its act method to be rerun.

Reactor defines a method getState which returns the actor's current execution state as a member of the Actor.State enumeration. An actor that has not been started yet is in state Actor.State.New. An actor that can run without waiting for a message is in state Actor.State.Runnable. An actor that is suspended, waiting for a message, is in state Actor.State.Suspended. A terminated actor is in state Actor.State.Terminated.

Exception handling

The exceptionHandler member allows defining an exception handler that is enabled throughout the entire lifetime of a Reactor:

def exceptionHandler: PartialFunction[Exception, Unit]

The returned partial function is used to handle exceptions that are not otherwise handled: whenever an exception propagates out of the body of a Reactor's act method, the partial function is applied to that exception, allowing the actor to run clean-up code before it terminates.2

2 Note that the visibility of exceptionHandler is protected.

Handling exceptions using exceptionHandler works well together with the control structures for programming with react (see Section 11.2).
Whenever an exception has been handled using the partial function returned by exceptionHandler, execution continues with the current continuation closure. Example:

loop {
  react {
    case Msg(data) =>
      if (cond) // process data
      else throw new Exception("cannot process data")
  }
}

Assuming that the Reactor overrides exceptionHandler, after an exception thrown inside the body of react is handled, execution continues with the next loop iteration.

The ReplyReactor trait

The ReplyReactor trait extends Reactor[Any] and adds or overrides the following methods:

• The ! method is overridden to obtain a reference to the current actor (the sender); together with the actual message, the sender reference is transferred to the mailbox of the receiving actor. The receiver has access to the sender of a message via the sender method (see below).

• The forward method is overridden to obtain a reference to the sender of the message that is currently being processed. Together with the actual message, this reference is transferred as the sender of the current message. As a consequence, forward allows forwarding messages on behalf of actors different from the current actor.

• The added sender method returns the sender of the message that is currently being processed. Given the fact that a message might have been forwarded, sender may not return the actor that actually sent the message.

• The added reply method sends a message back to the sender of the last message. reply is also used to reply to a synchronous message send or a message send with future (see below).

• The added !? methods provide synchronous message sends. Invoking !? causes the sending actor to wait until a response is received, which is then returned. There are two overloaded variants.
The two-parameter variant takes in addition a timeout argument (in milliseconds), and its return type is Option[Any] instead of Any. If the sender does not receive a response within the specified timeout period, !? returns None; otherwise it returns the response wrapped in Some.

• The added !! methods are similar to synchronous message sends in that they allow transferring a response from the receiver. However, instead of blocking the sending actor until a response is received, they return Future instances. A Future can be used to retrieve the response of the receiver once it is available; it can also be used to find out whether the response is already available without blocking the sender. There are two overloaded variants of the !! method. The two-parameter variant takes in addition an argument of type PartialFunction[Any, A]. This partial function is used for post-processing the receiver's response. Essentially, !! returns a future which applies the partial function to the response once it is received. The result of the future is the result of this post-processing step.

• The added reactWithin method allows receiving messages within a given period of time. Compared to react it takes an additional parameter msec which indicates the time period in milliseconds until the special TIMEOUT pattern matches (TIMEOUT is a case object in the scala.actors package). Example:

reactWithin(2000) {
  case Answer(text) => // process text
  case TIMEOUT => println("no answer within 2 seconds")
}

The reactWithin method also allows non-blocking access to the mailbox. When specifying a time period of 0 milliseconds, the mailbox is first scanned to find a matching message. If there is no matching message after the first scan, the TIMEOUT pattern matches.
For example, this enables receiving certain messages with a higher priority than others:

reactWithin(0) {
  case HighPriorityMsg => // ...
  case TIMEOUT =>
    react {
      case LowPriorityMsg => // ...
    }
}

In the above example, the actor first processes the next HighPriorityMsg, even if there is a LowPriorityMsg that arrived earlier in its mailbox. The actor only processes a LowPriorityMsg first if there is no HighPriorityMsg in its mailbox.

The ReplyReactor trait adds the Actor.State.TimedSuspended execution state. A suspended actor waiting to receive a message using reactWithin is in state Actor.State.TimedSuspended.

The Actor trait

The Actor trait extends ReplyReactor and adds the following members:

• The receive method behaves like react except that it may return a result. This is reflected in its type, which is polymorphic in its result type:

def receive[R](f: PartialFunction[Any, R]): R

However, using receive makes the actor more heavyweight, since receive blocks the underlying thread while the actor is suspended waiting for a message. The blocked thread is unavailable to execute other actors until the invocation of receive returns.

• The link and unlink methods allow an actor to link and unlink itself to and from another actor, respectively. Linking can be used for monitoring and reacting to the termination of another actor. In particular, linking affects the behavior of invoking exit, as explained in the API documentation of the Actor trait.

• The trapExit member allows reacting to the termination of linked actors independently of the exit reason (that is, it does not matter whether the exit reason is 'normal or not). If an actor's trapExit member is set to true, this actor will never terminate because of linked actors.
Instead, whenever one of its linked actors terminates, it will receive a message of type Exit. The Exit case class has two members: from refers to the actor that terminated; reason refers to the exit reason.

Termination and execution states

When terminating the execution of an Actor instance, the exit reason can be set explicitly by invoking the following variant of exit:

def exit(reason: AnyRef): Nothing

An actor that terminates with an exit reason different from the symbol 'normal propagates its exit reason to all actors linked to it. If an actor terminates because of an uncaught exception, its exit reason is an instance of the UncaughtException case class.

The Actor trait adds two new execution states. An actor waiting to receive a message using receive is in state Actor.State.Blocked. An actor waiting to receive a message using receiveWithin is in state Actor.State.TimedBlocked.

11.2 Control structures

The Reactor trait defines control structures that simplify programming with the non-returning react operation. Normally, an invocation of react does not return. If the actor should execute code subsequently, then one can either pass the actor's continuation code explicitly to react, or one can use one of the following control structures which hide these continuations.

The most basic control structure is andThen. It allows registering a closure that is executed once the actor has finished executing everything else. For example, the actor shown in Listing 11.1 prints a greeting after it has processed the "hello" message. Even though the invocation of react does not return, we can use andThen to register the code which prints the greeting as the actor's continuation. Note that there is a type ascription that follows the react invocation (: Unit).
actor {
  {
    react {
      case "hello" => // processing "hello"
    }: Unit
  } andThen {
    println("hi there")
  }
}

Listing 11.1 · Using andThen for sequencing.

Basically, the type ascription lets you treat the result of react as having type Unit, which is legal, since the result of an expression can always be dropped. This is necessary here, since andThen cannot be a member of type Nothing, which is the result type of react. Treating the result type of react as Unit allows the application of an implicit conversion which makes the andThen member available.

The API provides a few more control structures:

• loop { ... }. Loops indefinitely, executing the code in braces in each iteration. Invoking react inside the loop body causes the actor to react to a message as usual. Subsequently, execution continues with the next iteration of the same loop.

• loopWhile (c) { ... }. Executes the code in braces while the condition c returns true. Invoking react in the loop body has the same effect as in the case of loop.

• continue. Continues with the execution of the current continuation closure. Invoking continue inside the body of a loop or loopWhile will cause the actor to finish the current iteration and continue with the next iteration. If the current continuation has been registered using andThen, execution continues with the closure passed as the second argument to andThen.

The control structures can be used anywhere in the body of a Reactor's act method and in the bodies of methods (transitively) called by act. For actors created using the actor { ... } shorthand the control structures can be imported from the Actor object.
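Putting loopWhile and react together, here is a sketch (the message protocol is made up; this assumes the scala.actors library of Scala 2.8) of an actor that processes messages until told to stop:

```scala
import scala.actors.Actor._

// Hypothetical example: process messages until a "stop" message arrives.
actor {
  var running = true
  loopWhile(running) {
    react {
      case "stop" => running = false
      case msg    => println("processing " + msg)
    }
  }
}
```

Each iteration suspends the actor in an event-based way; once running becomes false, the loop, and with it the actor, terminates.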
11.3 Futures

The ReplyReactor and Actor traits support result-bearing message send operations (the !! methods) that immediately return a future. A future, that is, an instance of the Future trait, is a handle that can be used to retrieve the response to such a message send-with-future.

The sender of a message send-with-future can wait for the future's response by applying the future. For example, sending a message using val fut = a !! msg allows the sender to wait for the result of the future as follows: val res = fut(). In addition, a Future can be queried to find out whether its result is available without blocking, using the isSet method.

A message send-with-future is not the only way to obtain a future. Futures can also be created from computations using the future method. In the following example, the computation body is started to run concurrently, returning a future for its result:

val fut = future { body }
// ...
fut() // wait for future

What makes futures special in the context of actors is the possibility to retrieve their result using the standard actor-based receive operations, such as receive. Moreover, it is possible to use the event-based operations react and reactWithin. This enables an actor to wait for the result of a future without blocking its underlying thread. The actor-based receive operations are made available through the future's inputChannel. For a future of type Future[T], its type is InputChannel[T]. Example:

val fut = a !! msg
// ...
fut.inputChannel.react {
  case Response => // ...
}

11.4 Channels

Channels can be used to simplify the handling of messages that have different types but that are sent to the same actor.
The hierarchy of channels is divided into OutputChannels and InputChannels.

OutputChannels can be sent messages. An OutputChannel out supports the following operations:

• out ! msg. Asynchronously sends msg to out. A reference to the sending actor is transferred as in the case where msg is sent directly to an actor.

• out forward msg. Asynchronously forwards msg to out. The sending actor is determined as in the case where msg is forwarded directly to an actor.

• out.receiver. Returns the unique actor that is receiving messages sent to the out channel.

• out.send(msg, from). Asynchronously sends msg to out, supplying from as the sender of the message.

Note that the OutputChannel trait has a type parameter that specifies the type of messages that can be sent to the channel (using !, forward, and send). The type parameter is contravariant: trait OutputChannel[-Msg].

Actors can receive messages from InputChannels. Like OutputChannel, the InputChannel trait has a type parameter that specifies the type of messages that can be received from the channel. The type parameter is covariant: trait InputChannel[+Msg]. An InputChannel[Msg] in supports the following operations:

• in.receive { case Pat1 => ... ; case Patn => ... } (and similarly, in.receiveWithin). Receives a message from in. Invoking receive on an input channel has the same semantics as the standard receive operation for actors. The only difference is that the partial function passed as an argument has type PartialFunction[Msg, R] where R is the return type of receive.

• in.react { case Pat1 => ... ; case Patn => ... } (and similarly, in.reactWithin). Receives a message from in using the event-based react operation. Like react for actors, the return type is Nothing, indicating that invocations of this method never return.
Like the receive operation above, the partial function passed as an argument has a more specific type: PartialFunction[Msg, Unit].

Creating and sharing channels

Channels are created using the concrete Channel class. It extends both InputChannel and OutputChannel. A channel can be shared either by making the channel visible in the scopes of multiple actors, or by sending it in a message.

actor {
  var out: OutputChannel[String] = null
  val child = actor {
    react {
      case "go" => out ! "hello"
    }
  }
  val channel = new Channel[String]
  out = channel
  child ! "go"
  channel.receive {
    case msg => println(msg.length)
  }
}

Listing 11.2 · Scope-based sharing of channels.

The example in Listing 11.2 demonstrates scope-based sharing. Running this example prints the string "5" to the console. Note that the child actor only has access to out, which is an OutputChannel[String]. The channel reference, which can also be used to receive messages, is hidden. However, care must be taken to ensure the output channel is initialized to a concrete channel before the child sends messages to it. This is done using the "go" message. When receiving from channel using channel.receive, we can make use of the fact that msg is of type String; therefore, it provides a length member.

case class ReplyTo(out: OutputChannel[String])

val child = actor {
  react {
    case ReplyTo(out) => out ! "hello"
  }
}

actor {
  val channel = new Channel[String]
  child ! ReplyTo(channel)
  channel.receive {
    case msg => println(msg.length)
  }
}

Listing 11.3 · Sharing channels via messages.

An alternative way to share channels is by sending them in messages. The example in Listing 11.3 demonstrates this. The ReplyTo case class is a message type that we use to distribute a reference to an OutputChannel[String].
When the child actor receives a ReplyTo message, it sends a string to its output channel. The second actor receives a message on that channel as before.

11.5 Remote Actors

This section describes the remote actors API. Its main interface is the RemoteActor object in package scala.actors.remote. This object provides methods to create and connect to remote actor instances. In the code snippets shown below we assume that all members of RemoteActor have been imported; the full list of imports that we use is as follows:

import scala.actors._
import scala.actors.Actor._
import scala.actors.remote._
import scala.actors.remote.RemoteActor._

Starting remote actors

A remote actor is uniquely identified by a Symbol. This symbol is unique to the JVM instance on which the remote actor is executed. A remote actor identified with name 'myActor can be created as follows:

class MyActor extends Actor {
  def act() {
    alive(9000)
    register('myActor, self)
    // ...
  }
}

Note that a name can only be registered with a single (alive) actor at a time. For example, to register an actor A as 'myActor, and then register another actor B as 'myActor, one would first have to wait until A terminated. This requirement applies across all ports, so simply registering B on a different port than A is not sufficient.

Connecting to remote actors

Connecting to a remote actor is just as simple. To obtain a remote reference to a remote actor running on machine myMachine, on port 8000, with name 'anActor, use select in the following manner:

val myRemoteActor = select(Node("myMachine", 8000), 'anActor)

The actor returned from select has type AbstractActor, which provides essentially the same interface as a regular actor, and thus supports the usual message send operations:

myRemoteActor ! "Hello!"
receive {
  case response => println("Response: " + response)
}

myRemoteActor !? "What is the meaning of life?" match {
  case 42 => println("Success")
  case oops => println("Failed: " + oops)
}

val future = myRemoteActor !! "What is the last digit of PI?"

Note that select is lazy; it does not actually initiate any network connections. It simply creates a new AbstractActor instance which is ready to initiate a new network connection when needed (for instance, when ! is invoked).

Bibliography

[Agh90] Agha, Gul. "Concurrent Object-Oriented Programming." Communications of the ACM, 33(9):125–141, September 1990.

[Arm95] Armstrong, J. L., M. C. Williams, C. Wikström, and S. R. Virding. Concurrent Programming in Erlang. Prentice Hall, second edition, 1995.

[Dea08] Dean, Jeffrey and Sanjay Ghemawat. "MapReduce: simplified data processing on large clusters." CACM, 51(1):107–113, 2008.

[Goe06] Goetz, Brian, Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea. Java Concurrency in Practice. Addison Wesley, 2006. ISBN 978-0321349606.

[Gro99] Gropp, William, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. The MIT Press, Cambridge, MA, second edition, 1999.

[Hal09] Haller, Philipp and Martin Odersky. "Scala Actors: Unifying thread-based and event-based programming." Theor. Comput. Sci., 410(2-3):202–220, 2009.

[Hal10] Haller, Philipp and Martin Odersky. "Capabilities for Uniqueness and Borrowing." In Proceedings of the 24th European Conference on Object-Oriented Programming (ECOOP'10), pages 354–378. Springer, June 2010. ISBN 978-3-642-14106-5.

[HB77] Baker, Henry and Carl Hewitt.
"Laws for Communicating Parallel Processes." Technical report, MIT Artificial Intelligence Laboratory, http://hdl.handle.net/1721.1/41962, 1977.

[Hew73] Hewitt, Carl, Peter Bishop, and Richard Steiger. "A Universal Modular ACTOR Formalism for Artificial Intelligence." In Proceedings of the Third International Joint Conference on Artificial Intelligence (IJCAI'73), pages 235–245, 1973.

[Hew77] Hewitt, Carl E. "Viewing Control Structures as Patterns of Passing Messages." Journal of Artificial Intelligence, 8(3):323–364, 1977.

[Hoa78] Hoare, C. A. R. "Communicating sequential processes." Comm. ACM, 21(8):666–677, 1978.

[Kay98] Kay, Alan. An email on messaging in Smalltalk/Squeak, 1998. The email is published on the web at http://lists.squeakfoundation.org/pipermail/squeak-dev/1998-October/017019.html.

[Sut05] Sutter, Herb. "The free lunch is over: A fundamental turn toward concurrency." Dr. Dobb's Journal, March 2005.

About the Authors

Philipp Haller

Philipp Haller is a Ph.D. candidate in the School of Computer and Communication Sciences at EPFL (Switzerland), working with Martin Odersky on the Scala programming language, libraries, and tools. He received his Dipl.-Inform. degree (with distinction) in 2006 from Karlsruhe Institute of Technology (Germany). Among his research interests are programming abstractions for concurrency, as well as type systems to check their safety. Philipp created Scala Actors, a library for efficient, high-level concurrent programming.

Frank Sommers

Frank Sommers is an editor at Artima, and president of Autospaces, Inc., a company specializing in automotive finance software. After almost 15 years of working with Java, Frank started programming in Scala a few years ago, and became an instant fan of the language.
Frank is an active writer in the area of information technology and computing. His main interests are parallel and distributed computing, data management, programming languages, cluster and cloud computing, open-source software, and online user communities.

Index

Page numbers followed by an n refer to footnotes.

Symbols
! (asynchronous message send)
  follows tradition of Erlang actors, 45
  on trait Actor, 45–46
!! (futures message send)
  on trait Actor, 48
!? (synchronous message send)
  on trait Actor, 47
  on trait ReplyReactor, 123
"MapReduce: simplified data processing on large clusters" (Dean and Ghemawat), 101n

A
Actor trait, 119
  execution state Blocked, 125
  execution state TimedBlocked, 125
actor-based programming
  defining message classes, 42
  messages altering internal state, 43
  subsequent behavior, 43
  with Scala, 14
actors
  definition of, 28
  DSL, 41
  event-based versus thread-based, 51–52
  model, 14
  versus threads, 16–18
actors.corePoolSize, 55
actors.maxPoolSize, 55
AJAX, 36
an email on messaging in Smalltalk/Squeak (Kay), 31n

B
Baker, Henry, 33
by-name parameters, 78

C
"Capabilities for Uniqueness and Borrowing" (Haller and Odersky), 17n
chat application
  key abstraction, 42
ChatRoom
  main responsibilities, 42
closures, 58, 59, 83
combinators
  andThen, 58
  control-flow, 58
  custom, 63
  loopWhile, 58
"Communicating sequential processes" (Hoare), 16n, 18n
"Concurrent Object-Oriented Programming" (Agha), 29n
Concurrent Programming in Erlang (Armstrong et al.), 16n
continuations, 28–30, 32

D
daemon-style actors, 84–85
denial-of-service, 37
design tips and techniques, 38
deterministic actor execution, 85–92

E
events
  activation, 34
  actor creation, 34
  arrival, 34
  initial, 34
exception handling, 64–67
  futures, 76, 78
Exit class
  on trait Actor, 125
exit method
  on trait Reactor, 121

F
fork-join parallelism, 32
forward method
  on trait Reactor, 120
  on trait ReplyReactor, 122
"The free lunch is over: A fundamental turn toward concurrency" (Sutter), 15n
futures
  concept of, 48
  event-based, 59–63
  exception handling, 76–78

G
getState method, 121

H
Hewitt, Carl, 14, 33

I
interfacing with event dispatch threads, 84
isSet method
  on trait Future, 127

J
Java Concurrency in Practice (Goetz et al.), 16n

K
Kay, Alan, 31

L
late binding, 30
"Laws for Communicating Parallel Processes" (Hewitt and Baker), 33n
lifecycle, 39
link method
  on trait Actor, 124
linking
  actors, 69–73
  remote actors, 97

M
maintaining thread-bound properties
  event-based actors, 80–83
managed blocking, 88–93
MapReduce, 21–22
"MapReduce: Simplified data processing on large clusters" (Dean and Ghemawat), 21n
message delays, 35
message processing, 41–49
  defining act method, 43
  invoking start, 43
  obtaining next available message, 44
monitoring, 67–78
Moore's Law
  applied to computing performance, 15

P
principles
  of locality, 27
  "send it and forget it.", 36

R
race conditions
  avoided by design, 17
react
  invoking
    event-based, 56
    nested, 56
    recursive, 57
    sequential, 56
react and receive
  differences, 52–63
react method, 120–122
Reactor trait, 119–122
  andThen control structure, 125
  continue control structure, 126
  control structures, 126
  loop control structure, 126
  loopWhile control structure, 126
reactWithin method
  on trait ReplyReactor, 123
receive method
  on trait Actor, 124
receiveWithin method, 49
register method
  on class RemoteActor, 94
remote actors, 94–97
  linking actors, 97
reply method
  on trait ReplyReactor, 123
ReplyReactor trait, 119, 122–124
restart method, 121

S
"Scala Actors: Unifying thread-based and event-based programming" (Haller and Odersky), 14n
scaling with concurrency, 15
SchedulerAdapter, 83–84
schedulers
  customizing, 79–88
select method
  on class RemoteActor, 96
sender method
  on trait ReplyReactor, 122
SingleThreadedScheduler, 88

T
termination, 68–69
  propagating exit reason, 125
thread-per-actor approach, 51
time, 35
TimedSuspended state
  on trait ReplyReactor, 124
trapExit
  on trait Actor, 125

U
unbounded nondeterminism, 23
"A Universal Modular ACTOR Formalism for Artificial Intelligence" (Hewitt et al.), 14n
unlink method
  on trait Actor, 124
Using MPI: Portable Parallel Programming with the Message-Passing Interface (Gropp et al.), 16n
