字数:0 关键词: 分布式/云计算/大数据

Microsoft® Big Data Solutions Published by John Wiley & Sons, Inc. 10475 Crosspoint Boulevard Indianapolis, IN 46256 www.wiley.com Copyright © 2014 by John Wiley & Sons, Inc., Indianapolis, Indiana Published simultaneously in Canada ISBN: 978-1-118-72908-3 ISBN: 978-1-118-74209-9 (ebk) ISBN: 978-1-118-72955-7 (ebk) Manufactured in the United States of America 10 9 8 7 6 5 4 3 2 1 No part of this publication may be reproduced, stored in a retrieval system or transmitted in any form or by any means, electronic, mechanical, photocopying, recording, scanning or otherwise, except as permitted under Sections 107 or 108 of the 1976 United States Copyright Act, without either the prior written permission of the Publisher, or autho- rization through payment of the appropriate per-copy fee to the Copyright Clearance Center, 222 Rosewood Drive, Danvers, MA 01923, (978) 750-8400, fax (978) 646-8600. Requests to the Publisher for permission should be addressed to the Permissions Department, John Wiley & Sons, Inc., 111 River Street, Hoboken, NJ 07030, (201) 748-6011, fax (201) 748-6008, or online at http://www.wiley.com/go/permissions. Limit of Liability/Disclaimer of Warranty: The publisher and the author make no representations or warranties with respect to the accuracy or completeness of the contents of this work and specifi cally disclaim all warranties, including without limitation warranties of fi tness for a particular purpose. No warranty may be created or extended by sales or promotional materials. The advice and strategies contained herein may not be suitable for every situation. This work is sold with the understanding that the publisher is not engaged in rendering legal, accounting, or other professional services. If professional assistance is required, the services of a competent professional person should be sought. Neither the publisher nor the author shall be liable for damages arising herefrom. The fact that an organization or Web site is referred to in this work as a citation and/or a potential source of further information does not mean that the author or the publisher endorses the information the organization or website may provide or recommendations it may make. Further, readers should be aware that Internet websites listed in this work may have changed or disap- peared between when this work was written and when it is read. For general information on our other products and services please contact our Customer Care Department within the United States at (877) 762-2974, outside the United States at (317) 572-3993 or fax (317) 572-4002. Wiley publishes in a variety of print and electronic formats and by print-on-demand. Some material included with standard print versions of this book may not be included in e-books or in print-on-demand. If this book refers to media such as a CD or DVD that is not included in the version you purchased, you may download this material at http://booksupport.wiley.com. For more information about Wiley products, visit www.wiley.com. Library of Congress Control Number: 2013958290 Trademarks: Wiley and the Wiley logo are trademarks or registered trademarks of John Wiley & Sons, Inc. and/or its affi liates, in the United States and other countries, and may not be used without written permission. Microsoft is a registered trademark of Microsoft Corporation. All other trademarks are the property of their respective owners. John Wiley & Sons, Inc. is not associated with any product or vendor mentioned in this book. Executive Editor Robert Elliot Project Editor Jennifer Lynn Technical Editors Rohit Bakhshi John Hoang Josh Luedeman Production Editor Christine Mugnolo Copy Editor Keith Cline Editorial Manager Mary Beth Wakefi eld Freelancer Editorial Manager Rosemarie Graham Associate Director of Marketing David Mayhew Marketing Manager Ashley Zurcher Business Manager Amy Knies Vice President and Executive Group Publisher Richard Swadley Associate Publisher Jim Minatel Project Coordinator, Cover Todd Klemme Proofreader Sarah Kaikini, Word One New York Indexer Robert Swanson Cover Image ©traffi c_analyzer/iStockphoto.com Cover Designer Ryan Sneed/Wiley I am honored to dedicate this book to my author team who pulled together and created a wonderful project for the community they love as I do. — Adam Jorgensen For my beautiful and eternally patient wife, Jane, and our three children Lucy, Kate, and Oliver. I will love you all forever. — James Rowland-Jones To my lovely wife, Marlana, and my children, Kayla and Michael, thanks for the support and understanding during the late nights while I was writing. — John Welch To my family, thank you for your unconditional support throughout this process. I’d especially like to thank my wife Shannon for believing in me. — Brian Mitchell at PASS conferences, the Microsoft Business Intelligence conference, Software Development West (SD West), Software Management Conference (ASM/SM), and others. He has also contributed to multiple books on SQL Server, including Smart Business Intelligence Solutions with Microsoft SQL Server 2008 (Microsoft Press, 2009) and the SQL Server MVP Deep Dives (Manning Publications) series. John writes a blog on BI and SQL Server Information Services (SSIS) topics at http://agilebi.com/jwelch. He is active in open source projects that help ease the development process for Microsoft BI developers, including ssisUnit (http://ssisunit.codeplex.com), a unit testing framework for SSIS. Dan Clark is a senior BI consultant for Pragmatic Works. He enjoys learning new BI technologies and training others how to best implement the technology. Dan is particularly interested in how to use data to drive better decision making. Dan has published several books and numerous articles on .NET programming and BI development. He is a regular speaker at various developer/BI conferences and user group meetings, and enjoys interacting with the Microsoft developer and database communities. Chris Price is a senior consultant with Microsoft based out of Tampa, Florida. He has a Bachelor of Science degree in management information systems and a Master of Business Administration degree, both from the University of South Florida. He began his career as a developer, programming with everything from Visual Basic and Java to both VB.Net and C# as he worked his way into a software architect role before being bitten by the BI bug. Although he is still passionate about software development, his current focus is on ETL (extract, transform, and load), Data integration, data quality, MDM (master data manage- ment), SSAS (SQL Server Analysis Server), SharePoint, and all things big data. He regularly speaks at SQL Saturdays, PASS Summit, conferences, code camps, and other community events. He blogs frequently and has also authored mul- tiple books and whitepapers and has served as technical editor for a range of BI and big data topics. You can follow Chris on his blog at http://bluewatersql .wordpress.com/ or on Twitter at @BluewaterSQL. Brian Mitchell is the lead architect of the Microsoft Big Data Center of Expertise. Brian focuses exclusively on data warehouse/business intelligence (DW/BI) solutions, with the majority of his time focusing on SQL Server Parallel Data Warehouse (PDW) and HDInsight. He has spent more than 15 years work- ing with Microsoft SQL Server and Microsoft Business Intelligence. Brian is a Microsoft Certifi ed Master–SQL Server 2008. You can fi nd his blog on topics such as Big Data, SQL Server Parallel Data Warehouse, and Microsoft Business Intelligence at http://brianwmitchell.com. Brian earned his Master of Business Administration degree from the University of Florida. When he is not tinkering with SQL Server or Hadoop, Brian enjoys spending time exploring his adopted home state of Florida with his wife, Shannon, and their kids. vi About the Authors Josh Luedeman has been working with SQL Server for more than eight years. He is currently a solutions architect with Data Structures, Inc., where he is working with customers to help them utilize business intelligence (BI) tools and big data. He has worked in IT for more than 10 years, holding positions in application support, database administration, and BI. In these industries, Josh has held integral roles in Fortune 500 companies, major institutions of higher education, small-medium businesses, and startups. Josh is a speaker at software development and data conferences including Code On The Beach and multiple SQL Saturdays. He is originally from Corning, New York, and currently resides in Orlando, Florida, with his wife and children. Josh can be found online at www .joshluedeman.com, josh@joshluedeman.com, www.linkedin.com/in /joshluedeman, and @joshluedeman on Twitter. Michael Reed has a long history of designing innovative solutions to diffi cult business problems. During the last 14 years, he focused on database development and architecture, and more recently business intelligence and analytics. He is currently employed by Pragmatic Works as a Senor BI Consultant. Previously he was director of Insight and Analytics at a healthcare claim processor. Prior to that he held operations, data, and information delivery centric roles in Microsoft’s Online Services Division; specifi cally the AdCenter Behavioral Targeting group, which is the primary research unit for mining social behaviors at Microsoft supporting the Bing decision search engine and BingAds advertising services. In a prior life, he was co-owner of a multimillion dollar manufacturing busi- ness, grown from a startup, where he gained much of the business knowledge and insight he employs in his work today. viii About the Technical Editors x Contents Part II Setting Up for Big Data with Microsoft 37 Chapter 3 Confi guring Your First Big Data Environment 39 Getting Started 39 Getting the Install 40 Running the Installation 40 On-Premise Installation: Single-Node Installation 41 HDInsight Service: Installing in the Cloud 51 Windows Azure Storage Explorer Options 52 Validating Your New Cluster 55 Logging into HDInsight Service 55 Verify HDP Functionality in the Logs 57 Common Post-Setup Tasks 58 Loading Your First Files 58 Verifying Hive and Pig 60 Summary 63 Part III Storing and Managing Big Data 65 Chapter 4 HDFS, Hive, HBase, and HCatalog 67 Exploring the Hadoop Distributed File System 68 Explaining the HDFS Architecture 69 Interacting with HDFS 72 Exploring Hive: The Hadoop Data Warehouse Platform 75 Designing, Building, and Loading Tables 76 Querying Data 77 Confi guring the Hive ODBC Driver 77 Exploring HCatalog: HDFS Table and Metadata Management 78 Exploring HBase: An HDFS Column-Oriented Database 80 Columnar Databases 81 Defi ning and Populating an HBase Table 82 Using Query Operations 83 Summary 84 Chapter 5 Storing and Managing Data in HDFS 85 Understanding the Fundamentals of HDFS 86 HDFS Architecture 87 NameNodes and DataNodes 89 Data Replication 90 Using Common Commands to Interact with HDFS 92 Interfaces for Working with HDFS 92 File Manipulation Commands 94 Administrative Functions in HDFS 97 Moving and Organizing Data in HDFS 100 Moving Data in HDFS 100 Implementing Data Structures for Easier Management 101 Rebalancing Data 102 Summary 103 Contents xi Chapter 6 Adding Structure with Hive 105 Understanding Hive’s Purpose and Role 106 Providing Structure for Unstructured Data 107 Enabling Data Access and Transformation 114 Differentiating Hive from Traditional RDBMS Systems 115 Working with Hive 116 Creating and Querying Basic Tables 117 Creating Databases 117 Creating Tables 118 Adding and Deleting Data 121 Querying a Table 123 Using Advanced Data Structures with Hive 126 Setting Up Partitioned Tables 126 Loading Partitioned Tables 128 Using Views 129 Creating Indexes for Tables 130 Summary 131 Chapter 7 Expanding Your Capability with HBase and HCatalog 133 Using HBase 134 Creating HBase Tables 134 Loading Data into an HBase Table 136 Performing a Fast Lookup 138 Loading and Querying HBase 139 Managing Data with HCatalog 140 Working with HCatalog and Hive 140 Defi ning Data Structures 141 Creating Indexes 143 Creating Partitions 143 Integrating HCatalog with Pig and Hive 145 Using HBase or Hive as a Data Warehouse 149 Summary 150 Part IV Working with Your Big Data 151 Chapter 8 Eff ective Big Data ETL with SSIS, Pig, and Sqoop 153 Combining Big Data and SQL Server Tools for Better Solutions 154 Why Move the Data? 154 Transferring Data Between Hadoop and SQL Server 155 Working with SSIS and Hive 156 Connecting to Hive 157 Confi guring Your Packages 161 Loading Data into Hadoop 165 Getting the Best Performance from SSIS 167 Transferring Data with Sqoop 167 Copying Data from SQL Server 168 Copying Data to SQL Server 170 xii Contents Using Pig for Data Movement 171 Transforming Data with Pig 171 Using Pig and SSIS Together 174 Choosing the Right Tool 175 Use Cases for SSIS 175 Use Cases for Pig 175 Use Cases for Sqoop 176 Summary 176 Chapter 9 Data Research and Advanced Data Cleansing with Pig and Hive 177 Getting to Know Pig 178 When to Use Pig 178 Taking Advantage of Built-in Functions 179 Executing User-defi ned Functions 180 Using UDFs 182 Building Your Own UDFs for Pig 189 Using Hive 192 Data Analysis with Hive 192 Types of Hive Functions 192 Extending Hive with Map-reduce Scripts 195 Creating a Custom Map-reduce Script 198 Creating Your Own UDFs for Hive 199 Summary 201 Part V Big Data and SQL Server Together 203 Chapter 10 Data Warehouses and Hadoop Integration 205 State of the Union 206 Challenges Faced by Traditional Data Warehouse Architectures 207 Technical Constraints 207 Business Challenges 213 Hadoop’s Impact on the Data Warehouse Market 216 Keep Everything 216 Code First (Schema Later) 217 Model the Value 218 Throw Compute at the Problem 218 Introducing Parallel Data Warehouse (PDW) 220 What Is PDW? 221 Why Is PDW Important? 222 How PDW Works 224 Project Polybase 235 Polybase Architecture 235 Business Use Cases for Polybase Today 249 Speculating on the Future for Polybase 251 Summary 255 Contents xiii Chapter 11 Visualizing Big Data with Microsoft BI 257 An Ecosystem of Tools 258 Excel 258 PowerPivot 258 Power View 259 Power Map 261 Reporting Services 261 Self-service Big Data with PowerPivot 263 Setting Up the ODBC Driver 263 Loading Data 265 Updating the Model 272 Adding Measures 273 Creating Pivot Tables 274 Rapid Big Data Exploration with Power View 277 Spatial Exploration with Power Map 281 Summary 283 Chapter 12 Big Data Analytics 285 Data Science, Data Mining, and Predictive Analytics 286 Data Mining 286 Predictive Analytics 287 Introduction to Mahout 288 Building a Recommendation Engine 289 Getting Started 291 Running a User-to-user Recommendation Job 292 Running an Item-to-item Recommendation Job 295 Summary 296 Chapter 13 Big Data and the Cloud 297 Defi ning the Cloud 298 Exploring Big Data Cloud Providers 299 Amazon 299 Microsoft 300 Setting Up a Big Data Sandbox in the Cloud 300 Getting Started with Amazon EMR 301 Getting Started with HDInsight 307 Storing Your Data in the Cloud 315 Storing Data 316 Uploading Your Data 317 Exploring Big Data Storage Tools 318 Integrating Cloud Data 319 Other Cloud Data Sources 321 Summary 321 Chapter 14 Big Data in the Real World 323 Common Industry Analytics 324 Telco 324 Energy 325 xiv Contents Retail 325 Data Services 326 IT/Hosting Optimization 326 Marketing Social Sentiment 327 Operational Analytics 327 Failing Fast 328 A New Ecosystem of Technologies 328 User Audiences 330 Summary 333 Part VI Moving Your Big Data Forward 335 Chapter 15 Building and Executing Your Big Data Plan 337 Gaining Sponsor and Stakeholder Buy-In 338 Problem Defi nition 338 Scope Management 339 Stakeholder Expectations 341 Defi ning the Criteria for Success 342 Identifying Technical Challenges 342 Environmental Challenges 342 Challenges in Skillset 344 Identifying Operational Challenges 345 Planning for Setup/Confi guration 345 Planning for Ongoing Maintenance 347 Going Forward 348 The HandOff to Operations 348 After Deployment 349 Summary 350 Chapter 16 Operational Big Data Management 351 Hybrid Big Data Environments: Cloud and On-Premise Solutions Working Together 352 Ongoing Data Integration with Cloud and On-Premise Solutions 353 Integration Thoughts for Big Data 354 Backups and High Availability in Your Big Data Environment 356 High Availability 356 Disaster Recovery 358 Big Data Solution Governance 359 Creating Operational Analytics 360 System Center Operations Manager for HDP 361 Installing the Ambari SCOM Management Pack 362 Monitoring with the Ambari SCOM Management Pack 371 Summary 377 Index 379 xvi Introduction Our Team We have assembled a strong international team of authors to make sure that we can provide a sound perspective and knowledge transfer on the right topics (we’ll discuss those shortly). Those topics include: 1. Accelerated overview of Big Data, Hadoop, NoSQL, and key industry knowledge 2. Key problems people are trying to solve and how to identify them 3. Delivering big data in a Microsoft world 4. Tool and platform choice 5. Installation, confi guration, and exploration 6. Storing and managing big data 7. Working with, adding structure, and cleansing your data 8. Big data and SQL Server together 9. Analytics in the big data world 10. How this works in the cloud 11. Case studies and real world applications 12. Moving your organization forward in this new world This team includes members of Pragmatic Works, a global leader in information services, software, and training; Microsoft Research; Microsoft Consulting Services; Azure Customer Advisory Team; and some other industry fi rms making a big impact in this expanding space. All Kidding Aside Big data is coming on strong. You will have these solutions in your environ- ment within 24 months, and you should be prepared. This book is designed to help you make the transition with practical skills from a relational to a more “evolved” view of the data worlds. This includes solutions that will handle data that does not fi t nicely into a tabular structure, but is nonetheless just as or more important in some cases as the data that you have curated so carefully for so many years. You will learn some new terms as well. This will be almost as much a vocabu- lary lesson as a technical lesson. Introduction xvii Who Is This Book For? This book is for those data developers, power users, and executives looking to understand how these big data technologies will impact their world and how to properly approach solutions in this new ecosystem. Readers will need a basic understanding of data systems and a passion for learning new technologies and techniques. Some experience with developing database or application solutions will be helpful in some advanced topic areas. What You Need to Use This Book We have designed this book to make extensive use of cloud resources so, as the reader, you will need to have a newer model computer PC or Mac that can access the Internet reliably. In addition, you will want to be able to install additional pro- grams and tools as advised by the authors, so please ensure you have that access on the machine you’re using. Different chapters will have different tools or data sets, so please follow the authors’ instructions in those chapters to get the most out of your experience. Having access to a SQL Server database will be required in certain chapters, and if you wish to set up your environment on premise, then a virtualization technology such as Hyper-V, VMWare, or Virtual box is recommended. Chapter Overview Now we’ll go through the chapters in this text and discuss what you’ll be learning from each one. ■ Chapter 1: Industry Needs and Solutions No book on big data would be complete without some coverage of the history, origins, and use cases in this ecosystem. We also need to discuss the industry players and platforms that are in scope for the book. Other books spend 5 to 6 chapters rehashing this information; we have done it effi ciently for you so you can get to work on more fun topics! ■ Chapter 2: Microsoft’s Approach to Big Data Doing this in a Microsoft world is a little different that the traditional UNIX or Linux deployment. We chose this approach since we feel it makes this technology more accessible to millions of windows administrators, develop- ers and power users. Many of the folks were surveyed before this writing, we heard overwhelmingly that we needed a Windows-focused solution to help the largest population of enterprise users access this new technology. xviii Introduction ■ Chapter 3: Installing HDInsight In this chapter, you’ll get started confi guring your big data environment. ■ Chapter 4: HDFS, Hive, HBase and HCatalog These are some key data and metadata technologies. We’ll make sure you understand when to use each one and how to get the most out of them. ■ Chapter 5: Storing and Managing data in HDFS A distributed fi le system might be a new concept for most readers, so we are going to make sure we go through this core component of Hadoop and ensure you’re prepared for designing with this incredible feature. ■ Chapter 6: Adding Structure with Hive We need to go deeper into Hive because you’ll use it a lot. Let’s dive in with this chapter to make sure you understand commands and the logic behind using Hive effi ciently. ■ Chapter 7: Expanding your Capability with HBase and HCatalog Dealing with large tables and metadata requires some new tools and techniques. HBase and HCatalog will help you manage these types of challenges, and we’re going to take you through using them. Get ready to put the BIG in big data. ■ Chapter 8: Effective Big Data ETL with SSIS, Pig, and Sqoop We have to load this data, and there is no better way to do it than with our ETL expert authors. Come along while they take you through using favorite and familiar tools, along with some new ones, to load data quickly and effectively. ■ Chapter 9: Data Research and Advanced Data Cleansing with Pig and Hive Now we’ve installed, confi gured, explored, and loaded some data. Let’s get buys researching and cleansing this data with our new tools and platform. ■ Chapter 10: Data Warehouses and Hadoop Integration How do SQL Server and business intelligence fi t in with big data? Very closely. Most of the time they will work in tandem. We will show you when to use each solution and how they work together in scale-up and scale-out solutions. ■ Chapter 11: Visualizing Big Data with Microsoft BI Now that we have the analysis, how do we visualize this for our users? Do we have new tools? Do we use our familiar tools? Yes! Let’s do this together so we can understand how to combine these solutions for the best results for our users and customers. 4 Part I ■ What Is Big Data? What’s So Big About Big Data? The world has witnessed explosive, exponential growth in recent times. So, did we suddenly have a need for big data? Not exactly. Businesses have been tackling the capacity challenge for many years (much to the delight of storage hardware vendors). Therefore the big in big data isn’t purely a statement on size. Likewise, on the processing front, scale-out solutions such as high-performance computing and distributed database technology have been in place since the last millennium. There is nothing intrinsically new there either. People also often talk about unstructured data, but, really, this just refers to the format of the data. Could this be a reason we “suddenly” need big data? We know that web data, especially web log data, is born in an unstructured format and can be generated in signifi cant quantities and volume. However, is this really enough to be considered big data? In my mind, the answer is no. No one property on its own is suffi cient for a project or a solution to be considered a big data solution. It’s only when you have a cunning blend of these ingredients that you get to bake a big data cake. This is in line with the Gartner defi nition of big data, which they updated in Doug Laney’s publication, The Importance of Big Data: A Definition (Gartner, 2012): “High volume, high velocity, and/or high variety information assets that require new forms of processing to enable enhanced decision mak- ing, insight discovery and process optimization.” What we do know is that every CIO on the planet seems to want to start a big data project right now. In a world of shrinking budgets, there is this sudden desire to jump in with both feet into this world of big data and start prospecting for golden nuggets. It’s the gold rush all over again, and clearly companies feel like they might miss out if they hesitate. However, this is a picture that has been sharpening its focus for several years. In the buildup to this ubiquitous acceptance of big data, we’ve been blessed with plenty of industry terms and trends, web scale, new programming paradigms of “code fi rst,” and of course, to the total disgust of data modelers everywhere, NoSQL. Technologies such as Cassandra and MongoDB are certainly part of the broader ecosystem, but none have resonated as strongly with the market as Hadoop and big data. Why? In essence, unless you were Facebook, Google, Yahoo!, or Bing, issues like web scale really didn’t apply. It seems as though everyone is now building analytics platforms, and that, to be the king of geek chic, requires a degree in advanced statistics. The reason? Big data projects aren’t defi ned by having big data sets. They are shaped by big ideas, by big questions, and by big opportunities. Big data is not about one technology or even one platform. It’s so much more than that: It’s a mindset and a movement. Chapter 1 ■ Industry Needs and Solutions 5 Big data, therefore, is a term that underpins a raft of technologies (includ- ing the various Hadoop projects, NoSQL offerings, and even MPP Database Systems, for example) that have been created in the drive to better analyze and derive meaning from data at a dramatically lower cost and while delivering new insights and products for organizations all over the world. In times of recession, businesses look to derive greater value from the assets they have rather than invest in new assets. Big data, and in particular Hadoop, is the perfect vehicle for doing exactly that. A Brief History of Hadoop Necessity is the mother of invention, and Hadoop is no exception. Hadoop was created to meet the need of web companies to index and process the data tsu- nami courtesy of the newfangled Internetz. Hadoop’s origins owe everything to both Google and the Apache Nutch project. Without one infl uencing the other, Hadoop might have ended up a very different animal (joke intended). In this next section, we are going to see how their work contributed to making Hadoop what it is today. Google As with many pioneering efforts, Google provided signifi cant inspiration for the development that became known as Hadoop. Google published two landmark papers. The fi rst paper, published in October 2003, was titled “The Google File System,” and the second paper, “MapReduce: Simplifi ed Data Processing on Large Clusters,” published just over a year later in December 2004, provided the inspiration to Doug Cutting and his team of part-time developers for their project, Nutch. MapReduce was fi rst designed to enable Google developers to focus on the large-scale computations that they were trying to perform while abstracting away all the scaffolding code required to make the computation possible. Given the size of the data set they were working on and the duration of tasks, the developers knew that they had to have a model that was highly parallelized, was fault tolerant, and was able to balance the workload across a distributed set of machines. Of course, the Google implementation of MapReduce worked over Google File System (GFS); Hadoop Distributed File System (HDFS) was still waiting to be invented. Google has since continued to release thought-provoking, illuminating, and inspirational publications. One publication worthy of note is “BigTable: A Distributed Storage System for Structured Data.” Of course, they aren’t the only ones. LinkedIn, Facebook, and of course Yahoo! have all contributed to the big data mind share. 6 Part I ■ What Is Big Data? There are similarities here to the SIGMOD papers published by various par- ties in the relational database world, but ultimately it isn’t the same. Let’s look at an example. Twitter has open-sourced Storm—their complex event process- ing engine—which has recently been accepted into the Apache incubator pro- gram. For relational database vendors, this level of open sharing is really quite unheard of. For more details about storm head over to Apache: http://incubator .apache.org/projects/storm.html. Nutch Nutch was an open source crawler-based search engine built by a handful of part-time developers, including Doug Cutting. As previously mentioned Cutting was inspired by the Google publications and changed Nutch to take advantage of the enhanced scalability of the architecture promoted by Google. However, it wasn’t too long after this that Cutting joined Yahoo! and Hadoop was born. Nutch joined the Apache foundation in January 2005, and its fi rst release (0.7) was in August 2005. However, it was not until 0.8 was released in July 2006 that Nutch began the transition to Hadoop-based architecture. Nutch is still very much alive and is an actively contributed-to project. However, Nutch has now been split into two codebases. Version 1 is the legacy and provides the origins of Hadoop. Version 2 represents something of a re-architecture of the original implementation while still holding true to the original goals of the project. What Is Hadoop? Apache Hadoop is a top-level open source project and is governed by the Apache Software Foundation (ASF). Hadoop is not any one entity or thing. It is best thought of as a platform or an ecosystem that describes a method of distrib- uted data processing at scale using commodity hardware confi gured to run as a cluster of computing power. This architecture enables Hadoop to address and analyze vast quantities of data at signifi cantly lower cost than traditional methods commonly found in data warehousing, for example, with relational database systems. At its core, Hadoop has two primary functions: ■ Processing data (MapReduce) ■ Storing data (HDFS) With the advent of Hadoop 2.0, the next major release of Hadoop, we will see the decoupling of resource management from data processing. This adds a third primary function to this list. However, at the time of this writing, Yarn, the Apache project responsible for the resource management, is in alpha tech- nology preview modes. Chapter 1 ■ Industry Needs and Solutions 7 That said, a number of additional subprojects have been developed and added to the ecosystem that have been built on top of these two primary functions. When bundled together, these subprojects plus the core projects of MapReduce and HDFS become known as a distribution. Derivative Works and Distributions To fully understand a distribution, you must fi rst understand the role, naming, and branding of Apache Hadoop. The basic rule here is that only offi cial releases by the Apache Hadoop project may be called Apache Hadoop or Hadoop. So, what about companies that build products/solutions on top of Hadoop? This is where the term derivative works comes in. What Are Derivative Works? Any product that uses Apache Hadoop code, known as artifacts, as part of its construction is said to be a derivative work. A derivative work is not an Apache Hadoop release. It may be true that a derivative work can be described as “pow- ered by Apache Hadoop.” However, there is strict guidance on product naming to avoid confusion in the marketplace. Consequently, companies that provide distributions of Hadoop should also be considered to be derivative works. NOTE I liken the relationship between Hadoop and derivative works to the world of Xbox games development. Many Xbox games use graphics engines provided by a third party. The Unreal Engine is just such an example. What Is a Distribution? Now that you know what a derivative work is, we can look at distributions. A distribution is the packaging of Apache Hadoop projects and subprojects plus any other additional proprietary components into a single managed package. For example, Hortonworks provides a distribution of Hadoop called “Hortonworks Data Platform,” or HDP for short. This is the distribution used by Microsoft for its product, HDInsight. You may be asking yourself what is so special about that? You could certainly do this yourself. However, this would be a signifi cant undertaking. First, you’d need to download the projects you want, resolve any dependencies, and then compile all the source code. However, when you decide to go down this route, all the testing and integration of the various components is on you to manage and maintain. Bear in mind that the creators of distributions also employ the committers of the actual source and therefore can also offer support. As you might expect, distributions may lag slightly behind the Apache proj- ects in terms of releases. This is one of the deciding factors you might want to 8 Part I ■ What Is Big Data? consider when picking a distribution. Frequency of updates is a key factor, given how quickly the Hadoop ecosystem evolves. If you look at the Hortonworks distribution, known as Hortonworks Data Platform (HDP), you can see that there are a number of projects at different stages of development. The distribution brings these projects together and tests them for interoperability and stability. Once satisfi ed that the projects all hang together, the distributor (in this case, Hortonworks) creates the versioned release of the integrated software (the distribution as an installable package). The 1.3 version made a number of choices as to which versions to support. Today, though, just a few months later, the top-line Hadoop project has a release available, which is not part of HDP 1.3. This and other ecosystem changes will be consumed in the next release of the HDP distribution. To see a nice graphic of the Hortonworks distribution history, I will refer you to http://hortonworks.com/products/hdp-2/. Hadoop is a rapidly changing and evolving ecosystem and doesn’t rest on its laurels so including version history is largely futile. Hadoop Distributions Note that there are several Hadoop distributions on the market for you to choose from. Some include proprietary components; others do not. The following sec- tions briefl y cover some of the main Hadoop distributions. Hortonworks HDP Hortonworks provides a distribution of Apache Hadoop known as Hortonworks Data Platform (HDP). HDP is a 100% open source distribution. Therefore, it does not contain any proprietary code or licensing. The developers employed by Hortonworks contribute directly to the Apache projects. Hortonworks is also building a good track record for regular releases of their distribution, educational content, and community engagement. In addition, Hortonworks has established a number of strategic partnerships, which will stand them in good stead. HDP is available in three forms. The fi rst is for Hadoop 1.x, and the second is for Hadoop 2.0, which is currently in development. Hortonworks also offers HDP for Windows, which is a third distribution. HDP for Windows is the only version that runs on the Windows platform. MapR MapR is an interesting distribution for Hadoop. They have taken some radical steps to alter the core architecture of Hadoop to mitigate some of its single points of failure, such as the removal of the single master name node for an alternative 10 Part I ■ What Is Big Data? In this next section, we are going to delve a little deeper into these core Hadoop projects to build up our knowledge of the main building blocks. Once we’ve done that, we’ll be well placed to move forward with the next section, which will touch on some of the other projects in the Hadoop ecosystem. HDFS HDFS, one of the core components of Apache Hadoop, stands for Hadoop Distributed File System. There’s no exotic branding to be found here. HDFS is a Java-based, distributed, fault-tolerant fi le storage system designed for distribu- tion across a number of commodity servers. These servers have been confi gured to operate together as an HDFS cluster. By leveraging a scale-out model, HDFS ensures that it can support truly massive data volumes at a low and linear cost point. Before diving into the details of HDFS, it is worth taking a moment to discuss the fi les themselves. Files created in HDFS are made up of a number of HDFS data blocks or simply HDFS blocks. These blocks are not small. They are 64MB or more in size, which allows for larger I/O sizes and in turn greater throughput. Each block is replicated and then distributed across the machines of the HDFS cluster. HDFS is built on three core subcomponents: ■ NameNode ■ DataNode ■ Secondary NameNode Simply put, the NameNode is the “brain.” It is responsible for managing the fi le system, and therefore is responsible for allocating directories and fi les. The NameNode also manages the blocks, which are present on the DataNode. There is only one NameNode per HDFS cluster. The DataNodes are the workers, sometimes known as slaves. The DataNodes perform the bidding of the NameNode. DataNodes exist on every machine in the cluster, and they are responsible for offering up the machine’s storage to HDFS. In summary, the job of the DataNode is to manage all the I/O (that is, read and write requests). HDFS is also the point of integration for a new Microsoft technology called Polybase, which you will learn more about in Chapter 10, “Data Warehouses and Hadoop Integration.” MapReduce MapReduce is both an engine and a programming model. Users develop MapReduce programs and submit them to the MapReduce engine for process- ing. The programs created by the developers are known as jobs. Each job is a Chapter 1 ■ Industry Needs and Solutions 11 combination of Java ARchive (JAR) fi les and classes required to execute the MapReduce program. These fi les are themselves collated into a single JAR fi le known as a job fi le. Each MapReduce job can be broken down into a few key components. The fi rst phase of the job is the map. The map breaks the input up into many tiny pieces so that it can then process each piece independently and in parallel. Once complete, the results from this initial process can be collected, aggregated, and processed. This is the reduce part of the job. The MapReduce engine is used to distribute the workload across the HDFS cluster and is responsible for the execution of MapReduce jobs. The MapReduce engine accepts jobs via the JobTracker. There is one JobTracker per Hadoop cluster (the impact of which we discuss shortly). The JobTracker provides the scheduling and orchestration of the MapReduce engine; it does not actually process data itself. To execute a job, the JobTracker communicates with the HDFS NameNode to determine the location of the data to be analyzed. Once the location is known, the JobTracker then speaks to another component of the MapReduce engine called the TaskTracker. There are actually many TaskTracker nodes in the Hadoop cluster. Each node of the cluster has its own TaskTracker. Clearly then, the MapReduce engine is another master/slave architecture. TaskTrackers provide the execution engine for the MapReduce engine by spawning a separate process for every task request. Therefore, the JobTracker must identify the appropriate TaskTrackers to use by assessing which are avail- able to accept task requests and, ideally, which trackers are closest to the data. After the decision has been made, the JobTracker can submit the workload to the targeted TaskTrackers. TaskTrackers are monitored by the JobTracker. This is a bottom-up monitoring process. Each TaskTracker must “report in” via a heartbeat signal. If it fails to do so for any reason, the JobTracker assumes it has failed and reassigns the tasks accordingly. Similarly, if an error occurs during the processing of an assigned task, the TaskTracker is responsible for calling that in to the JobTracker. The decision on what to do next then lies with the JobTracker. The JobTracker keeps a record of the tasks as they complete. It maintains the status of the job, and a client application can poll it to get the latest state of the job. NOTE The JobTracker is a single point of failure for the MapReduce engine. If it goes down, all running jobs are halted, and new jobs cannot be scheduled. Important Apache Projects for Hadoop Now that we have a conceptual grasp of the core projects for Hadoop (the brain and heart if you will), we can start to fl esh out our understanding of the broader ecosystem. There are a number of projects that fall under the Hadoop umbrella. 12 Part I ■ What Is Big Data? Some will succeed, while others will wither and die. That is the very nature of open source software. The good ideas get developed, evolve, and become great—at least, that’s the theory. Some of the projects we are about to discuss are driving lots of innovation— especially for Hadoop 2.0. Hive is the most notable project in this regard. Almost all the work around the Hortonworks Stinger initiative is to empower SQL in Hadoop. Many of these changes will be driven through the Hive project. Therefore, it is important to know what Hive is and why it is getting so much attention. Hive Apache Hive is another key subproject of Hadoop. It provides data warehouse software that enables a SQL-like querying experience for the end user. The Hive query language is called Hive Query Language (HQL). (Clearly, the creators of Hive had no time for any kind of creative branding.) HQL is similar to ANSI SQL, making the crossover from one to the other relatively simple. HQL pro- vides an abstraction over MapReduce; HQL queries are translated by Hive into MapReduce jobs. Hive is therefore quite a popular starting point for end users because there is no need to learn how to program a MapReduce job to access and process data held in Hadoop. It is important to understand that Hive does not turn Hadoop into a relational database management system (RDBMS). Hive is still a batch-processing system that generates MapReduce jobs. It does not offer transactional support, a full type system, security, high concurrency, or predictable response times. Queries tend to be measured in minutes rather in than milliseconds or seconds. This is because there is a high spin-up cost for each query and, at the end of the day, no cost-based optimizer underpins the query plan like traditional SQL developers are used to. Therefore, it is important not to overstate Hive’s capabilities. Hive does offer certain features that an RDBMS might not, though. For example, Hive supports the following complex types: structs, maps (key/value pairs), and arrays. Likewise, Hive offers native operator support for regular expressions, which is an interesting addition. HQL also offers additional extensibility by allowing MapReduce developers to plug in their own custom mappers and reducers, allowing for more advanced analysis. The most recent and exciting developments for Hive have been the new Stinger initiatives. Stinger has the goal of delivering 100X performance improvement to Hive plus SQL compatibility. These two features will have a profound impact on Hadoop adoption; keep them on your radar. We’ll talk more about Stinger in Chapter 2, “Microsoft’s Approach to Big Data.” Pig Apache Pig is an openly extensible programmable platform for loading, manipu- lating, and transforming data in Hadoop using a scripting language called Pig Chapter 1 ■ Industry Needs and Solutions 13 Latin. Pig is another abstraction on top of the Hadoop core. It converts the Pig Latin script into MapReduce jobs, which can then be executed against Hadoop. Pig Latin scripts defi ne the fl ow of data through transformations and, although simple to write, can result in complex and sophisticated manipulation of data. So, even though Pig Latin is SQL-like syntactically, it is more like a SQL Server Integration Services (SSIS) Data Flow task in spirit. Pig Latin scripts can have multiple inputs, transformations, and outputs. Pig has a large number of its own built-in functions, but you can always either create your own or just “raid the piggybank” (https://cwiki.apache.org/confluence/display/PIG/PiggyBank) for community-provided functions. As previously mentioned, Pig provides its scalability by operating in a dis- tributed mode on a Hadoop cluster. However, Pig Latin programs can also be run in a local mode. This does not use a Hadoop cluster; instead, the process- ing takes place in a single local Java Virtual Machine (JVM). This is certainly advantageous for iterative development and initial prototyping. SQOOP SQOOP is a top-level Apache project. However, I like to think of Apache SQOOP as a glue project. It provides the vehicle to transfer data from the relational, tabular world of structured data stores to Apache Hadoop (and vice versa). SQOOP is extensible to allow developers to create new connectors using the SQOOP application programming interface (API). This is a core part of SQOOP’s architecture, enabling a plug-and-play framework for new connectors. SQOOP is currently going through something of a re-imagining process. As a result, there are now two versions of SQOOP. SQOOP 1 is a client application architecture that interacts directly with the Hadoop confi gurations and data- bases. SQOOP 1 also experienced a number of challenges in its development. SQOOP 2 aims to address the original design issues and starts from a server- based architecture. These are discussed in more detail later in this book. Historically, SQL Server had SQOOP connectors that were separate downloads available from Microsoft. These have now been rolled into SQOOP 1.4 and are also included into the HDInsight Service. SQL Server Parallel Data Warehouse (PDW) has an alternative technology, Polybase, which we discuss in more detail in Chapter 10, “Data Warehouses and Hadoop Integration.” HCatalog So, what is HCatalog? Simply put, HCatalog provides a tabular abstraction of the HDFS fi les stored in Hadoop. A number of tools then leverage this abstraction when working with the data. Pig, Hive, and MapReduce all use this abstraction to reduce the complexity and overhead of reading and writing data to Hadoop. 14 Part I ■ What Is Big Data? HDFS fi les can, in theory, be in any format, and the data blocks can be placed anywhere on the cluster. HCatalog provides the mechanism for mapping both the fi le formats and data locations to the tabular view of the data. Again, HCatalog is open and extensible to allow for the fact that some fi le formats may be pro- prietary. Additional coding would be required, but the fact that a fi le format in HDFS was previously unknown would not be a blocker to using HCatalog. Apache HCatalog is technically no longer a Hadoop project. It is still an important feature, but its codebase was merged with the Hive Project early in 2013. HCatalog is built on top of the Hive and leverages its command-line interface for issuing commands against the HCatalog. One way to think about HCatalog is as the master database for Hive. In that sense, HCatalog provides the catalog views and interfaces for your Hadoop “database.” HBase HBase is an interesting project because it provides NoSQL database function- ality on top of HDFS. It is also a column store, providing fast access to large quantities of data, which is often sparsely populated. HBase also offers trans- actional support to Hadoop, enabling a level of Data Modifi cation Language (DML) (that is, inserts, updates, and deletes). However, HBase does not offer a SQL interface; remember, it is part of the NoSQL family. It also does not offer a number of other RDBMS features, such as typed columns, security, enhanced data programmability features, and querying languages. HBase is designed to work with large tables, but you are unlikely to ever see a table like this in an RDBMS (not even in a SharePoint database). HBase tables can have billions of rows, which is not uncommon these days; but in conjunction with that, those rows can have an almost limitless number of columns. In that sense, there could be millions of columns. In contrast, SQL Server is limited to 1,024 columns. Architecturally, HBase belongs to the master/slave collection of distributed Hadoop implementations. It is also heavily reliant on Zookeeper (an Apache project we discuss shortly). Flume Flume is the StreamInsight of the Hadoop ecosystem. As you would expect, it is a distributed system that collects, aggregates, and shifts large volumes of event streaming data into HDFS. Flume is also fault tolerant and can be tuned for failover and recovery. However, in general terms, faster recovery tends to mean trading some performance; so, as with most things, a balance needs to be found. Chapter 1 ■ Industry Needs and Solutions 15 The Flume architecture consists of the following components: ■ Client ■ Source ■ Channel ■ Sink ■ Destination Events fl ow from the client to the source. The source is the fi rst Flume compo- nent. The source inspects the event and then farms it out to one or more channels for processing. Each channel is consumed by a sink. In Hadoop parlance, the event is “drained” by the sink. The channel provides the separation between source and sink and is also responsible for managing recovery by persisting events to the fi le system if required. Once an event is drained, it is the sink’s responsibility to then deliver the event to the destination. There are a number of different sinks available, including an HDFS sink. For the Integration Services users out there familiar with the term backpressure, you can think of the channel as the component that handles backpressure. If the source is receiving events faster than they can be drained, it is the channel’s responsibility to grow and manage that accumulation of events. A single pass through a source, channel, and sink is known as a hop. The components for a hop exist in a single JVM called an agent. However, Flume does not restrict the developer to a single hop. Complex multihop fl ows are perfectly possible with Flume. This includes creating fan-out and fan-in fl ows; failover routes for failed hops; and conditional, contextual routing of events. Consequently, events can be passed from agent to agent before reaching their ultimate destination. Mahout Mahout is all about machine learning. The goal of the project is to build scalable machine-learning libraries. The core of Apache Mahout is implemented on top of Hadoop using MapReduce. However, the project does not limit itself to that paradigm. At present, Mahout is focused on four use cases: ■ Recommendation mining: Recommendation mining is the driving force behind several recommendation engines. How many of you have seen something like this appear in your inbox: “Because you bought this New England Patriots shirt, you might also like this NFL football.” ■ Clustering: Clustering is the grouping of text documents to create topi- cally related groupings or categories. 16 Part I ■ What Is Big Data? ■ Classifi cation: Classifi cation algorithms sit on top of classifi ed documents and subsequently learn how to classify new documents. You could imagine how recruitment agents would love clustering and classifi cation for their buzzword bingo analysis. If Apache Mahout is able to reduce the number of calls received for the wrong job, that’s a win for everyone in my book. ■ Frequent item set mining: Frequent item set mining is a way to under- stand which items are often bucketed together (for example, in shopping basket analysis). Ambari Ambari is the system center of the Hadoop ecosystem. It provides all the provi- sioning, operational insight, and management for Hadoop clusters. Remember that Hadoop clusters can contain many hundreds or thousands of machines. Keeping them confi gured correctly is a signifi cant undertaking, and so having some tooling in this space is absolutely essential. Ambari provides a web interface for ease of management where you can check on all the Hadoop services and core components. The same web inter- face can also be used to monitor the cluster, confi guring notifi cation alerts for health and performance conditions. Job diagnostic information is also surfaced in the web UI, helping users better understand job interdependencies, historic performance, and system trends. Finally, Ambari can integrate with other third-party monitoring applications via its RESTful API. So when I say it is the system center of Hadoop, it literally is! Oozie Oozie is a Java web scheduling application for Hadoop. Often, a single job on its own does not defi ne a business process. More often than not, there is a chain of events, processing, or processes that must be initiated and completed for the result to have meaning. It is Oozie’s lot in life to provide this functionality. Simply put, Oozie can be used to compose a single container/unit of work from a collection of jobs, scripts, and programs. For those familiar with enterprise schedulers, this will be familiar territory. Oozie takes these units of work and can schedule them accordingly. It is important to understand that Oozie is a trigger mechanism. It submits jobs and such, but MapReduce is the executor. Consequently, Oozie must also solicit status information for actions that it has requested. Therefore, Oozie has callback and polling mechanisms built in to provide it with job status/comple- tion information. Chapter 1 ■ Industry Needs and Solutions 17 Zookeeper Distributed applications use Zookeeper to help manage and store confi guration information. Zookeeper is interesting because it steps away from the master/ slave model seen in other areas of Hadoop and is itself a highly distributed architecture and consequently highly available. What is interesting is that it achieves this while providing a “single view of the truth” for the confi guration information data that it holds. Zookeeper is responsible for managing and medi- ating potentially confl icting updates to this information to ensure synchronized consistency across the cluster. For those of you who are familiar with manag- ing complex merge replication topologies, you know that this is no trivial task! The Future for Hadoop You don’t have to look too far into the future to discern the future direction of Hadoop. Alpha code and community previews are already available for Hadoop 2.0, which is fantastic to see. Aside from this, the projects we’ve talked about in the previous section continue to add new features, and so we should also expect to see new V1 distributions from the likes of Hortonworks for the foreseeable future. Of course, one of the most exciting things to happen to Hadoop is the sup- port for Hadoop on Windows and Azure. The opportunity this presents for the market cannot be overstated. Hadoop is now an option for all data professionals on all major platforms, and that is very exciting indeed. So, what can we expect in Hadoop 2.0? Two projects that are worth highlight- ing here (at least in summary): YARN and Tez. Summary In this fi rst chapter, you learned all about what big data is, about the core com- ponents of the Hadoop ecosystem, and a little bit about its history and inspira- tion. The stage is set now for you to immerse yourself in this new and exciting world of big data using Hadoop. 20 Part I ■ What Is Big Data? Those of us who had been following Microsoft’s efforts in this space were all waiting for Microsoft to release a proprietary product for distributed scale-out compute (for example, the Microsoft Research project known as Dryad). However, it was not to be. Microsoft elected to invest in this partnership and work with the open source community to enable Hadoop to run on Windows and work with Microsoft’s tooling. It was more than a bold move. It was unprecedented. Later that week, Dave DeWitt commented in his keynote Q&A that the “mar- ket had already spoken” and had chosen Hadoop. This was a great insight into Microsoft’s rationale; they were too late to launch their own product. However, this is just the beginning of the story. Competition is rife, and although Hadoop’s core is open source, a number of proprietary products have emerged that are built on top of Hadoop. Will Microsoft ever build any proprietary components? No one knows. Importantly, though, the precedent has been set. As product companies look to monetize their investment, it seems inevitable that there will ultimately be more proprietary products built on top of Hadoop. Microsoft’s foray into the world of big data and open source solutions (OSS) has also overlapped with the even broader, even more strategic shift in focus to the cloud with Windows Azure. This has led to some very interesting consequences for the big data strategy that would have otherwise never materialized. Have you ever considered Linux to be part of the Microsoft data platform? Neither had I! With these thoughts in your mind, I now urge you to read on and learn more about this fascinating ecosystem. Understand Microsoft’s relationship with the open source world and get insight on your deployment choices for your Apache Hadoop cluster. NOTE If you want to know more about project Dryad, this site provides a great starting point: http://research.microsoft.com/en-us/projects/dryad/. You will notice some uncanny similarities. Competition in the Ecosystem Just because Hadoop is an open source series of projects doesn’t mean for one moment that it is uncompetitive. Quite the opposite. In many ways, it is a bit like playing cards but with everyone holding an open hand; everyone can see each other’s cards. That is, until they can’t. Many systems use open source technology as part of a mix of components that blend in proprietary extensions. These proprietary elements are what closes the hand and fuels the competition. We will see an example of this later in this chapter when we look at Cloudera’s Impala technology. Hadoop is no exception. To differentiate themselves in the market, distributors of Hadoop have opted to move in different directions rather than collaborate Chapter 2 ■ Microsoft’s Approach to Big Data 21 on a single project or initiative. To highlight how this is all playing out, let’s focus on one area: SQL on Hadoop. No area is more hotly contested or more important to the future of adoption of a distribution than the next generation of SQL on Hadoop. SQL on Hadoop Today To recap what you learned in Chapter 1, “Industry Needs and Solutions”: SQL on Hadoop came into being via the Hive project. Hive abstracts away the com- plexity of MapReduce by providing a SQL-like language known as Hive Query Language (HQL). Notice that it does not suddenly mean that Hadoop observes all the ACID (atomicity, consistency, isolation, durability) rules of a transaction. It is more that Hadoop offers through Hive a querying syntax that is familiar to end users. However, you want to note that Hive works only on data that resides in Hadoop. The challenge for Hive has always been that dependency on MapReduce. Owing to the tight coupling between the execution engine of MapReduce and the scheduling, there was no choice but to build on top of MR. However, Hadoop 2.0 and project YARN changed all that. By separating scheduling into its own project and decoupling it from execution, new possibilities have surfaced for the evolution of Hive. Hortonworks and Stinger Hortonworks has focused all its energy on Stinger. Stinger is not a Hadoop proj- ect as such; instead, it is an initiative to dramatically improve the performance and completeness of Hive. The goal is to speed up Hive by 100x. No mean feat. What is interesting about Stinger is that all the coding effort goes directly into the Hadoop projects. That way everyone benefi ts from the changes made. This completely aligns with Hortonworks’s commitment and charter to Hadoop. So what is Stinger? It consists of three phases. The fi rst two phases have already been delivered. Stinger Phase 1 Phase 1 was primarily aimed at optimizing Hive within its current architecture. Hence it was delivered in Hive 0.11 in May 2013, forming part of Hortonworks Data Platform (HDP) 1.3 release. Phase 1 delivered three changes of notable signifi cance: ■ Optimized RC fi le (ORCFile): Optimizations to the ORC File format have contributed enormously to Hive’s data access patterns. By adding meta- data at the fi le and block level, queries can now be run faster. In addition, 22 Part I ■ What Is Big Data? much like SQL Server’s column store technology, only the bytes from the required columns are read from HDFS; reducing I/O and again adding a further performance boost. NOTE ORCFile stands for Optimized Record Columnar File. This fi le format allows for the data to be partitioned horizontally (rows) and vertically (columns). In essence, it’s a column store for Hadoop. ■ SQL compatibility: Decimal as a data type was introduced. Truncate was also added. Windowing functions also made the list, so Hive picked up support for RANK, LAG & LEAD, FIRST & LAST, and ROW_NUMBER in addi- tion to the OVER clause. Some improvements were also made in the core syntax, so GROUP BY allowed aliases and ALTER VIEW was also included. ■ Query and join optimizations: As with most releases of database soft- ware, query optimizations are often featured, and Hive 0.11 was no exception. Hive had two major changes in this area. The fi rst was to remove redundant operators from the plan. It had been observed that these operators could be consuming up to 10% of the CPU in simple queries. The second improvement was to JOIN operators with the de- emphasis of the MAPJOIN hint. This was in part enabled by another change, which changed the default configuration of hive.auto .convert.join to true (that is, on). Stinger Phase 2 Phase 2 was implemented as part of Hive 0.12, which was released in October 2013. Note that this release followed only 5 months after phase 1. The community behind Stinger are moving at a fast pace. To continue with Stinger’s three-pronged focus on speed, scale, and SQL, phase 2 also needed to cut over to Hadoop 2.0. This enabled the engineers working on Hive to leverage YARN and lay the groundwork for Tez. NOTE Refer back to Chapter 1 for defi nitions of Hadoop projects YARN and Tez. Phase 2 included the following enhancements: ■ Performance: Queries got faster with Stinger phase 2 thanks to a number of changes. A new logical optimizer was introduced called the Correlation Optimizer. Its job is to merge multiple correlated MapReduce jobs into a single job to reduce the movement of data. ORDER BY was made a parallel operation. Furthermore, predicate pushdown was implemented to allow ORCFile to skip over rows, much like segment skipping in SQL Server. Optimizations were also added for COUNT (DISTINCT), with the hive. map.groupby.sorted confi guration property. Chapter 2 ■ Microsoft’s Approach to Big Data 23 ■ SQL compatibility: Two signifi cant data types were introduced: VARCHAR and DATE. GROUP BY support was enhanced to enable support for struct and union types. Lateral views were also extended to support an “outer” join behavior, and truncate was extended to support truncation of columns. New user-defi ned functions (UDFs) were added to work over the Binary data type. Finally partition switching entered the product courtesy of ALTER TABLE..EXCHANGE PARTITION. NOTE SQL Server does not support lateral views. That’s because SQL Server doesn’t support a data type for arrays and functions to interact with this type. To learn about lateral views, head over to https://cwiki.apache.org/confluence/ display/Hive/LanguageManual+LateralView. ■ End of HCatalog project: With Hive 0.12, HCatalog ceased to exist as its own project and was merged into Hive. NOTE HCatalog is defi ned in Chapter 1. Stinger Phase 3 Stinger phase 3 is underway, but will see Hadoop introduce Apache Tez, thus moving away from batch to a more interactive query/response engine. Vectorized queries (batch mode to SQL Server Query Processor afi cionados) and an in- memory cache are all in the pipeline. However, it is still the early days for this phase of the Stinger initiative. Cloudera and Impala Cloudera chose a different direction when defi ning their SQL in Hadoop strat- egy. Clearly, they saw the limitations of MapReduce and chose to implement their own engine: Impala. Cloudera took a different approach to Hortonworks when they built Impala. In effect, they chose to sidestep the whole issue of Hadoop’s legacy with MapReduce and started over. Cloudera created three new daemons that drive Impala: ■ Impala Daemon ■ Impala Statestore ■ Impala Catalog Service Impala Daemon The Impala daemon is the core component, and it runs on every node of the Hadoop cluster. The process is called impalad, and it operates in a decentralized, multimaster pattern; that is, any node can be the controlling “brain” for a given 24 Part I ■ What Is Big Data? query. As the coordinating node is decided for each query, a common single point of failure and bottleneck for a number of massively parallel-processing (MPP) systems is elegantly removed from the architecture. Note, however, that the Impala daemon you connect to when submitting your query will be the one that will take on the responsibility of acting as the coordinator. This could be load balanced by the calling application. However, it is not automatically load balanced. Once one node has been defi ned as the coordinator, the other nodes act as workhorses performing delegated tasks on data subsets as defi ned by the coor- dinator. Each workhorse operates over data and provides interim results back to the coordinator, who will be responsible for the fi nal result set. The Impala daemons are in constant contact with the Statestore daemon to see which nodes in the cluster are healthy and are accepting tasks. Impala Statestore The Statestore is another daemon known as statestored. Its job is to monitor all the Impala daemons, confi rming their availability to perform tasks and inform- ing them of the health of other Impala daemons in the cluster. It therefore helps to make sure that tasks are not assigned to a node that is currently unreachable. This is important because Impala sacrifi ces runtime resilience for speed. Unlike MapReduce, queries that experience a node failure are canceled; so, the sooner the cluster knows about an issue, the better. Note that only one Statestore daemon is deployed on the cluster. However, this is not an availability issue per se. This process is not critical to the operation of Impala. The cluster does become susceptible to runtime stability for query operation, but does not go offl ine. Impala Catalog Service The Catalog Service is the third daemon and is named catalogd. Its job is to distribute metadata changes to all nodes in the cluster. Again, only one Catalog Service daemon is in operation on the cluster, and it is commonly deployed on the same node as the Statestore owing to the fact that it uses the Statestore as the vehicle for transmitting its messages to the Impala daemons. The catalog service removes the need to issue REFRESH and INVALIDATE METADATA statements, which would otherwise be required when using Data Defi nition Language (DDL) or Data Modifi cation Language (DML) in Impala. By distributing metadata changes it ensures that any Impala daemon can act as a coordinator without any additional actions on the part of the calling application. 26 Part I ■ What Is Big Data? consider all the angles as we evaluate which is the most appropriate option for a given environment. This next section fi rst discusses a number of the consid- erations. The discussion then turns to possible topologies. Ultimately, you want a scorecard to help you make some objective decisions. Deployment Factors Which deployment option you choose will be dictated by several factors, many of which are intertwined like the fi bers of a shredded-wheat biscuit. It’s therefore worth keeping them all, as follows, in mind as we work through this section: ■ Elasticity ■ Flexibility ■ Scalability ■ Security ■ Proximity ■ Functionality ■ Usability ■ Manageability Elasticity Think of your elasticity requirement as a rubber band: Do you need to be able to stretch your performance requirement to enable faster processing or to cope with spikes/surges in demand? Elastic scale is the sweet spot for cloud services such as those offered by Windows Azure. I can alter the size and compute power of my cluster at will. With an on-premise service, I am always able to grow, albeit more slowly, but shrinking the topology isn’t possible. Once I’ve bought the kit, I am stuck with it for three years—even if I don’t want it any more. Also, ask: ■ Would you like to be able to reduce your outlay/capacity when there is little or no work to do? ■ Do you know your workload and could you characterize it? ■ Is it predictable and constant, or is it volatile in nature? ■ How quickly can you scale your target environment? ■ Is this even important to you? These are tough questions to answer, especially at the outset of a project. Each deployment option offers varying degrees of elasticity, and your understanding of your environment will be an important factor for your desired deployment option. Chapter 2 ■ Microsoft’s Approach to Big Data 27 Flexibility Closely tied to elasticity is the concept of fl exibility: ■ Are you sure of your processing requirements? ■ How dynamic are your requirements? ■ How complete is the vision driving your project? ■ Is it possible you may need to change your mind as you enter into a voy- age of discovery with big data? Different models offer the opportunity for greater fl exibility in terms of dynamic change. Buying hardware also tends to be a fi xed commitment with a three-year write-down. Scalability You can look at the scalability factor quite simplistically and answer the follow- ing question: How many data nodes do you need? However, this answer also drives a number of follow-up questions for you to consider: ■ Where will you put these nodes? ■ How will you manage and monitor them? ■ Who will manage and monitor them? Because Hadoop is a scale-out architecture, the fi rst question of quantity is really a trigger point to think about the broader issues associated with the scale of the deployment. In actuality, the answer to the scale question provides additional context into the decision making of other factors, particularly fl ex- ibility and elasticity. In terms of scale, there are also considerations that relate to limitations. For example, in HDInsight, Microsoft currently allows a maximum of 40 data nodes. However, this is merely an artifi cial cap placed on the service and can be lifted. Architecturally no limit applies. One might say the same about an on-premise deployment. Certainly, the largest clusters in the world are on premise. However, practicalities will often get in the way. In truth, the same challenges exist for Azure. There has to be capacity in the data center to take your request. However, I have to say, I quite like the idea of making this Microsoft’s problem. Security Hadoop doesn’t have a very sophisticated method of securing the data that is resident in the Hadoop Distributed File System (HDFS). The security models range from weak to none. Therefore, your approach to meeting your security needs is an important factor in your decision-making process. You might want to consider the network layer in addition to the operating system and physical 28 Part I ■ What Is Big Data? hardware when evaluating all these options. Other options include a “secure by default” confi guration, which may well be worth replicating if you want to lock down your deployment. Proximity When addressing the question of proximity, you must know where the data is born. This is relevant for a number of reasons, but the prime reason is latency. We do not want the source and analytical systems to be far apart, because if they are, this distance will add latency to the analysis. That latency can often be directly correlated back to cost; a short local network can often be signifi cantly cheaper and result in less impact that than a geographically dispersed network. The value of the insights from the data may depreciate significantly as the data ages. In these situations, therefore, we may want to keep in close proximity to the data to reduce the mean time to value when analyz- ing the data. In addition, the farther apart the systems are, the more expensive and poten- tially brittle the networking becomes. This is especially apparent in ultra-low latency network topologies where expensive Infi niBand cables may be used to move data at signifi cant velocity. For example, FDR Infi niBand networks can move data at 56Gbps. However, that performance comes at a price, so the shorter the network cables are the better. Consequently, and by way of simple example, if the data is born in the cloud, it will often make sense to provide analytics in the cloud. By doing so, your network will be local, and the performance between environments will be LAN Ethernet speed rather than Internet/VPN (virtual private network) speed. Your environment and total cost of ownership (TCO) will also benefi t because you will avoid data egress charges. Functionality Although perhaps not immediately obvious, you need to make sure that your target platform offers the functionality you want to use. Not all Hadoop distri- butions are created equally. A simple example is differing support for versions of Hive or the inclusion of HBase in a Hadoop distribution. That is not to say that you cannot add/upgrade Hadoop projects to your deployment. However, when you do this, you are stepping outside of the boundaries of the hardened distribution. Your choice of operating system also dictates the range of distributions of Hadoop available to you and therefore your deployment options. Chapter 2 ■ Microsoft’s Approach to Big Data 29 Usability Getting started with Hadoop can sometimes feel like a rather daunting pros- pect, especially if you are exclusively used to the Microsoft ecosystem. At its most extreme, you could download the source code from the Apache website, compile the code yourself, and then manually deploy the code to build the clus- ter. However, deployment can be as simple as confi guring a wizard. Usability therefore is a sliding scale, and the spin-up cost you can accept will help to drive you toward one solution vs. another. Whichever solution you decide on, you are probably going to want to roll your own PowerShell scripts to build your cluster so that it is a repeatable process. Manageability Operational considerations are often overlooked when considering deployment factors. Consider the following: ■ How will the system be monitored? ■ Which human resources will be needed to support the environment? ■ What availability requirements exist, and what disaster recovery strategy is in place? ■ How will security be addressed? These are all common operational questions that you need to answer. Deployment Topologies Now that we have an understanding of the factors that might infl uence a deploy- ment, we can focus on the topologies themselves before moving on to compare them with each other. In this next section we’ll compare the following options: ■ On-Premise Hadoop ■ Infrastructure as a Service Hadoop ■ Platform as a Service Hadoop Hadoop on Premise You can always follow the traditional path, which is to build your Hadoop cluster on premise. Most Hadoop clusters in the world today are built using Linux as the operating system. However, as you learned in Chapter 1, Hortonworks has a distribution for Windows. The biggest challenge when picking Hadoop on premise as your deployment option is knowing how to size it. How many data nodes will you really need? 30 Part I ■ What Is Big Data? Of course, after you have done that, you then need to procure all the hardware and rack and confi gure it. You will also have taken on the management and monitoring of the cluster and so will need to fi gure that out as well. You might choose to use the Ambari project for this purpose. Alternatively, you could stay within the Microsoft ecosystem and use System Center to drive your monitoring. An Ambari SCOM Management Pack has been created so that you can monitor Hadoop using the same infrastructure as the rest of your enterprise. Either way, you need to ensure that you have integrated the new infrastructure into your environment and make sure that all the feeds of data can access the new cluster. So, why follow this model? Speaking personally, it often comes down to fl ex- ibility, proximity, security, and functionality. However, proximity is one factor to emphasize here. If the data is born on premise, and in signifi cant volume, it may make greater sense to retain the data in an on-premise environment. Pushing large volumes of data to the cloud might be impractical and might introduce too much latency prior to the analysis. That said, this issue may be tempered once the initial data load has been accomplished. Subsequent loads will contain only delta changes which would hopefully reduce the burden of the latency. Remember, though, the second part of proximity, which pertains to integra- tion with other data environments. Business users don’t actually care where or what store their data is; they just want to query (and that might be a query across environments). PDW Integration Querying across environments was one of the three key design goals for project Polybase, one of the stand-out features in SQL Server Parallel Data Warehouse (PDW) 2012. Polybase provides the glue between Hadoop and the data warehouse for the parallel import and export of data. However, it also enables a unifi ed heterogeneous querying platform for end users. In other words, I can write the following and it just works: SELECT COUNT(*) , SUM(s.Value) AS Total_Sales , p.Category_name , d.CalendarMonth_name FROM dbo.hdfs_Sales s JOIN dbo.pdw_Product p ON s.Product_key = p.Product_key JOIN dbo.pdw_Date d ON s.Date_ = p.Date_key WHERE d.CalendarYear_nmbr = 2012 GROUP BY p.Category_name , d.CalendarMonth_name In the preceding code, I am able to query data held in Hadoop and join it to data held in PDW with a single logical declarative statement. This is important because it enables consumers of data to work with data irrespective of the data source. They are just tables of data. That said, project Polybase is not without its restrictions. In the release-to- manufacturing (RTM) version of PDW 2012, Polybase only currently supports Chapter 2 ■ Microsoft’s Approach to Big Data 31 a delimited fi le format and works with a limited number of distributions: HDP on Windows, HDP on Linux, and Cloudera. Furthermore, the RTM version does not leverage any of the compute resources of the Hadoop cluster. PDW is simply importing (at great speed) all the data held in Hadoop and holding it in temporary tables inside PDW. This model will evolve over time, and we should look forward in the future to the automatic generation of MapReduce jobs as query optimizations. One might imagine that a slightly rewritten query like the following one might trigger a MapReduce job to enable the where clause to be applied to the query as part of the Hadoop subtree of the query plan: SELECT COUNT(*) , SUM(s.Value) AS Total_Sales , p.Category_name , d.CalendarMonth_name FROM dbo.hdfs_Sales s JOIN dbo.pdw_Product p ON s.Product_key = p.Product_key JOIN dbo.pdw_Date d ON s.Date_key = p.Date_key WHERE s.Date_key >= 20120101 AND s.Date_key < 20130101 GROUP BY p.Category_name , d.CalendarMonth_name Exciting times lie ahead for PDW with Polybase integration into Hadoop. We will dive into PDW and Polybase in much greater detail in Chapter 10, “Data Warehouses and Hadoop Integration.” HDInsight When HDInsight was announced as being generally available (GA), the team also announced an HDInsight emulator. This is designed for developer scenarios in an on-premise environment. Because it is targeted for development, it can be deployed in a single node confi guration only. However, you could also opt for a custom one-data-node confi guration of an HDInsight cluster using the Azure benefi ts accompanying an MSDN subscription or Azure trial. For some simple instructions on getting started with the emulator, head over to the following article: http://www.windowsazure.com/en-us/manage/services/ hdinsight/get-started-with-windows-azure-hdinsight-emulator/. If you don’t like either of these options, Hortonworks offers either a sandbox virtual machine or you can build a one-node development environment on Windows. The advantage of this is that as HDInsight uses HDP under the hood you can be confi dent of developing portable code. Either way, you certainly aren’t short of choices! Hadoop in the Cloud The single biggest transformation to occur to IT in recent times has been the advent of the cloud. By leveraging cloud computing, we can take advantage of a Chapter 2 ■ Microsoft’s Approach to Big Data 33 HDInsight is a secure-by-default confi guration. It disables access to the cluster via Remote Desktop by default and also provides (for free) a secure gateway virtual machine that performs the authorization/authentication and exposes endpoints on port 443. Similarly, you can also retrieve Ambari metrics using the REST API via the secure gateway on port 443. HDInsight is using HDP as its base distribution, but limitations apply. For example, at the moment, HBase is not included in HDInsight. However, on the plus side, it means complete ubiquity. You can take your data set and export it from HDInsight if you want and install it in an on-premise or IAAS installation running HDP, and it will just work. In late 2013, Microsoft released HDInsight to general availability. It currently is not installed in all Azure data centers, but I expect that to change as the space is provisioned in each data center. Infrastructure as a Service (IAAS) Much of what can be achieved currently with HDInsight and PAAS can also be achieved using a combination of IAAS and scripting. You can automate the process of creating your environment and also elastically adjust the resources each time you spin up a Hadoop cluster. Granted, you will have to roll your own scripts to build this environment, but you will have ultimate fl exibility in your deployment during the installation process. You may fi nd that it takes you more time to confi gure and build your IAAS environment as opposed to using the PAAS option with HDInsight. Note, however, that some additional tasks do become your responsibility. For example, the secure node in HDInsight isn’t automatically provisioned, and neither is the metastore. Also, if you want to save money and keep hold of your data, you will need to have your own automated process of detaching disks, deleting virtual machines and performing any additional cleanup. The same clearly is also true (in the inverse) for creating the cluster. Interestingly enough, the IAAS option opens up the opportunity to run Linux or Windows virtual machines inside of Windows Azure. Therefore, you can actually run any fl avor of Hadoop you like on Windows Azure including the latest and greatest versions of Hortonworks or even Cloudera’s Impala. You simply aren’t constrained to Windows as an operating system on the Microsoft data platform. If you said that to me a few years ago, I’d have looked at you somewhat quizzically. However, the world has simply moved on. Cloud plat- forms are more interested in managing the compute resources at massive scale than in eliminating workloads based on technology choices. Deployment Scorecard So what have we learned? See Figure 2.1. Chapter 2 ■ Microsoft’s Approach to Big Data 35 We’ve learned that some of this is relative. Security, for example, is not a great situation for any option, because it is not a strength of Hadoop. Likewise, proximity depends on where the data is actually born. When data is born in the cloud, proximity of cloud offerings would be better (assuming the same service provider was used). However, for proximity I’ve scored this based on a more general ability to integrate with other disparate data sources throughout out an enterprise. At the time of this writing, the majority of this kind of enterprise data sits on premise, which explains the scoring. Cloud deployments work well for the following: ■ Elastic scale ■ Kicking the tires ■ Prototyping On our scorecard, shown in Figure 2.1, what shines through is that the cloud offers that elusive fl exibility and elastic scale that encourages experimentation and works brilliantly for “bursty” workloads. However, unless the data is con- sistently being born in the cloud (a mighty big i f ), the proximity question will probably be an issue. Remember proximity is a two-part measure: ■ The distance to the system managing the data (i.e., the bandwidth question) ■ The distance to other enterprise data sources (i.e., the integrated analysis dilemma) The magnitude of this fi rst element may be mitigated if the burden is only endured for an initial or historic load. If the subsequent deltas are of a manage- able size, then the concerns may be signifi cantly reduced. The second element is a ratio. If there is little or no integrated analysis with on-premise data sources, then the proximity issue is again less of a concern. If there is a signifi cant amount of integrated analysis, this may prove to be a cloud deployment deal breaker. On-premise deployments work well for the following: ■ Custom deployments ■ Data born on premise ■ Secure data An on-premise deployment is best suited for data that is born on premise and that perhaps also needs to be integrated with other data sources that may also reside locally. Having your own environment, although fl exible, is also a commitment of both compute and human resources. It therefore suits predict- able workloads, which can be more readily sized. 36 Part I ■ What Is Big Data? Summary What have you learned? The world isn’t quite what it once seemed. In the time it has taken to move from SQL Server 2008 to SQL Server 2012 the world has been turned upside down. Microsoft’s data platform now includes and embraces Linux deployments of open source technology running Apache Hadoop. What’s more, we have a variety of options available in terms of our deployment choices. What we choose is largely down to business need and data value (as it should be). You should now understand the factors that could infl uence your deployment choice and how to evaluate the options before you. You should also have a better understanding of the Hadoop ecosystem and some of the drivers infl uencing its future direction. 40 Part II ■ Setting Up for Big Data with Microsoft cluster in Windows Azure with four nodes to understand the vagaries of the cloud environment. This chapter assumes that the on-premise cluster is being built in a Hyper-V environment or other similar virtualization technology for your initial development environment. (Later in this book, Chapter 16, “Operational Big Data Management,” shows what an enterprise class cluster may look like when built so that you have a guideline for a production-class Hadoop cluster.) Hadoop is an enterprise solution that requires server-class software to run. Therefore, the fi rst thing you need for the installation of the HDP is a copy of one of the following: 1. Windows Server 2008 R2 (64-bit) 2. Windows Server 2012 (64-bit) NOTE Windows 7 and Windows 8 are not supported. But one of the best reasons to upgrade from Windows 7 to Windows 8 is that it includes Windows Hyper-V, which means that you can have all the benefi ts of your client desktop but drive enterprise software from the Hyper-V environment. For the development environment, this chapter uses Windows Server 2012 Standard Edition (64-bit) with a graphical user interface (GUI). You should allo- cate at least 2GB of memory for the virtual machine (VM) of your HDP server. Getting the Install Because HDInsight and the HDP are a collaboration between Microsoft and Hortonworks, you can get information about the product in multiple places. The Microsoft website has a number of resources about Microsoft’s overall plan for big data and HDInsight in particular. You can fi nd the information at http:// www.microsoft.com/bigdata. Here you will fi nd information about HDInsight Service that runs on Windows Azure (Microsoft’s cloud service) and more about the Microsoft and Hortonworks partnership. In addition, Hortonworks provides a plethora of material for their HDP on Windows on their website at http:// hortonworks.com/products/hdp-windows/. Here you will fi nd the download link to install HDP and plenty of documentation and support forums for HDP on Windows. Running the Installation Now it is time for you to install Hadoop for the first time. In this sec- tion you are going to install all the prerequisites for the HDP, configure 42 Part II ■ Setting Up for Big Data with Microsoft Microsoft .NET Framework 4.0 Now you need to install Microsoft.NET Framework 4.0. If you are installing HDP on a Windows 2012 server, you can skip this step because it is already installed with the base operating system. If you are installing on Windows Server 2008 R2, this is a required step. Use the instructions provided here to download and install Microsoft.NET Framework 4.0: http://www.microsoft. com/en-us/download/confirmation.aspx?id=17851. Java JDK 6u31 To begin, you need to download Java JDK located here: http://www.oracle.com/ technetwork/java/javase/downloads/java-archive-downloads-javase6-419409 .html#jdk-6u31-oth-JPR. You want to choose the Windows x64 (64-bit) version, which is at the bottom of the table. Ensure that you accept the license at the top of the table before clicking the link. You need to sign in with your Oracle account. If you do not have one, sign up for one to proceed. Then, complete the following steps: 1. Install the Java JDK to a directory path that has no whitespace characters in its path; the default path will not work! For example, C:\program files\ Java\ is not a valid path. It is recommended to use C:\Software\JAVA\, which is a valid path (see Figure 3.1). Figure 3.1: Java directory path. 2. If prompted, register your Java JDK. Chapter 3 ■ Configuring Your First Big Data Environment 43 3. Create a system variable named JAVA_HOME. The value of this variable is the full path to the installation directory defi ned earlier for JDK. 4. Open the Control Panel’s System pane and click Advanced system set- tings. Click the Advanced tab. 5. Click the Environment Variables button. Under System variables, click New. Enter the variable name as JAVA_HOME. 6. Enter the variable value, as the installation path for the JDK. If you fol- lowed the earlier direction, this variable value will be C:\Software\JAVA (see Figure 3.2). Figure 3.2: Setting the JAVA_HOME path. 7. Click OK. 8. Click Apply Changes. Python 2.7 You must install Python 2.7. Download Python from http://www.python.org/ download/. Choose Python 2.7.6 Windows Installer (Windows binary, does not include source). Install Python using all the default confi gurations. Then, complete the following steps: 1. Update the PATH environment variable. Using Administrator privileges, open the Control Panel’s System pane and click Advanced system settings. Click the Advanced tab. 44 Part II ■ Setting Up for Big Data with Microsoft 2. Click the Environment Variables button. Under System Variables, fi nd PATH and click Edit. 3. In the Edit windows, modify PATH by appending the installation path for your Python directory to the value of PATH. For example, if the Python executable is installed at C:\Python27\, you must append this value to PATH (see Figure 3.3). Figure 3.3: Adding Python to your default path. 4. To validate your settings, from a command shell or PowerShell window, type Python. If you set your path properly, you will get a response shown in Figure 3.4. At this point, take a snapshot of the VM image and name it something like HDP_1-3_PreReqs_Installed. Server Confi guration Now it’s time to begin confi guring your server. You need the hostname of the server you are installing to for later confi guration tasks. On the desktop of your server, right-click and select New and the select Text Document. Name it HDP Chapter 3 ■ Configuring Your First Big Data Environment 45 Configuration. You will use this document for a few pieces of information you will need later. Figure 3.4: Verifying Python. Next, complete the following steps: 1. Get your hostname. At a command prompt on the cluster host, execute the following command: Hostname. You will receive back the hostname of your server. Copy and paste this text into the HDP confi guration fi le you created and save the fi le. 2. Now you confi gure a fi rewall. For the purposes of a development envi- ronment, the easiest thing to do is to disable the fi rewall completely and open all ports. 3. Open the Control Panel. Type firewall into the search box. Click Check Firewall Status. Click Turn Windows Firewall on or off. Click Turn off Windows Firewall for all available networks. If your corporate fi rewall policies require you to have fi rewalls turned on, you must open up all of the necessary ports. One way to do it is to open an administrative command prompt (Shift+right-click, Run as Administrator) and type the following command for each port: netsh advfirewall firewall add rule name=AllowRPCCommunication dir=in action=allow protocol=TCP localport=135 Table 3.1 through Table 3.4 list the default ports used by the various ser- vices. Make sure the appropriate ports are opened before you install HDP. 46 Part II ■ Setting Up for Big Data with Microsoft Table 3.1: Default Ports Used by the Hadoop Distributed File System (HDFS) SERVICE SERVERS DEFAULT PORTS USED PROTOCOL DESCRIPTION NameNode web UI Master nodes (NameNode and any backup NameNodes) 50070 HTTP Web UI to look at cur- rent status of HDFS, explore fi le system 50470 HTTPS Secure HTTP service NameNode metadata service 8020/9000 IPC File system metadata operations DataNode All slave nodes 50075 HTTP DataNode web UI to access the status, logs, and so on 50475 HTTPS Secure HTTP service 50010 Data transfer 50020 IPC Metadata operations Secondary NameNode Secondary NameNode and any backup secondary NameNode 50090 HTTP Checkpoint for NameNode metadata Table 3.2: Default Ports Used by MapReduce SERVICE SERVERS DEFAULT PORTS USED PROTOCOL DESCRIPTION JobTracker web UI Master nodes (JobTracker node and any backup JobTracker node ) 50030 HTTP Web UI for JobTracker JobTracker Master nodes (JobTracker node) 8021 IPC For job submissions TaskTracker web UI and shuffl e All slave nodes 50060 HTTP DataNode web UI to access status, logs, and so on History server web UI 51111 HT TP Web UI for job history Chapter 3 ■ Configuring Your First Big Data Environment 47 Table 3.3: Default Ports Used by Hive SERVICE SERVERS DEFAULT PORTS USED PROTOCOL DESCRIPTION HiveServer2 HiveServer2 machine (usually a utility machine) 10001 Thrift Service for program- matically (Thrift/JDBC) connecting to Hive HiveServer Hive Server machine (usually a utility machine) 10000 Thrift Service for program- matically (Thrift/JDBC) connecting to Hive Hive Metastore 9083 Thrift Service for program- matically (Thrift/JDBC) connecting to Hive metadata Table 3.4: Default Port Used by WebHCat SERVICE SERVERS DEFAULT PORTS USED PROTOCOL DESCRIPTION WebHCat server Any utility machine 50111 HTTP Web API on top of HCatalog and other Hadoop services Table 3.5: Default Ports Used by HBase SERVICE SERVERS DEFAULT PORTS USED PROTOCOL DESCRIPTION HMaster Master nodes (HBase master node and any backup HBase master node) 60000 HMaster Info web UI Master nodes (HBase mas- ter node and backup HBase master node if any) 60010 HTTP The port for the HBaseMaster web UI. Set to –1 if you do not want the info server to run. Region server All slave nodes 60020 Continues 48 Part II ■ Setting Up for Big Data with Microsoft SERVICE SERVERS DEFAULT PORTS USED PROTOCOL DESCRIPTION Region server All slave nodes 60030 HTTP ZooKeeper All ZooKeeper nodes 2888 Port used by ZooKeeper peers to talk to each other. See here for more information. ZooKeeper All ZooKeeper nodes 3888 Port used by ZooKeeper peers to talk to each other. Refer to http://zookeeper.apache .org/doc/r3.4.3/ zookeeperStarted.html#sc_ RunningReplicatedZooKeeper 2181 Property from ZooKeeper’s confi g zoo.cfg. The port at which the clients will connect. 4. Download HDP from http://public-repo-1.hortonworks.com/HDP- Win/1.3/hdp- 5. Go to your downloads folder and extract the zipped fi le. Within the extracted folder, you should fi nd four fi les (see Figure 3.5). Figure 3.5: Installation files for Hortonworks Data Platform. 6. Open up cluster properties to make confi guration changes to it. Here, you want to copy your server name from your HDP Confi guration fi le on your desktop and replace the server names for all the nodes under #Hosts and for the DB_HOSTNAME under #Database Host. Additionally, be sure to change the Log and Data directories to the appropriate drive for your server, most likely the C drive. The fi nal clusterproperties.txt fi le Table 3.5 (continued) Chapter 3 ■ Configuring Your First Big Data Environment 49 should look similar to the fi le shown in Figure 3.6 (with your hostname for your single node cluster). Save your fi le. Figure 3.6: Setting clusterproperties.txt values. 7. Within your HDP Confi guration fi le on your desktop, type the following command for installation of HDP. It is recommended to keep a copy of this command here in case you need multiple installation attempts. Replace any location values (HDP.msi, clusterproperties.txt) if they differ from the example: Msiexec /i "C:\Users\Administrator\Downloads\hdp-\ hdp-\hdp-" /lv "hdp.log" HDP_LAYOUT="C:\Users\Administrator\Downloads\hdp-\ hdp-\clusterproperties.txt" HDP_DIR="C:\hdp\hadoop" DESTROY_DATA="yes" 8. From an elevated command prompt with administrative privileges, run the preceding command. A successful installation will give you back the 50 Part II ■ Setting Up for Big Data with Microsoft dialog box shown in Figure 3.7. Otherwise, you will have some investigat- ing to do, as explained later in the chapter. Figure 3.7: Successful installation. 9. On the desktop of your server, you will have three new icons. Click the Hadoop command prompt. This opens up a command prompt to the Hadoop install directory. Now, navigate back a directory: cd .. 10. Next we want to start all the services associated with your HDP installation: start_local_hdp_services This job will take a couple minutes to complete. In the next step, we evaluate the success of our installation with a smoke test that validates the installation. Ideally, you will get a message like the one shown in Figure 3.8 that validates the starting of all services. Figure 3.8: Successful starting of all services. 11. From the same command prompt, enter the following: Run-SmokeTests This command runs a variety of tests against your installation to verify that MapReduce, Hive, Pig, Hcat, ZooKeeper and other services are running as expected. It does this by actually creating tables, describing them, querying them, and dropping them from your installation. This command takes several minutes to run and the output will provide you with the confi dence that you have installed HDP correctly and can begin using your installation. Chapter 3 ■ Configuring Your First Big Data Environment 51 Once complete, read through the output in your command shell and look for any errors. If you fi nd none, congratulations, you have successfully installed HDP onto a single node “cluster.” HDInsight Service: Installing in the Cloud HDInsight Service is Microsoft’s cloud-based Hadoop solution built from the HDP. HDInsight Service is a great solution for both test and production data. For test, there is no easier way to spin up a Hadoop cluster and start loading data and running jobs. For production, HDInsight is a wonderful elastic solu- tion for organizations that want their clusters to grow both with their size and complexity of their data. You will now set up the storage containers needed for your data to reside in Azure Storage Vault (ASV) and then create an HDInsight cluster: 1. After signing in to your Windows Azure portal, click the Storage icon and click the New icon in the bottom-left corner. Create a new storage container using a unique name. Choose a location that is appropriate for you (see Figure 3.9). Later when you create your HDInsight Service cluster, you will choose the same location as you want to collocate your data with your computer nodes. Figure 3.9: Configuring your Windows Azure storage. 2. Click the HDInsight icon and click Create an HDInsight Cluster or click New in the bottom-left corner of the web page. You have two choices at this point: to either use the Quick Create or Custom Create paths to create a new cluster. The Quick Create path is great for getting a development cluster up and running swiftly. 3. Choose a unique cluster name. You get a green check box when you choose a unique name; otherwise, you get a red explanation point telling you that you have chosen a name already in use. Chapter 3 ■ Configuring Your First Big Data Environment 53 might need to access your data. You can always regenerate access keys, which will force users to reset them in their client tool. Figure 3.10: Configuring your HDInsight Service Cluster. Confi guring Azure Storage Explorer To confi gure Azure Storage Explore, click Add Account and enter the storage account name and storage account key from your Manage Access Keys page in Windows Azure. Click Add Storage Account when complete (see Figure 3.11). Figure 3.11: Connecting Azure Storage Explorer to your storage account. With Storage Explorer, you can both explore previously created contain- ers while also providing capabilities to add and delete containers or upload and download fi les. In other words, it works and feels much like Windows Explorer. 54 Part II ■ Setting Up for Big Data with Microsoft ClumsyLeaf’s CloudXplorer Another popular option is ClumsyLeaf Software’s CloudXplorer. Although not free like Storage Explorer, CloudXplorer is a popular option because of its modern interface and simple functionality (see Figure 3.12). You can fi nd an evaluation copy at http://clumsyleaf.com/products/cloudxplorer. You can add new accounts to CloudXplorer very similarly to Storage Explorer. Click the Accounts icon on the Windows ribbon and walk through the steps of creating a new account using the account name and secret key from your Manage Access Keys page in Windows Azure. Figure 3.12: CloudXplorer’s clean user interface. AZCopy As you can imagine, neither of the previous solutions is exactly what you want when developing a production system where data movement is one of the keys to success. A command-line tool for data movement to and from Windows Azure storage is needed. AZCopy to the rescue. It is available on GitHub (https:// github.com/downloads/WindowsAzure/azure-sdk-downloads/AzCopy.zip). AZCopy is similar to Robocopy in functionality except that it can natively connect to your ASV storage. Among its many features, AZCopy enables you to copy nested directories of fi les, use wildcards, and is restartable, which means that you can resume a copy process from where it left off. In addition, AZCopy has a verbose logging mode that is essential to an enterprise-class process. The following example copies all the fi les in recursive mode from the local- data folder to the container called newcontainer in ASV. You would replace key with a copy of your secret key from your Azure storage account: AzCopy C:\localdata https://sqlpdw.blob.core.windows.net/newcontainer/ /destkey:key /S Chapter 3 ■ Configuring Your First Big Data Environment 55 AZCopy can easily be wrapped in a robust program written by you or called from a SQL Server Integration Services (SSIS) package as needed. It could also be called from a SQLAgent job for scheduling of data movement both to or from your Azure storage account. Validating Your New Cluster Now that you have created your new cluster, it is time to validate it. Validating that everything is installed and working as expected is vital to your understanding of the system and gives you confi dence that what you hand over is ready for use. Logging into HDInsight Service After you have successfully created your HDInsight cluster, you may want to verify its functionality. HDInsight includes some sample data that enables you to do this very quickly. From the HDInsight tab, choose your cluster (see Figure 3.13). Figure 3.13: HDInsight Service interface. Click Confi guration on the top menu. Next, click Enable Remote. A pop-up box will ask you for a username and password for connecting to the HDInsight NameNode. Finally, you will need to provide an expiration, after which the remote access expires. You should now have a Connect button as shown in Figure 3.14. 56 Part II ■ Setting Up for Big Data with Microsoft It is at the bottom of your confi guration page, and clicking the Connect button will allow you to receive the Remote Desktop (RDP) fi le that will allow you to access the NameNode. Go ahead and click Connect. Figure 3.14: Clicking the Connect button. In Internet Explorer, a request to open or save the fi le will appear on screen. Choose Open, and an RDP session will begin. Log in to the NameNode with your admin password you chose during the cluster creation process. You’ll see here that the desktop of the NameNode in HDInsight Service looks very similar to the single-node cluster we built with our HDP on-premise solution. You can use all the same commands in this environment that you can in the on-premise version. Generally you shouldn’t have to verify HDInsight Service installations because these are highly engineered solutions that have been tested and executed thousands of times. If you happen to run into a problem, you will most likely either just delete that and create a new one or contact Microsoft Support through the Azure website. However, installing HDP locally will require a bit more vigilance on your part. Chapter 3 ■ Configuring Your First Big Data Environment 57 Verify HDP Functionality in the Logs Chances are decent that the fi rst few times you attempt to install HDP, you will get an error. This is due to the many pre-installation steps that are vital to the successful installation of Hadoop. Now’s not the time to get frustrated, but to follow what the logs tell you and make the adjustments necessary to try the installation again. Here we will walk you through where to fi nd the informa- tion you’ll need to be successful. Installation Logs If you have any failures during the installation of HDP, check the installation log fi les located at C:\HadoopInstallFiles. Specifi cally, you’ll want the fi le located at C:\HadoopInstallFiles\HadoopSetupTools\hdp- .install.txt. If the package did not install correctly, details in this fi le can help guide you in fi xing the solution. Generally, the issues revolve around not pay- ing attention to the details of the installation instructions, such as creating the appropriate environmental variables, making all the changes in the confi guration fi le, or disabling the fi rewall. If the error seems to involve one of these steps in the pre-installation phase, go back and double-check that you did everything correctly. Make sure that all paths and names are exactly as they are supposed to be, because this is where most people make mistakes. Individual Service Logs If you had a successful installation of HDP but are having trouble starting the services, look into the product log directory that was specifi ed in the cluster- properties.txt fi le during installation. Recall that the example in this book had them located at C:\hadoop\logs. Each one of the services has a separate folder. Each folder has multiple fi les in it, and you might have to open a few to determine the correct one for a particular problem. For example, if you are having a problem starting the Hive service, you navigate to the C:\hadoop\logs\hive\hive.txt fi le. See Figure 3.15. 58 Part II ■ Setting Up for Big Data with Microsoft Figure 3.15: Connecting to the RDP session. Common Post-setup Tasks Once you have successfully created your cluster and verifi ed its success, you should be itching to get some data on it and start kicking the tires. In the next few steps, we’ll load some real data into Hadoop and then check out a couple of the most useful tools, Hive and Pig. (You’ll learn more about Hive and Pig in Chapter 6, “Adding Structure with Hive,” and Chapter 8, “Effective Big Data ETL with SSIS, Pig, and Sqoop.”) Loading Your First Files Now that you have successfully installed HDP, it is time to get some data loaded into HDFS so that you can verify the functionality of the system. A favorite data set for playing around in HDP (and Hive in particular) is an airline data set that shows all the fl ight and on-time information for airline fl ights within the United States from 1987 to 2008. You can fi nd the original fi les at http:// stat-computing.org/dataexpo/2009/the-data.html. Basic File system Operations The HDFS is available and ready to be loaded with data. Using the fi le system command fs, you can list and create directories, read and move fi les, and delete data. Use the hadoop fs -help command to get all the functionality available. Chapter 3 ■ Configuring Your First Big Data Environment 59 Start by loading the airline data into HDFS. In the following example, a folder C:\SourceData was created on the HDP server, but it can be a share anywhere in your network. Four of the fi les from the airline data set have been downloaded into the SourceData folder for loading into HDFS. First, create a folder in HDFS to store the data. Then complete the following steps to import the data into HDFS: 1. Open the Hadoop command console from the desktop and enter the following command: hadoop fs -mkdir flightinfo 2. Import the all fi les from the SourceData folder into the flightinfo folder: hadoop fs -put c:\SourceData\*.* flightinfo 3. To verify the fi les were copied as expected, you can run the -ls command: hadoop fs -ls flightinfo The output of the preceding steps should look like Figure 3.16. Figure 3.16: Verifying files in HDFS. A few of the fi le system commands that you will use often are listed in Table 3.6. For a full list of available commands, enter hadoop fs -help at the prompt in the Hadoop command console. Table 3.6: Common File system Commands FILE SYSTEM COMMANDS DESCRIPTION cat Copies source paths to stdout copyFromLocal Similar to put command, except that the source is restricted to a local fi le reference count Counts the number of fi les within a directory cp Copies fi les from source to destination get Copies fi les to the local fi le system (opposite of put) mv Moves fi les from source to defi nition within HDFS rm Deletes specifi ed fi les tail Displays last KB of the fi le to stdout 60 Part II ■ Setting Up for Big Data with Microsoft Verifying Hive and Pig Two tools that you will fi nd yourself using often are Hive and Pig. Both of these tools are higher-level languages that allow developers to work with data stored in HDFS without having to write Java programs. Hive is a SQL-like language that allows developers to apply a table-like structure to the data so that they can invoke Hive queries in much the same manner that they invoke SQL Queries. These HiveQL queries are translated into MapReduce jobs that are submitted to Hadoop for execution. Pig is another abstracted language, which is scripting in nature. Because of the scripting nature of Pig, it is very useful as an extract, translate, and load (ETL) tool for data movement and manipulation. The next couple of sections introduce these two tools. Verifying Hive To verify Hive, follow these steps: 1. Open the Hadoop command console and enter the following: cd.. This brings us up one level in the folder structure. 2. Next, enter the following: cd hive-\bin Alternatively, type hive and then press Tab; it will fi ll in the rest of the folder name for you and then add \bin). 3. Then enter the following: hive This brings up the Hive prompt, and the fact that it comes up is the fi rst thing that lets us know that the service is running as expected. To fi nd out what objects already exist in your Hive database, enter the fol- lowing command: show tables; There may be a few tables listed from the previous smoke tests that were run. NOTE Make sure to include the semicolon after each command; otherwise, the Hive prompt will simply go to the next line and await further commands. To create your own table based on the airline data that you loaded earlier, run the following command. Note that you may need to replace “administrator” in the location path with your username that you used to log into Windows: Chapter 3 ■ Configuring Your First Big Data Environment 61 CREATE EXTERNAL TABLE flightinfo ( Year int, Month int, DayofMonth int, DayOfWeek int, DepTime int, CRSDepTime int, ArrTime int, CRSArrTime int, UniqueCarrier string, FlightNum int, TailNum string, ActualElapsedTime int, CRSElapsedTime int, AirTime int, ArrDelay int, DepDelay int, Origin string, Dest string, Distance int, TaxiIn int, TaxiOut int, Cancelled int, CancellationCode string, Diverted string, CarrierDelay int, WeatherDelay int, NASDelay int, SecurityDelay int, LateAircraftDelay int )ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION /user/administrator/flightinfo'; This should take very little time to fi nish. Once completed, you can run a query against the table, such as the following: SELECT COUNT (*) FROM flightinfo; A successful query will look like Figure 3.17. Figure 3.17: Results of Hive query. As you can see, it provides a great deal of information beyond just the answer to the query (28, 745, 465). Your answer may differ depending on which and how 62 Part II ■ Setting Up for Big Data with Microsoft many of the fl ight info fi les you loaded into HDFS. Hive also tells us the time it took for the query to complete, how many reads and writes occurred, and the name of the job used in the process. The name of the job is vitally important if you need to troubleshoot for errors. Finally, you can drop the table: DROP TABLE IF EXISTS flightinfo; And exit Hive: exit; Open up a new Hadoop command prompt and enter the following: hadoop fs -ls flightinfo Notice that the data is still there. Hive external tables such as what we created earlier are simply metadata explanations of the data residing in HDFS. You can create and drop tables without affecting the underlying data; this is one of the great powers of Hive that you will learn more about in Chapter 10, “Adding Structure with Hive.” Verifying Pig Pig is a procedural scripting language that you will learn more about in Chapter 8, “Effective Big Data ETL with SSIS, Pig, and Sqoop.” To quickly test Pig, you are going to run a word-count program that you often see in MapReduce examples. Any *.txt fi le will do, but one good example is the Davinci.txt fi le available from the examples in the HDInsight Service Samples directory. You can get to this directory by logging in to the Windows Azure portal and clicking Manage Cluster. Samples is one of the choices on the screen, and WordCount is a choice on the next screen. On this screen, is the Davinci.txt download link. Now, complete the following steps to verify Pig: 1. Download Davinci.txt or any text fi le into a new folder C:\PigSource. Put the data into HDFS: hadoop fs -mkdir wordcount 2. Import the all fi les from the SourceData folder into the flightinfo folder: hadoop fs -put c:\PigSource\davinci.txt wordcount/ 3. To verify the fi le was copied as expected, run the -ls command: hadoop fs -ls wordcount Now let’s log in to the Pig console. 4. Navigate to the hadoop\pig-\bin directory and enter the following: pig Chapter 3 ■ Configuring Your First Big Data Environment 63 You will fi nd yourself at the grunt prompt: Grunt> 5. To get used to the interactive console, type each one of the following lines at the Grunt prompt. Press Enter after each line and the Grunt command should reappear for the next line: myinput = LOAD 'wordcount/davinci.txt' USING TextLoader(); words = FOREACH myinput GENERATE FLATTEN(TOKENIZE($0)); grouped = GROUP words BY \$0; counts = foreach grouped generate GROUP, count(words); ordered = ORDER counts BY \$0; STORE ordered INTO 'output/pigout' USING PigStorage; The fi nal line will kick off a MapReduce job and store the output in the pigout folder. 6. Run the following command to read the output of the Pig job: hadoop fs -cat output/pigout/part-r-00000 The output is a list of every word in the document and the count of the times it was used. You will get more exposure to Pig in later chapters, but this should give you a good idea of the procedural methods used by Pig to move data through a pipeline and transform it to a useful fi nal state. Of course, you could have written the preceding code as a Pig script and saved the text fi le and simply called it from Pig. You will learn how to do this in Chapter 8. Summary In this chapter, you learned how to install Hortonworks Data Platform into a single-node cluster, how to confi gure HDInsight Service in Windows Azure, and how to use the tools available in each to quickly validate their installs. You were also introduced to moving data into HDFS and the tools available to move data into your Azure storage. Finally, this chapter gave you a primer in Hive and Pig so that you can quickly evaluate that they are running as expected in your environment. 68 Part III ■ Storing and Managing Big Data Exploring the Hadoop Distributed File System Originally created as part of a web search engine project called Apache Nutch, HDFS is a distributed fi le system designed to run on a cluster of cost-effective commodity hardware. Although there are a number of distributed fi le systems in the marketplace, several notable characteristics make HDFS really stand out. These characteristics align with the overalls goals as defi ned by the HDFS team and are enumerated here: ■ Fault tolerance: Instead of assuming that hardware failure is rare, HDFS assumes that failures are instead the norm. To this end, an HDFS instance consists of multiple machines or servers that each stores part of the data. Because the data is distributed, HDFS can quickly detect faults and failures and subsequently automatically and transparently recover. ■ High throughput: Where most fi le systems strive for low-latency opera- tions, HDFS is more focused on high throughput, even at the expense of latency. This characteristic means that HDFS can stream data to its clients to support analytical processing over large sets of data and favors batch over interactive operations. With forward-looking features like caching and tiered storage, it will no longer be the case that HDFS is not good for interactive operations. ■ Support for large data sets: It’s not uncommon for HDFS to contain fi les that range in size from several gigabytes all the way up to several terabytes and can include data sets in excess of tens of millions of fi les per instance (all accomplished by scaling on cost-effective commodity hardware). ■ Write-once read-many (WORM) principle: This is one of the guiding principles of HDFS and is sometimes referred to as coherency. More simply put, data fi les in HDFS are written and when closed are never updated. This simplifi cation enables the high level of throughput obtained by HDFS. ■ Data locality: In a normal application scenario, a process requests data for a source; the data is then transferred from the source over a network to the requestor who can then process it. This time tested and proven process often works fi ne on smaller data sets. As the size of the data set grows, however, bottlenecks and hotspots begin to appear. Server resources and networks can quickly become overwhelmed as the whole process breaks down. HDFS overcomes this limitation by providing facilities or interfaces Chapter 4 ■ HDFS, Hive, HBase, and HCatalog 69 for applications to move the computation to the data, rather than moving the data to the computation. As one of the more critical pieces of the Hadoop ecosystem, it’s worth spend- ing a little extra time to understand the HDFS architecture and how it enables the aforementioned capabilities. Explaining the HDFS Architecture Before discussing machine roles or nodes, let’s look at the most fundamental concept within HDFS: the block. You may already be familiar with the block concept as it is carried over from the fi le system found on your own computer. Blocks, in this context, are how fi les are split up so that they can be written to your hard drive in whatever free space is available. A lot of functional similarities exist between your fi le system blocks and the HDFS block. HDFS blocks split fi les, some which may be larger than any single drive, so that they can be distributed throughout the cluster and subsequently written to each node’s disk. HDFS blocks are also much larger than those in use on your local fi le system, defaulting to an initial size of 64MB (but often being allocated much larger). Within an HDFS cluster, two types or roles of machines or servers make up what is often referred to as the master/slave architecture. The fi rst, called the NameNode, functions as the master or controller for the entire cluster. It’s responsible for maintaining all the HDFS metadata and drives the entire fi le system namespace operation. There can be only one single NameNode per cluster, and if it is lost or fails, all the data in the HDFS cluster is gone. The second type of role within an HDFS cluster is the DataNode. And although there is only one NameNode, there are usually many DataNodes. These nodes primarily interact with HDFS clients by taking on the responsibility to read and write or store data or data blocks. This makes scaling in your cluster easy, as you simply add additional DataNodes to your cluster to increase capacity. The DataNode is also responsible for replicating data out when instructed to do so by the NameNode (more on HDFS replication shortly). HDFS Read and Write Operations To get a better understanding of how these parts or pieces fit together, Figure 4.1 and Figure 4.2 illustrate how a client reads from and writes to an HDFS cluster. Chapter 4 ■ HDFS, Hive, HBase, and HCatalog 73 client interactions), this chapter introduces only two of those most commonly used on the Microsoft platform: the HDFS command shell and WebHDFS. This is not intended to discount the other available options; instead, it is intended to help you get your feet wet as you get started with HDFS. Hadoop File System (FS) Shell Whether you realized it or not, you’ve already seen the HDFS command shell in action. As you worked through the Hortonworks Data Platform On-Premise introduction, the commands used to both upload and retrieve data from HDFS were all handled by the File System (FS) shell. Figure 4.4: HDFS shell The FS shell is actually part of Hadoop and uses commands that primarily align with UNIX commands. The FS shell commands all take the following format: hadoop fs - The CMD is a specifi c fi le system command, and ARGS are the arguments needed to execute the command. The FS shell commands all use uniform resource indicators (URIs) in the format of [schema]://[authority]/[path]. For HDFS, these paths look like this: hdfs://node/mydata. The hdfs:// prefi x is optional and is often left off for simplicity. 74 Part III ■ Storing and Managing Big Data Table 4.1 lists some of the most common commands and shows usage examples. Table 4.1: Hadoop File System Shell Commands FS COMMAND DESCRIPTION USAGE cp Copies a fi les from a source to a destination hadoop fs -cp /user/hadoop /file1 /user/hadoop/file2 get Copies a fi le to the local fi le system hadoop fs -get /user/hadoop /file localfile ls and lsr Lists the fi les con- taining the specifi ed directory; can be used recursively hadoop fs -ls /user/hadoop/ hadoop fs -lsr /user/hadoop/ mkdir Creates an HDFS directory hadoop fs -mkdir /user/hadoop /dir1 put Copies a fi le from the local fi le system to HDFS hadoop fs -put localfile /user /hadoop/hadoopfile rm and rmr Removes a fi le or directory; can be used recursively hadoop fs -rm /user/hadoop/dir hadoop fs -rmr /user/hadoop/dir WebHDFS The Hadoop FS shell is simple to get started with and straightforward to use for data management operations, but it has one potential weakness: The shell commands are just that, shell commands. Therefore, they require access to a machine with Hadoop installed to execute the commands. NOTE WebHDFS is just one of the many approaches for working with HDFS. There are advantages and disadvantages associated with each option. An alternative to this approach, and one that overcomes this limitation, is WebHDFS. WebHDFS is an HTTP-based REST (Representational State Transfer) API that fully implements the same fi le system commands found in the FS shell. Accessing this API is accomplished by embedding commands into HTTP URL requests and taking advantage of the standard HTTP operations (GET, POST, PUT, DELETE). To better illustrate, consider the following example to open a fi le via WebHDFS: http://:/webhdfs/v1/?op=OPEN Chapter 4 ■ HDFS, Hive, HBase, and HCatalog 75 To deconstruct this URL for better understanding, note the following: ■ The and arguments point to the location of the NameNode. ■ The argument is the fi le being requested. ■ The op querystring parameter passes in the fi le system operation. An HTTP GET request is issued using the example URL. After the path and loca- tion metadata is queried from the NameNode, an HTTP 307 temporary redirect is returned to the requestor, as shown here: HTTP/1.1 307 TEMPORARY_REDIRECT Location: http://:/webhdfs/v1/?op=OPEN... Content-Length: 0 The redirect contains the actual path to the DataNode that hosts the data fi le or block. The client can then follow the redirect to stream the data directly from the source. As previously mentioned, the WebHDFS offers a complete implementation of the FS shell commands, Table 4.2 list some of the fi le system commands and their WebHDFS equivalents. Table 4.2: WebHDFS Access Commands FILE SYSTEM COMMAND WEBHDFS EQUIVALENT mkdir PUT "http://:/?op=MKDIRS" rm DELETE "http://:/webhdfs/ v1/?op=DELETE" ls "http://:/webhdfs/ v1/?op=LISTSTATUS" Now that you are familiar with the basic concepts behind HDFS, let’s look at some of the other functionality that is built on top of HDFS. Exploring Hive: The Hadoop Data Warehouse Platform Within the Hadoop ecosystem, HDFS can load and store massive quantities of data in an effi cient and reliable manner. It can also serve that same data back up to client applications, such as MapReduce jobs, for processing and data analysis. Although this is a productive and workable paradigm with a developer’s background, it doesn’t do much for an analyst or data scientist trying to sort through potentially large sets of data, as was the case with Facebook. 76 Part III ■ Storing and Managing Big Data Hive, often considered the Hadoop data warehouse platform, got its start at Facebook as their analyst struggled to deal with the massive quantities of data produced by the social network. Requiring analysts to learn and write MapReduce jobs was neither productive nor practical. Instead, Facebook developed a data warehouse-like layer of abstraction that would be based on tables. The tables function merely as metadata, and the table schema is projected onto the data, instead of actually moving potentially mas- sive sets of data. This new capability allowed their analyst to use a SQL-like language called Hive Query Language (HQL) to query massive data sets stored with HDFS and to perform both simple and sophisticated summarizations and data analysis. Designing, Building, and Loading Tables If you are familiar with basic T-SQL data defi nition language (DDL) commands, you already have a good head start in working with Hive tables. To declare a Hive table, a CREATE statement is issued similar to those used to create tables in a SQL Server database. The following example creates a simple table using primitive types that are commonly found elsewhere: CREATE EXTERNAL TABLE iislog ( date STRING, time STRING, username STRING, ip STRING, port INT, method STRING, uristem STRING, uriquery STRING, timetaken INT, useragent STRING, referrer STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; Two important distinctions need to be pointed out with regard to the preced- ing example. First, note the EXTERNAL keyword. This keyword tells Hive that for this table it only owns the table metadata and not the underlying data. The opposite of this keyword (and the default value) is INTERNAL, which gives Hive control of both the metadata and the underlying data. The difference between these two options is most evident when the table is dropped using the DROP TABLE command. Because Hive does not own the data for an EXTERNAL table, only the metadata is removed, and the data continues to live on. For an INTERNAL table, both the table metadata and data are deleted. The second distinction in the CREATE statement is found on the fi nal line of the command: ROW FORMAT DELIMITED FIELDS TERMINATED BY ','. This Chapter 4 ■ HDFS, Hive, HBase, and HCatalog 77 command instructs Hive to read the underlying data fi le and split the columns or fi elds using a comma delimiter. This is indicative that instead of storing data in an actual table structure, the data continues to live in its original fi le format. At this point, in this example, although we have defi ned a table schema, no data has been explicitly loaded. To load data after the table is created, you could use the following command to log all of your IIS web server log fi les that exist in the logs directory: load data inpath '/logs' overwrite into table iislog; This demonstration only scratches the surface of the capabilities in Hive. Hive supports a robust set of features, including complex data types (maps, structs, and arrays), partitioning, views, and indexes. These features are beyond the scope of this book, but they certainly warrant further exploration if you intend to use this technology. Querying Data Like the process used previously to create a Hive table, HQL can be subse- quently used to query data out for the purposes of summarization or analysis. The syntax, as you might expect, is almost identical to that use to query a SQL Server database. Don’t be fooled, though. Although the interface looks a lot like SQL, behind the scenes Hive does quite a bit of heavy lifting to optimize and convert the SQL-like syntax to one or more MapReduce jobs that is used to satisfy the query: SELECT * FROM iislog; This simple query, much like its counterparts in the SQL world, simply returns all rows found in the iislog table. Although this is not a sophisticated query, the HQL supports both basic operations such as sorts and joins to more sophisti- cated operations, including group by, unions, and even user-defi ned functions. The following example is a common example of a group by query to count the number of times each URI occurs in the web server logs: SELECT uristem, COUNT(*) FROM iislog GROUP BY (uristem); Confi guring the Hive ODBC Driver A wealth of third-party tools, such as Microsoft Excel, provide advanced analytic features, visualizations, and toolsets. These tools are often a critical part of a data analyst’s day-to-day job, which make the Hive Open Database Connectivity 78 Part III ■ Storing and Managing Big Data (ODBC) driver one of the most important pieces of Hive. You can download the Hive ODBC driver from http://www.microsoft.com/en-us/download/details .aspx?id=40886. It allows any ODBC-compliant application to easily tap into and integrate with your big data store. Confi guring the Hive ODBC driver (see Figure 4.5) is handled using the ODBC Data Sources confi guration tool built in to the Microsoft Windows operating system. After confi guration is complete, Hive tables can be accessed directly from not only analytics tools such as Microsoft Excel but also from other ODBC- compliant tools such as SQL Server Integration Services (SSIS). Figure 4.5: Hive ODBC configuration Exploring HCatalog: HDFS Table and Metadata Management In the previous examples, the source use for our Hive table is an HDFS path. This is common within the Hadoop ecosystem. While referencing these paths directly works fi ne and is perfectly acceptable in many scenarios, what it does is bind your Hive table or Pig job to a specifi c data layout within HDFS. If this data layout were to change during an activity like data maintenance or simply because the size of the data outgrew the initial HDFS organizational structure, your script or job would be broken. This would require you to revisit every script or job that referenced this data, which in large systems could be potentially unpleasant. Chapter 4 ■ HDFS, Hive, HBase, and HCatalog 79 This scenario is just one of the reasons the Apache HCatalog project was created. HCatalog started as an abstraction of the Hive metadata management functionality (currently is part of the larger Apache Hive project) and is intended to allow for shared metadata across the Hadoop ecosystem. Table defi nitions and even data type mappings can be created and shared, so users can work with data stored in HDFS without worrying about the underly- ing details such as where or how the data is stored. HCatalog currently works with MapReduce, Hive, and of course, Pig; and as an abstraction of the Hive platform, the syntax for creating tables is identical, except that we have to specify the data location during creation of the table: CREATE EXTERNAL TABLE iislog ( date STRING, time STRING, username STRING, ip STRING, port INT, method STRING, uristem STRING, uriquery STRING, timetaken INT, useragent STRING, referrer STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '/data/logs'; Our iislog table can now be referenced directly one or more times in Hive by simply using the table name, as seen previously. Because HCatalog is integrated across platforms, the same table can also be referenced in a Pig job. Let’s look fi rst at an example of a simple Pig Latin script that references the data location directly and includes the column schema defi nition: A = load '/data/logs' using PigStorage() as (date:chararray, time:chararray, username:chararray, ip:chararray, port:int, method:chararray, uristem:chararray, uriquery:chararray, timetaken:int, useragent:chararry, referrer:chararray); You can compare and contrast the code samples to see how HCatalog simpli- fi es the process by removing both the data storage location and schema from the script: A = load 'iislog' using HCatLoader(); In this example, if the underlying data structure were to change and the location of the logs were moved from the /data/logs path to /archive/2013/ weblogs, the HCatalog metadata could be updated using the ALTER statement. Chapter 4 ■ HDFS, Hive, HBase, and HCatalog 81 Columnar Databases If you are familiar with relational database systems, you are without a doubt familiar with the traditional column and row layout used. To demonstrate the differences, let’s look at a concrete example. Consider the entity-relationship diagram (ERD) shown in Figure 4.6. It uses a pretty common approach to model a one-to-many relationship between cus- tomers and addresses. Like you’ve probably been taught throughout the years, it is highly normalized and follows good relational database design principles. Figure 4.7 illustrates a populated customer and address model based on the ERD found in Figure 4.6. Figure 4.6: Entity-relationship diagram Figure 4.7: Traditional database structure Relational designs and databases do not easily scale and cannot typically handle the volumes, variety, and velocity associated with big data environ- ments. This is where NoSQL databases such as HBase were designed to excel, yet they represent a very different way of thinking about and ultimately stor- ing your data. 82 Part III ■ Storing and Managing Big Data HBase is a columnar database, which means that instead of being organized by rows and columns, it is organized by column families, which are sets of related columns. Restructuring the data presented in Figure 4.7 using the columnar approach results in a layout that although similar is actually very different (see Figure 4.8). Figure 4.8: Columnar database structure The columnar layout has many advantages over a relational model in the context of handling big data, including the following: ■ Can handle very large (even massive) quantities of data through a process known as sharding ■ Allows fl exible data formats that can vary from row to row ■ Typically scales linearly For a more thorough discussion on columnar database capabilities and HBase in general, check out the HBase website at http://hbase.apache.org/. Defi ning and Populating an HBase Table HBase is installed and confi gured for you as part of the Hortonworks Data Platform. You can work with HBase directly from the HBase command shell. To defi ne a table, you specify a table name and the column family or families. In the following example, a basic customer table with a single column family for addresses is created: create 'customer', 'address' Chapter 4 ■ HDFS, Hive, HBase, and HCatalog 83 Take note that no schema or columns were defi ned for the actual table or the column family. This is intentional because the actual columns contained within an address could vary. At this point, you may be questioning why this is a desirable behavior. There are actually a number of use cases; the easiest involves international addresses. Consider a U.S. address that has parts like address1, address2, city, state, and ZIP code. Other regions and countries do not necessarily follow this same format, so the fl exibility is desirable in this scenario. Now, let’s take a quick look at how we put data into our customer table using the put statement: put 'customer', 'row01', 'address:street', '123 Main St.' put 'customer', 'row01', 'address:city', 'Tampa' put 'customer', 'row01', 'address:state', 'Florida' put 'customer', 'row01', 'address:country', 'United States of America' put 'customer', 'row01', 'address:zip', '34637' The format of this command specifi es the table name (customer), the row identifi er (row01), the column family and column name (address:[xxx]), and fi nally the value. When the commands complete, the data is available for query operations. NOTE To drop an HBase table, you must fi rst disable it, and then drop it. The syntax to perform this operation is as follows: disable 'customer' drop 'customer' Using Query Operations To retrieve the data from the HBase table you just created, there are two fun- damental methods available through the HBase shell. The scan command indiscriminately reads the entire contents of your table and dumps it to the console window: scan 'customer' For small tables, this command is useful for verifying that the table is con- fi gured and set up correct. When working with a larger table, it is preferable to use a more targeted query. The get command accomplishes this: get 'customer' 'row01' To use the get command, you specify the table name (customer) followed by the row key (row01). This returns all the column families and associated columns for the given key. 84 Part III ■ Storing and Managing Big Data A more thorough discussion of HBase is beyond the scope of this book, but it’s worth noting that HBase supports updating and deleting rows. There are also robust and fully functional Java and REST APIs available for integrating HBase outside the Hadoop ecosystem. Summary Hadoop offers a number of different methods or options for both storing and retrieving your data for processing and analytical purposes. HDFS is the Hadoop fi le system that provides a reliable and scalable platform for hosting your data. HDFS is also well suited to serve your data to other tools within the Hadoop ecosystem, like MapReduce and even Pig jobs. Hive, the Hadoop Data Warehousing platform, is also built on top of HDFS. Using the HQL, you can easily project a schema onto your data and then use SQL-like syntax to summarize and analyze your data. To build more robust solutions, you can implement HCatalog to abstract the details of how your big data is stored. HCatalog provides table and metadata management that is built on top of Hive and integrates well into the Hadoop stack of tools. Finally, when it becomes necessary to provide real-time, random read/write capabilities to your data, the Apache HBase project provides a columnar NoSQL database implementation also built on top of HDFS. Together, these tools and platforms give you many different options to tackle a number of different use cases in the diverse big data space. 86 Part III ■ Storing and Managing Big Data without understanding its internal workings, you can use HDFS without wor- rying about the low-level details. Understanding the Fundamentals of HDFS HDFS’s origin can be traced back to the Google File System (GFS) (http:// research.google.com/archive/gfs.html), which was designed to handle Google’s needs for storing and processing large amounts of data. Google released a paper on its fi le system in 2003, and when Doug Cutting began working on Hadoop, he adopted the GFS approach for HDFS. The design goals for HDFS were threefold: ■ To enable large-scale data sets to be stored and processed ■ To provide reliable data storage and be fault tolerant ■ To provide support for moving the computation to the data HDFS addresses these goals well and offers a number of architectural features that enable these goals to be met. These goals also logically lead to some constraints, which HDFS had to address. (The next section breaks down these features and how they address the goals and constraints of a distributed fi le system designed for large data sets.) HDFS is implemented using the Java language. This makes it highly portable because modern operating systems offer Java support. Communication in HDFS is handled using Transmission Control Protocol/Internet Protocol (TCP/IP), which also enables it to be very portable. In HDFS terminology, an installation is usually referred to as a cluster. A cluster is made up of individual nodes. A node is a single computer that par- ticipates in the cluster. So when someone refers to a cluster, it encompasses all the individual computers that participate in the cluster. HDFS runs beside the local fi le system. Each computer still has a standard fi le system available on it. For Linux, that may be ext4, and for a Windows server, it’s usually going to be New Technology File System (NTFS). HDFS stores its fi les as fi les in the local fi le system, but not in a way that enables you to directly interact with them. This is similar to the way SQL Server or other relational database management systems (RDBMSs) use physical fi les on disk to store their data: While the fi les store the data, you don’t manipulate the fi les directly. Instead, you go through the interface that the database provides. HDFS uses the native fi le system in the same way; it stores data there, but not in a directly useable form. HDFS uses a write-once, read-many access model. This means that data, once written to a fi le, cannot be updated. Files can be deleted and then rewritten, but not updated. Although this might seem like a major limitation, in practice, Chapter 5 ■ Storing and Managing Data in HDFS 87 with large data sets, it is often much faster to delete and replace than to perform in-place updates of data. Now that you have a better understanding of HDFS, in the next sections you look at the architecture behind HDFS, learn about NameNodes and DataNodes, and fi nd out about HDFS support for replication. HDFS Architecture Traditionally, data has been centralized rather than spread out. That worked well over the past few decades, as the capability to store ever-increasing amounts of data on a single disk continued to grow. For example, in 1981, you could purchase hard drives that stored around 20MB at a cost of approximately $ 180 per MB. By 2007, you could get a drive that stored 1TB at cost of about $ 0.0004 per MB. Today, storage needs in big-data scenarios continue to outpace the capacity of even the largest drives (4TB). One early solution to this problem was simply to add more hard drives. If you wanted to store 1 petabyte (1,024TB) of informa- tion, you would need 256 4TB hard drives. However, if all the hard drives were placed in the same server, it introduced a single point of failure. Any problem that affected the server could mean the drives weren’t accessible, and so the data on the drives could be neither read nor written. The single computer could also introduce a performance bottleneck for access to the hard drives. HDFS was designed to solve this problem by supporting distribution of the data storage across many nodes. Because the data is spread across multiple nodes, no single computer becomes a bottleneck. By storing redundant copies of the information (discussed in more detail in the section “Data Replication”), a single point of failure is also removed. This redundancy also enables the use of commodity hardware. (Commodity means nonspecialized, off-the-shelf components.) Special hardware or a unique confi guration is not needed for a computer to participate in an HDFS cluster. Commodity hardware tends to be less expensive than more specialized com- ponents and can be acquired at a wider variety of vendors. Many of today’s server-class computers include a number of features designed to minimize downtime. This includes things like redundant power supplies, multiple network interfaces, and hard drive controllers capable of managing pools of hard drives in redundant arrays of independent/inexpensive disk (RAID) setups. Thanks to the data redundancy inherent in HDFS, it can minimize the need for this level of hardware, allowing the use of less-expensive computers. NOTE Just because HDFS can be run on commodity hardware and manages redun- dancy doesn’t mean that you should ignore the reliability and performance of the computers used in an HDFS cluster. Using more reliable hardware means less time spent replacing broken components. And, just like any other application, HDFS will 88 Part III ■ Storing and Managing Big Data benefi t from more computing resources to work with. In particular, NameNodes (dis- cussed next in the “NameNodes and DataNodes” section) benefi t from reliable hard- ware and high-performing components. As already stated, the data being stored in HDFS is spread out and replicated across multiple machines. This makes the system resilient to the failure of any individual machine. Depending on the level of redundancy confi gured, the system may be able to withstand the loss of multiple machines. Another area where HDFS enables support for large data sets is in computa- tion. Although HDFS does not perform computation directly, it does support moving the computations closer to the data. In many computer systems, the data is moved from a server to another computer, which performs any needed computations. Then the data may be moved back to the original server or moved to yet another server. This is a common pattern in applications that leverage a relational database. Data is retrieved from a database server to a client computer, where the applica- tion logic to update or process the data is applied. Finally, the data is saved to the database server. This pattern makes sense when you consider that the data is being stored on a single computer. If all the application logic were performed on the database server, a single computationally intensive process could block any other user from performing his or her work. By offl oading application logic to client computers, it increases the database server’s capability to serve data and spreads the computation work across more machines. This approach works well for smaller data sets, but it rapidly breaks down when you begin dealing with data sets in the 1TB and up range. Moving that much data across the network can introduce a tremendous amount of latency. In HDFS, though, the data is spread out over many computers. By moving the computations closer to the data, HDFS avoids the overhead of moving the data around, while still getting the benefi t of spreading computation over a larger number of computers. Although HDFS is not directly responsible for performing the computation work, it does provide an interface that allows applications to place their com- puting tasks closer to the data. MapReduce is a prime example of this. It works hand in hand with HDFS to distribute computational tasks to DataNodes so that the work can be performed as close to the data as possible. Only the results of the data actually need to be transmitted across the network. NOTE You can think of moving the computation closer to the data as being similar to the way that many relational database engines optimize queries by moving fi lter criteria as close to the original data as possible. For example, if you are joining two Chapter 5 ■ Storing and Managing Data in HDFS 89 tables, and then fi ltering results by a column from one table, the engine will often move that fi lter to the initial retrieval of data from the table, thus reducing the amount of data it has to join with the second table. NameNodes and DataNodes HDFS uses two primary types of nodes. A NameNode acts as a master node, managing the fi le system and access to fi les from clients. DataNodes manage the physical storage of data. Each HDFS cluster has a single NameNode. The NameNode acts as the coor- dinator for the cluster and is responsible for managing and communicating to all the DataNodes in a cluster. It manages all the metadata associated with the cluster. One of its primary responsibilities is to manage the fi le system namespace, which is the layer that presents the distributed data stored in HDFS as though it is in a traditional fi le system organized as folders and fi les. The NameNode manages any fi le system namespace operations, such as creating or deleting fi les and folders. The fi le system namespace presents the appearance that the data is stored in a folder/fi le structure, but the data is actually split into multiple blocks that are stored on DataNodes. The NameNode controls which blocks of data map to which DataNodes. These blocks of data are usually 64MB, but the setting is confi gurable. The DataNode is responsible for the creation of blocks of data in its physical storage and for the deletion of those blocks. It is also responsible for creation of replica blocks from other nodes. The NameNode coordinates this activity, telling the DataNode what blocks to create, delete, or replicate. DataNodes communicate with the NameNode by sending a regular “heartbeat” communication over the network. This heartbeat indicates that the DataNode is operating correctly. A block report is also delivered with the heartbeat and provides a list of all the blocks stored on the DataNode. The NameNode maintains a transaction history of all changes to the fi le sys- tem, known as the EditLog. It also maintains a fi le, referred to as the FsImage, that contains the fi le system metadata. The FsImage and EditLog fi les are read by the NameNode when it starts up, and the EditLog’s transaction history is applied to the FsImage. This brings the FsImage up-to-date with the latest changes recorded by the NameNode. Once the FsImage is updated, it is written back to the fi le system, and the EditLog is cleared. At this point, the NameNode can begin accepting requests. This process (shown in Figure 5.1) is referred to as checkpointing, and it is run only on startup. It can have some performance impact if the NameNode has accumulated a large EditLog. 90 Part III ■ Storing and Managing Big Data NameNode Startup Apply Transactions to Fslmage Save Fslmage Clear EditLog Start Accepting Requests Read Fslmage Read EditLog Transactions Figure 5.1: The checkpointing process The NameNode is a crucial component of any HDFS cluster. Without a func- tioning NameNode, the data cannot be accessed. That means that the NameNode is a single point of failure for the cluster. Because of that, the NameNode is one place that using a more fault-tolerant hardware setup is advisable. In addition, setting up a Backup node may help you recover more quickly in the event of a NameNode failure. The Backup node maintains its own copy of the FsImage and EditLog. It receives all the fi le system transactions from the NameNode and uses that to keep its copy of the FsImage up to date. If the NameNode fails catastrophically, you can use the Backup node’s copy of the FsImage to start up a new NameNode more quickly. NOTE Despite their name, Backup nodes aren’t a direct backup to a NameNode. Rather, they manage the checkpointing process and retain a backup copy of the FsImage and EditLog. A NameNode cannot fail over to a Backup node automatically. NOTE Hadoop 2.0 includes several improvements for improving the availability of NameNodes, with support for Active and Standby NameNodes. These new options will make it much easier to have a highly available HDFS cluster. Data Replication One of the critical features of HDFS is its support for data replication. This is critical for creating redundancy in the data, which allows HDFS to be resilient to the failure of one or more nodes. Without this capability, HDFS would not Chapter 5 ■ Storing and Managing Data in HDFS 91 be reliable to run on commodity hardware, and as a result, would require sig- nifi cantly more investment in highly available servers. Data replication also enables better performance for large data sets. By spread- ing copies of the data across multiple nodes, the data can be read in parallel. This enables faster access and processing of large fi les. By default, HDFS replicates every fi le three times. However, the replication level can also be specifi ed per fi le. This can prove useful if you are storing transient data or data that can be re-created easily. This type of data might not be replicated at all, or only replicated once. The fi le is replicated at the block level. Therefore, a single fi le may be (and likely is) made up of blocks stored on multiple nodes. The replicas of these blocks may be stored on still more nodes. Replicas are created as the client writes the data to HDFS. The fi rst DataNode to receive the data gets it in small chunks. Each chunk is written to the DataNode’s local storage and then transferred to the next DataNode. The receiving DataNode carries out the same process, forwarding the processed chunks to the next DataNode. That process is repeated for each chunk, for each DataNode, until the required number of replicas has been created. Because a node can be receiving a chunk to process at the same time that it is sending another chunk to the next node, the process is said to be pipelined. A key aspect of the data replication capabilities in HDFS is that the replica placement is optimized and is continuing to be improved. The replication pro- cess is rack aware; that is, it understands how the computers are physically organized. For data centers with large numbers of computers, it is common to use network racks to hold the computers. Often, each rack has its own network switch to handle network communication between computers in the rack. This switch would then be connected to another switch, which is connected to other network racks. This means that communications between computers in the same rack is generally going to be faster than communications between computers in different racks. HDFS uses its rack awareness to optimize the placement of replicas within a cluster. In doing so, it balances the need for performance with the need for availability in the case of a hardware failure. In the common scenario, with three replicas, one replica is stored on a node in the local rack. The other two replicas will be stored in a remote rack, on two different nodes in the rack. This approach still delivers good read performance, because a client reading the fi le can access two unique racks (with their own network connection) for the contents of the fi le. It also delivers good write performance, because writing a replica to a node in the same rack is signifi cantly faster than writing it to a node in a different rack. This also balances availability; the replicas are located in two separate racks and three nodes. Rack failures are less common than node 92 Part III ■ Storing and Managing Big Data failures, so replicating across fewer racks doesn’t have an appreciable impact on availability. NOTE The replica placement approach is subject to change, as the HDFS develop- ers consider it a work in progress. As they learn more about usage patterns, they plan to update the policies to deliver the optimal balance of performance and availability. HDFS monitors the replication levels of fi les to ensure the replication factor is being met. If a computer hosting a DataNode were to crash, or a network rack were taken offl ine, the NameNode would fl ag the absence of heartbeat messages. If the nodes are offl ine for too long, the NameNode stores forwarding requests to them, and it also checks the replication factors of any data blocks associated with those nodes. In the event that the replication factor has fallen below the threshold set when the fi le was created, the NameNode begins replication of those blocks again. Using Common Commands to Interact with HDFS This section discusses interacting with HDFS. Even though HDFS is a distributed fi le system, you can interact with it in a similar way as you do with a traditional fi le system. However, this section covers some key differences. The command examples in the following sections work with the Hortonworks Data Platform environment setup in Chapter 3, “Installing HDInsight.” Interfaces for Working with HDFS By default, HDFS includes two mechanisms for working with it. The primary way to interact with it is by the use of a command-line interface. For status checks, reporting, and browsing the fi le system, there is also a web-based interface. Hadoop is a Java script that can run several modules of the Hadoop system. The two modules that are used for HDFS are dfs (also known as FsShell) and dfsadmin. The dfs module is used for most common fi le operations, such as add- ing or moving fi les. The dfsadmin module is used for administrative functions. You can open the command prompt on your Windows Hortonworks Data Platform (HDP) server by double-clicking the Hadoop Command Line shortcut on the desktop or by running cmd.exe from the Start menu. If you start from the Hadoop Command Line shortcut, it will set your current directory to the Hadoop location automatically, as shown in Figure 5.2. Chapter 5 ■ Storing and Managing Data in HDFS 93 Figure 5.2: The HDFS command prompt When running commands from the command-line interface, you must use the following format: hadoop MODULE -command arguments The command line starts with hadoop, the executable that will interpret the remaining items on the command line. MODULE designates which Hadoop module should be run. Recall that when interacting with the HDFS fi le system this will be dfs or dfsadmin. -command indicates the specifi c command in the module that should be run, and the arguments are any specifi c values necessary to execute the command successfully. NOTE You can usually fi nd the full list of commands supported by any module by running the module with no command, as follows: hadoop dfs The same holds true for commands. Running a command without arguments lists the help for that command: hadoop dfs -put 94 Part III ■ Storing and Managing Big Data The web interface is useful for viewing the status of the HDFS cluster. However, it does not support making modifi cations to either the cluster or to the fi les contained in it. As shown in Figure 5.3, the information provided includes the current space used by the cluster and how much space is still available. It also includes information on any unresponsive nodes. It enables you to browse the fi le system, as well, by clicking the Browse the fi le system link. Figure 5.3: Web interface for HDFS File Manipulation Commands Most direct interaction with HDFS involves fi le manipulation—creating, delet- ing, or moving fi les. Remember that HDFS fi les are write-once, read-many, so there are no commands for updating fi les. However, you can manipulate the metadata associated with a fi le, such as the owner or group that has access to it. NOTE Unlike some fi le systems, HDFS does not have the concept of a working direc- tory or a cd command. Most commands require that you provide a complete path to the fi les or directory you want to work with. 96 Part III ■ Storing and Managing Big Data HDFS using the put command. This example uses the sample data fi les created in this chapter: hadoop dfs -put C:\MSBigDataSolutions\SampleData1.txt /user/MSBigDataSolutions This command loads a single fi le from the local fi le system (C:\MSBigData Solutions\SampleData1.txt) to a directory in HDFS (/user/MSBigDataSolutions). You can use the following command to verify the fi le was loaded correctly: hadoop dfs -ls /user/MSBigDataSolutions put can load multiple fi les to HDFS simultaneously. You do that by using a folder as the source path, in which case all fi les in the folder are uploaded. You can also do so by using wildcards in the source system path: hadoop dfs -put C:\MSBigDataSolutions\SampleData_* /user/MSBigDataSolutions Two other commands are related to put. copyFromLocal works exactly like the put command, and is simply an alias for it. moveFromLocal also functions like put, with the difference that the local fi le is deleted after the specifi ed fi le(s) are loaded into HDFS. Once the fi les are in HDFS, you have a couple of ways to retrieve them. One option is the cat command. cat displays the contents of the fi le to the screen, or it can be redirected to another output device: hadoop dfs -cat /user/MSBigDataSolutions/SampleData1.txt You can also use the text command to display information. The only differ- ence is that text attempts to convert the fi le to a text format before displaying it. However, because most data in HDFS is text already, cat will usually work. To get the contents of a fi le back to the local fi le system from HDFS, use the get command: hadoop dfs -get /user/MSBigDataSolutions/SampleData1.txt C:\MSBigDataSolutions\Output Just like the put command, get can work with multiple fi les simultaneously, either by specifying a folder or a wildcard: hadoop dfs -get /user/MSBigDataSolutions/SampleData_* C:\MSBigDataSolutions\Output get also has two related commands. copyToLocal works exactly like the get command and is simply an alias for it. moveToLocal also functions like get, with the difference that the HDFS fi le will be deleted after the specifi ed fi le(s) are copied to the local fi le system. Chapter 5 ■ Storing and Managing Data in HDFS 97 Copying and moving fi les and directories within HDFS can be done with the cp and mv commands, respectively: hadoop dfs -cp /user/MSBigDataSolutions /user/Backup hadoop dfs -mv /user/MSBigDataSolutions /user/Backup2 You can delete a fi le in HDFS with the rm command. rm does not remove directories, though. For that, you must use the rmr command: hadoop dfs -rm /user/MSBigDataSolutions/SampleData1.txt hadoop dfs -rmr /user/MSBigDataSolutions NOTE HDFS is case sensitive. mydirectory and MYDIRECTORY are treated by HDFS as two separate directories. Because HDFS automatically creates directories for you when using some commands (the put, cp, and mv commands, for example), pay- ing attention to case is important, as it can be easy to accidentally create directories. rmr is useful to clean up these directories. Administrative Functions in HDFS Security in HDFS follows a straightforward model, where fi les have an owner and a group. A given fi le or directory maintains permissions for three scenarios, which it checks in the following order: 1. The owner identity. This is a single-user account, and it tracks the assigned owner of the fi le or directory. If the user account accessing the fi le matches the owner identity, the user gets the owner’s assigned permissions. 2. The group identity. This is a group account, and any user account with membership in the group gets the group permissions. 3. All other users. If the user account does not match the owner identity, and the user account is not a member of the group, the user will use these permissions. NOTE Permissions in Azure Data Storage work a bit diff erently. Permissions are managed in the storage account. The HDInsight cluster has full permissions to all containers in the storage account that is set as its default storage. It can also access containers in other storage accounts. If the target container is the public container, or it has the public-access level, the HDInsight cluster will have read-access without addi- tional confi guration. If the target container uses the private-access level, however, you have to update the core-site.xml within the HDInsight cluster to provide the key to access the container. 98 Part III ■ Storing and Managing Big Data In the current release of HDFS, the host operating system manages user identity. HDFS uses whatever identity the host reports to determine the user’s identity. On a Windows server, the user identity reported to HDFS will be the equivalent of the whoami command from a command prompt. The group membership will be the same groups as reported by running net user [username] command. NOTE HDFS also has a super-user account. The super user has access to view and modify everything, because permission checks are eff ectively bypassed for the super user. The account used to start the NameNode process is always set as the super user. Note that if the NameNode process is started under a diff erent user identity, that account will then be the super user. This can be convenient for development pur- poses, because a developer starting a local NameNode to work against will automati- cally be the super user. However, for a production environment, a consistent, secured account should be used to start the NameNode. Files and folders support a simple set of permissions: ■ Read (r) permission: An account has permission to read the contents of the fi le or directory. ■ Write (w) permission: An account has permission to change or modify the fi le or folder. ■ Execute (x) permission: An account can enumerate the contents of a directory. This permission applies to directories only. To modify the permissions applied to a fi le, you can use the chmod com- mand. To add permissions, use the plus (+) sign followed by the appropriate permission letters. For example, to give all users read/write permissions to the SampleData_4.txt fi le, you use the following command: hadoop dfs -chmod +rw /user/MSBigDataSolutions/SampleData_4.txt To remove the permissions, use the minus ( -) sign: hadoop dfs -chmod -rw /user/MSBigDataSolutions/SampleData_4.txt To control which user the permissions apply to, you can prefi x the permis- sion with u, g, or o, which respectively stand for the user who owns the fi le, the group assigned to the fi le, or all other users. The following command adds read/write permissions back, but only to the owner of the fi le: hadoop dfs -chmod u+rw /user/MSBigDataSolutions/SampleData_4.txt You can modify the owner and group associated with a fi le or directory by using chown and chgrp, respectively. To change the owner, you must be running the command as the super-user account: hadoop dfs -chown NewOwner /user/MSBigDataSolutions/SampleData_4.txt Chapter 5 ■ Storing and Managing Data in HDFS 99 To change the group associated with a fi le or a directory, you must be either the current owner or the super user: hadoop dfs -chgrp NewGroup /user/MSBigDataSolutions/SampleData_4.txt You can apply all the preceding commands recursively to a directory structure by using -R as an argument to the command. This applies permissions to be changed easily for a large group of fi les. The following command applies the read/write permission to all fi les in the MSBigDataSolutions folder: hadoop dfs -chmod -R +rw /user/MSBigDataSolutions NOTE The chmod, chown, and chgrp commands are common commands in UNIX-based systems, but are not found on the Windows platform. HDFS implements versions of these commands internally, and for the most part, they function like their UNIX counterparts. chmod, in particular, supports a number of options, such as speci- fying the permission set in octal notation, that aren’t immediately obvious. You can fi nd more documentation on advanced uses of chmod at http://en.wikipedia. org/wiki/Chmod. Managing deleted fi les in HDFS is normally transparent to the user. However, in some cases, it can require intervention by an administrator. By default, deleting fi les from HDFS does not result in immediate removal of the fi le. Instead, the fi le is moved to the /trash folder. (If a fi le is deleted accidentally, it can be recovered by simply moving it back out of the /trash folder using the -mv command.) By default, the /trash folder is emptied every six hours. To explicitly manage the process, you can run the expunge command to force the trash to be emptied: hadoop dfs -expunge You can access other administrative functions in HDFS through the dfsad- min module. dfsadmin can be used for a number of different activities, most of which are fairly uncommon. One useful command it offers is report. This com- mand returns a summary of the space available to the HDFS cluster, how much is actually used, and some basic indicators of replication status and potential corruption. Here is an example: hadoop dfsadmin -report When you need to manipulate the available nodes for maintenance, a useful command is refreshNodes, which forces the name node to reread the list of available DataNodes and any exclusions: hadoop dfsadmin -refreshNodes Generally, HDFS will correct any fi le system problems that it encounters, assuming that the problem is correctable. In some cases, though, you might want to explicitly check for errors. In that case, you can run the fsck command. 100 Part III ■ Storing and Managing Big Data This checks the HDFS fi le system and reports any errors back to the user. You can also use the fsck command to move corrupted fi les to a specifi c folder or to delete them. This command runs the fi le system check on the /user directory: hadoop fsck /user Overall, HDFS is designed to minimize the amount of administrative over- head involved. This section has focused on the core pieces of administrative information to provide you with enough information to get up and running without overwhelming you. For more details on administering it, you may want to review the document at http://hadoop.apache.org/docs/stable/ hadoop-project-dist/hadoop-hdfs/HdfsDesign.html. Moving and Organizing Data in HDFS HDFS manages the data stored in the Hadoop cluster without any necessary user intervention. In fact, a good portion of the design strategies used for HDFS were adopted to support that goal: a system that minimizes the amount of adminis- tration that you need to be concerned with. If you will be working with small clusters, or data on the smaller end of big data, you can safely skip this section. However, there are still scenarios in Hadoop where you can get better perfor- mance and scalability by taking a more direct approach, as this section covers. Moving Data in HDFS As your big data needs grow, it is not uncommon to create additional Hadoop clusters. Additional clusters are also used to keep workloads separate and to man- age single-point-of-failure concerns that arise from having a single NameNode. But what happens if you need access to the same data from multiple clusters? You can export the data, using the dfs -get command to move it back to a local fi le system and the dfs -put command to put into the new cluster. However, this is likely to be slow and take a large amount of additional disk space during the copying process. Fortunately, a tool in HDFS makes this easier: distcp (Distributed Copy). distcp enables a distributed approach to copying large amounts of data. It does this by leveraging MapReduce to distribute the copy process to multiple DataNodes in the cluster. The list of fi les to be copied is placed in a list, along with any related directories. Then the list is partitioned by the available nodes, and each node becomes responsible for copying its assigned fi les. distcp can be executed by running the distcp module with two arguments: the source directory and the target directory. To reference a different cluster, you use a fully qualifi ed name for the NameNode: Chapter 5 ■ Storing and Managing Data in HDFS 101 hadoop distcp hdfs://mynamenode:50010/user/MSBigDataSolutions \ hdfs://mybackupcluster:50010/user/MSBigDataSolutions distcp can also be used for copying data inside the same cluster. This is useful if you need to copy a large amount of data for backup purposes. NOTE If you are using HDInsight with ASV, you will not have as much need to move data between clusters. That is because containers in Azure Storage can be shared between clusters; there’s no need to copy it. However, you may still need to copy data from one container to another. You can do this from the Azure Storage Explorer (http://azurestorageexplorer.codeplex.com) if you would like a graphical user interface (GUI). You can also use the same HDFS commands (including distcp) to work with ASV; just use the appropriate qualifi er and reference to the container (for example, asv:///MyAsvContainer/MyData/Test.txt). Implementing Data Structures for Easier Management HDFS, being a fi le system, is organized into directories. Many commands work with directories as well as with fi les, and a number of them also support the -R parameter for applying the command recursively across all child directories. Security can also be managed more easily for folders than for individual fi les. Given this, it is very effective to map your data fi les into a folder structure that refl ects the use and segmentation of the data. Using a hierarchical folder structure that refl ects the source, usage, and application for the data supports this. Consider, for example, a company that manages several websites. The website traffi c logs are being captured into HDFS, along with user activity logs. Each activity log has its own distinct format. When creating a folder structure for storing this information, you would consider whether it is more important to segment the data by the site that originated it or by the type of data. Which aspect is more important likely depends on the business needs. For this example, sup- pose that the originating site is the most critical element, because this company keeps their website information heavily separated and secured. You might use a folder structure like this one: /user/CompanyWebsiteA/sitelogs /user/CompanyWebsiteA/useractivity /user/CompanyWebsiteB/sitelogs /user/CompanyWebsiteB/useractivity ... By structuring the folders in this manner, you can easily implement security for each folder at the website level, to prevent unauthorized access. Conversely, if security were not a critical element, you might choose to reverse the order 102 Part III ■ Storing and Managing Big Data and store the format of the data fi rst. This would make it easier to know what each folder contains and to do processing that spans all websites. NOTE ASV doesn’t support a directory hierarchy. Instead, you have a container, and it stores key/value pairs for the data. However, ASV does allow the forward slash (/) to be used inside a key name (for example, CompanyWebsiteA/sitelogs/ sitelog1.txt). By using the forward slash, the key keeps the appearance of a folder-based structure. You can easily modify the folder structures by using the dfs -cp and -mv commands. This means that if a particular folder structure isn’t working out, you can try new ones. Rebalancing Data Generally, HDFS manages the placement of data across nodes very well. As discussed previously, it attempts to balance the placement of data to ensure a combination of reliability and performance. However, as more data is added to a Hadoop cluster, it is normal to add more nodes to it. This can lead to the cluster being out of balance; that is, some nodes have more data and, therefore, more activity than other nodes. It can also lead to certain nodes having most of the more recently added data, which can create some issues, because newly added data is often more heavily accessed. You can rebalance the cluster by using the balancer tool, which is simple to use. It takes one optional parameter, which defi nes a threshold of disk usage variance to use for the rebalancing process. The default threshold level is 10%, as shown here: hadoop balancer -threshold .1 The balancer determines an average space utilization across the cluster. Nodes are considered over- or underutilized if their space usage varies by more than the threshold from the average space utilization. The balance runs until one of the following occurs: ■ All the nodes in the cluster have been balanced. ■ It has exceeded three iterations without making progress on balancing. ■ The user who started the balancer aborts it by pressing Ctrl+C. Balancing the data across nodes is an important step to maintaining the performance of the cluster, and it should be carried out whenever there are signifi cant changes to the nodes in a cluster. Chapter 5 ■ Storing and Managing Data in HDFS 103 Summary In this chapter, the background of the HDFS fi le system has been covered, along with some of the underlying details, including how NameNodes and DataNodes interact to store information in HDFS. The basic commands for working with and administering an HDFS fi le system—such as ls for listing fi les, get and put for moving fi les in and out of HDFS, and rm for removing unnecessary fi les—have been covered. In addition, some advanced administrative topics, like balancing and data movement, which are important for maintaining your HDFS cluster, have been covered. In the next chapter, these topics will be built on with a discussion of how the Hive application runs on top of the HDFS fi le system while presenting the appearance of a traditional RDBMS to applications. 106 Part III ■ Storing and Managing Big Data different or restrictive compared to a relational database. It’s important to remem- ber that Hive attempts to bridge some of the gap between Hadoop Distributed File System (HDFS) data store and the relational world, while providing some of the benefi ts of both technologies. By keeping that perspective, you’ll fi nd it easier to understand how and why Hive functions as it does. Hive is not a full relational database, and limitations apply to the relational database management system (RDBMS) functionality it supports. The differences that are most likely to impact someone coming from a relational perspective are covered in this chapter. Although complete coverage of the administration and confi guration of Hive is beyond the scope for this chapter, the discussion here does include basic commands for creating tables and working with data in Hive. Understanding Hive’s Purpose and Role Hadoop was developed to handle big data. It does an admirable job of this; but in creating a new platform to solve this problem, it introduced a new chal- lenge: people had to learn a new and different way to work with their data. Instead of using Structured Query Language (SQL) to retrieve and transform data, they had to use Java and MapReduce. Not only did this mean that data professionals had to learn a new skillset, but also that the SQL query tools that IT workers and business users traditionally used to access data didn’t work against Hadoop. Hive was created to address these needs and make it easier for people and tools to work with Hadoop data. It does that by acting as an interpreter for Hadoop; you give Hive instructions in Hive Query Language (HQL), which is a language that looks very much like SQL, and Hive translates that HQL into MapReduce jobs. This opens up Hadoop data to tools and users that understand SQL. In addition to acting as a translator, Hive also answers another common challenge with data in Hadoop. Files stored in Hadoop do not have to share a common data format. They can be text fi les delimited by commas, control characters, or any of a wide variety of characters. It’s not even necessary that they be delimited text fi les. They can be fi les that use binary format, XML, or any of a combination of different formats. Hive enables you to deliver the data to users in a way that adheres to a defi ned schema or format. Hive addresses these issues by providing a layer on top of Hadoop data that resembles a traditional relational database. In particular, Hive is designed to support the common operations for data warehousing scenarios. NOTE Although Hive looks like a relational database, with tables, columns, indexes, and so on, and much of the terminology is the same, it is not a relational database. Chapter 6 ■ Adding Structure with Hive 107 Hive does not enable referential integrity, it does not enable transactions, and it does not grant ACID (atomicity, consistency, isolation, and durability) properties to Hadoop data stores. Providing Structure for Unstructured Data Users and the tools they use for querying data warehouses generally expect tabular, well-structured data. They expect the data to be delivered in a row/ column format, and they expect consistency in the data values returned. Take the example of a user requesting a data set containing all the sales transac- tions for yesterday. Imagine the user’s reaction if some rows in the data set contained 10 columns, some contained 8 columns, and some contained 15. The user would also be very surprised to fi nd that the unit cost column in the data set contained valid numeric values on some rows, and on others it might contain alpha characters. Because Hadoop data stores don’t enforce a particular schema, this is a very real scenario when querying Hadoop. Hive helps with this scenario by enabling you to specify a schema of columns and their types for the information. Then, when the data is queried through Hive, it ensures that the results conform to the expected schema. These schemas are declared by creating a table. The actual table data is stored as fi les in the Hadoop fi le system. When you request data from the table, Hive translates that request to read the appropriate fi les from the Hadoop fi le system and returns the data in a format that matches the table defi nition provided. The table defi nitions are stored in the Hive metadata store, or metastore. By default, the metastore is an embedded Derby database. This metastore is a relational database that captures the table metadata (the name of the table, the columns and data types it contains, and the format that the underlying fi les are expected to be in). NOTE In a default Hive setup, the Derby database used for the metastore may be confi gured for single-user access. If you are just testing Hive or running a local instance for development, this may be fi ne. However, for Hive implementation in a production environment, you will want to upgrade the metastore to a multiple-user setup using a more robust database. One of the more common databases used for this is MySQL. However, the metastore can be any Java Database Connectivity (JDBC)- compliant database. If you are using the Hortonworks' HDP 1.3 Windows distribution, SQL Server can be used as a supported metastore. Hive v0.11 also includes HiveServer2. This version of Hive improves support for multi-user concurrency and supports additional authentication methods, while providing the same experience as the standard Hive server. Again, for a production environment, HiveServer2 may be a better fi t. The examples used in this chapter run against Hive Server and HiveServer2. 108 Part III ■ Storing and Managing Big Data Another area of difference between Hive and many relational databases is its support for different data types. Due to the unstructured data that it must support, it defi nes a number of data types that you won’t fi nd in a traditional relational database. Hive Data Types Table 6.1 lists the data types supported by Hive. Many of these data types have equivalent values in SQL Server, but a few are unique to Hive. Even for the data types that appear familiar, it is important to remember that Hive is coded as a Java application, and so these data types are implemented in Java. Their behavior will match the behavior from a Java application that uses the same data type. One immediate difference you will notice is that STRING types do not have a defi ned length. This is normal for Java and other programming languages, but is not typical for relational databases. Table 6-1: Hive Data Types TYPE DESCRIPTION EXAMPLES SQL SERVER EQUIVALENT STRING String enclosed by single or double quotation marks. 'John Smith' or "John Smith" varchar(n), nvarchar(n) TINYINT 1-byte signed integer in the range of -128 to 127. 10 tinyint SMALLINT 2-byte signed integer in the range of -32,768 to 32,767. 32000 smallint INT 4-byte signed integer in the range of -2,147,483,648 to 2,147,483,647. 2000000 int BIGINT 8-byte signed inte- ger in the range of -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807. 20000000 bigint BOOLEAN Boolean true or false. TRUE FALSE bit FLOAT 4-byte single-precision fl oat- ing point. 25.189764 real DOUBLE 8-byte double-precision fl oating point. 25.1897645126 float(53) DECIMAL A 38-digit precision number. 25.1897654 decimal, numeric Chapter 6 ■ Adding Structure with Hive 109 TYPE DESCRIPTION EXAMPLES SQL SERVER EQUIVALENT TIMESTAMP UNIX timestamp that can be in one of three forms: Integer: Represents the number of seconds from the UNIX epoch date and time (January 1, 1970 12:00 AM). Floating point: Represents second off set from UNIX epoch with nanosecond precision. String: JDBC-compliant time- stamp format YYYY-MM-DD HH:MM:SS.fffffffff. 123412123 123412123.1234567 '2013-01-01 12:00:00' datetime2 datetime2(7) DATE A date in YYYY-MM-DD format. {{2012-01-01}} date BINARY A series of bytes. binary(n) STRUCT Defi nes a column that con- tains a defi ned set of addi- tional values and their types. struct('John', 'Smith') MAP Defi nes a collection of key/ value pairs. map('first', 'John', 'last', 'Smith') ARRAY Defi nes a sequenced collec- tion of values. array('John', 'Smith') UNION Similar to sql_variant types. They hold one value at a time, but it can be any one of the defi ned types for the column. Varies depending on column sql_variant NOTE DATE types are new for Hive 0.12. The Hortonworks Data Platform (HDP) 1.3 release is built using Hive 0.11, so this data type cannot be used with it yet. The types that are unique to Hive are MAP, ARRAY, and STRUCT. These types are supported in Hive so that it can better work with the denormalized data that is often found in Hadoop data stores. Relational database tables are typically normalized; that is, a row holds only one value for a given column. In Hadoop, though, it is not uncommon to fi nd data where many values are stored in a row for a “column.” This denormalization of the data makes it easier and faster to write the data, but makes it more challenging to retrieve it in a tabular format. Hive addresses this with the MAP, ARRAY, and STRUCT types, which let a devel- oper fl atten out the denormalized data into a multicolumn structure. The details 110 Part III ■ Storing and Managing Big Data of querying this denormalized data are discussed later in this chapter. For now, we will review the data types that support these structures. A STRUCT is a column that contains multiple defi ned fi elds. Each fi eld can have its own data type. This is comparable to structs in most programming languages. In Hive, you can declare a STRUCT for a full name using the following syntax: STRUCT To access the individual fi elds of the STRUCT type, use the column name followed by a period and the name of the fi eld: FullName.FirstName An ARRAY is a column that contains an ordered sequence of values. All the values must be of the same type: ARRAY Because it is ordered, the individual values can be accessed by their index. As with Java and .NET languages, ARRAY types use a zero-based index, so you use an index of 0 to access the fi rst element, and an index of 2 to access the third element. If the preceding Full Name column were declared as an ARRAY, with fi rst name in the fi rst position, middle name in the second position, and last name in the third position, you would access the fi rst name with index 0 and last name with index 2: FullName[0], FullName[2] A MAP column is a collection of key/value pairs, where both the key and values have data types. The key and value do not have to use the same data type. A MAP for Full Name might be declared using the following syntax: MAP In the Full Name case, you would populate the MAP column with the following key/value pairs: 'FirstName', 'John' 'MiddleName', 'Doe' 'LastName', 'Smith' You can access MAP column elements using the same syntax as you use with an ARRAY, except that you use the key value instead of the position as the index. Accessing the fi rst and last names would be done with this syntax: FullName['FirstName'], FullName['LastName'] After looking at the possible data types, you may be wondering how these are stored in Hive. The next section covers the fi le formats that can be used to store the data. Chapter 6 ■ Adding Structure with Hive 111 File Formats Hive uses Hadoop as the underlying data store. Because the actual data is stored in Hadoop, it can be in a wide variety of formats. As discussed in Chapter 5, “Storing and Managing Data in HDFS,” Hadoop stores fi les and doesn’t impose any restrictions in the content or format of those fi les. Hive offers enough fl ex- ibility that you can work with almost any fi le format, but some formats require signifi cantly more effort. The simplest fi les to work with in Hive are text fi les, and this is the default format Hive expects for fi les. These text fi les are normally delimited by specifi c characters. Common formats in business settings are comma-separated value fi les or tab-separated value fi les. However, the drawback of these formats is that commas and tabs often appear in real data; that is, they are embedded inside other text, and not intended as delimiters in all instances. For that reason, Hive by default uses control characters as delimiters, which are less likely to appear in real data. Table 6.2 describes these default delimiters. Table 6-2: Hive Default Delimiters for Text Files DELIMITER OCTAL CODE DESCRIPTION \n \012 New line character; this delimits rows in a text fi le. ^A \001 Separates columns in each row. ^B \002 Separates elements in an ARRAY, STRUCT, and key/value pairs in a MAP. ^C \003 Separates the key from the value in a MAP column. NOTE The default delimiters can be overridden when the table is created. This is useful when you are dealing with text fi les that use diff erent delimiters, but are still formatted in a very similar way. The options for that are shown in the section “Creating Tables” in this chapter. What if one of the many text fi les that is accessed through a Hive table uses a different value as a column delimiter? In that case, Hive won’t be able to parse the fi le accurately. The exact results will vary depending on exactly how the text fi le is formatted, and how the Hive table was confi gured. However, it’s likely that Hive will fi nd less than the expected number of columns in the text fi le. In this case, it will fi ll in the columns it fi nds values for, and then output null values for any “missing” columns. 112 Part III ■ Storing and Managing Big Data The same thing will happen if the data values in the fi les don’t match the data type defi ned on the Hive table. If a fi le contains alphanumeric characters where Hive is expecting only numeric values, it will return null values. This enables Hive to be resilient to data quality issues with the fi les stored in Hadoop. Some data, however, isn’t stored as text. Binary fi le formats can be faster and more effi cient than text formats, as the data takes less space in the fi les. If the data is stored in a smaller number of bytes, more of it can be read from the disk in a single-read operation, and more of it can fi t in memory. This can improve performance, particularly in a big data system. Unlike a text fi le, though, you can’t open a binary fi le in your favorite text editor and understand the data. Other applications can’t understand the data either, unless they have been built specifi cally to understand the format. In some cases, though, the improved performance can offset the lack of portability of the binary fi le formats. Hive supports several binary formats natively for fi les. One option is the Sequence File format. Sequence fi les consist of binary encoded key/value pairs. This is a standard fi le format for Hadoop, so it will be usable by many other tools in the Hadoop ecosystem. Another option is the RCFile format. RCFile uses a columnar storage approach, rather than the row-based approach familiar to users of relational systems. In the columnar approach, the values in a column are compressed so that only the distinct values for the column need to be stored, rather than the repeated values for each row. This can help compress the data a great deal, particularly if the column values are repeated for many rows. RCFiles are readable through Hive, but not from most other Hadoop tools. A variation on the RCFile is the Optimized Record Columnar File format (ORCFile). This format includes additional metadata in the fi le system, which can vastly speed up the querying of Hive data. This was released as part of Hive 0.11. NOTE Compression is an option for your Hadoop data, and Hive can decompress the data as needed for processing. Hive and Hadoop have native support for com- pressing and decompressing fi les on demand using a variety of compression types, including common formats like Zip compression. This can be an alternative that allows to you get the benefi ts of smaller data formats while still keeping the data in a text format. If the data is in a binary or text format that Hive doesn’t understand, custom logic can be developed to support it. The next section discusses how these can be implemented. 114 Part III ■ Storing and Managing Big Data Hive has robust support for both standard and complex data types, stored in a wide variety of formats. And as highlighted in the preceding section, if sup- port for a particular fi le format is not included, it can be added via third-party add-ons or custom implementations. This works very well with the type of data that is often found in Hadoop data stores. By using Hive’s ability to apply a tabular structure to the data, it makes it easier for users and tools to consume. But there is another component to making access much easier for existing tools, which is discussed next. Enabling Data Access and Transformation Traditional users of data warehouses expect to be able to query and transform the data. They use SQL for this. They run this SQL through applications that use common middleware software to provide a standard interface to the data. Most RDBMS systems implement support for one or more of these middleware interfaces. Open Database Connectivity (ODBC) is a common piece of software for this and has been around since the early 1990s. Other common interfaces include the following: ■ ADO.NET (used by Microsoft .NET-based applications) ■ OLE DB ■ Java Database Connectivity (JDBC) ODBC, being one of the original interfaces for this, is well supported by exist- ing applications, and many of the other interfaces provide bridges for ODBC. Hive provides several forms of connectivity to Hadoop data through Thrift. Thrift is a software framework that supports network service communication, including support for JDBC and ODBC connectivity. Because ODBC is broadly supported by query access tools, it makes it much easier for business users to access the data in Hadoop using their favorite analysis tools. Excel is one of the common tools used by end users for working with data, and it supports ODBC. (Using Excel with Hadoop is discussed further in Chapter 11, “Visualizing Big Data with Microsoft BI.”) In addition to providing ODBC data access, Hive also acts as a translator for the SQL. As mentioned previously, many users and developers are familiar with writing SQL statements to query and transform data. Hive can take that SQL and translate it into MapReduce jobs. So, rather than the business users having to learn Java and MapReduce, or learn a new tool for querying data, they can leverage their existing knowledge and skills. Hive manages this SQL translation by providing Hive Query Language (HQL). HQL provides support for common SQL language operations like SELECT for retrieving information and INSERT INTO to load data. Although HQL is not Chapter 6 ■ Adding Structure with Hive 115 ANSI SQL compliant, it implements enough of the standard to be familiar to users who have experience working with RDBMS systems. Diff erentiating Hive from Traditional RDBMS Systems This chapter has discussed several of the ways that Hive emulates a relational database. It’s also covered some of the ways in which it differs, including the data types and the storage of the data. Those topics are worth covering in a bit more depth because they do have signifi cant impact on how Hive functions and what you should expect from it. In a relational database like SQL Server, the database engine manages the data storage. That means when you insert data into a table in a relational data- base, the server takes that data, converts it into whatever format it chooses, and stores it in data structures that it manages and controls. At that point, the server becomes the gatekeeper of the data. To access the data again, you must request it from the relational database so that the server can retrieve it from the internal storage and return it to you. Other systems cannot access or change the data directly without going through the server. Hive, however, uses Hadoop as its data storage system. Therefore, the data sits in HDFS and is accessible to anyone with access to the fi le system. This does make it easier to manage the data and add new information, but you must be aware that other processes can manipulate the data. One of the primary differences between Hive and most relational systems is that data in Hive can only be selected, inserted, or deleted; there is no update capability. This is due to Hive using Hadoop fi le storage for its data. As noted in Chapter 8, “Effective Big Data ETL with SSIS, Pig, and Sqoop,” Hadoop is a write-once, read-many fi le system. If you need to change something in a fi le, you delete the original and write a new version of the fi le. Because Hive manages table data using Hadoop, the same constraints apply to Hive. There are also no row-based operations. Instead, everything is done in bulk mode. Another key difference is that the data structure is defi ned up-front in tra- ditional relational databases. The columns of a table, their data types, and any constraints on what the column can hold are set when the table is created. The database server enforces that any data written to the table conforms to the rules set up when the table was created. This is referred to as schema on write; the relational database server enforces the schema of the data when it is written to the table. If the data does not match the defi ned schema, it will not be inserted into the table. Because Hive doesn’t control the data and can’t enforce that it is written in a specifi c format, it uses a different approach. It applies the schema when the data is read out of the data storage: schema on read. As mentioned, if the num- ber of columns in the fi le is less than what is defi ned in Hive, null values are 116 Part III ■ Storing and Managing Big Data returned for the missing columns. If the data types don’t match, null values are returned for those columns as well. The benefi t of this is that Hive queries rarely fail due to bad data in the fi les. However, you do have to ensure that the data coming back is still meaningful and doesn’t contain so many null values that it isn’t useful. Working with Hive Like many Hadoop tools, Hive leverages a command-line interface (CLI) for interaction with the service. Other tools are available, such as the Hive Web Interface (HWI) and Beeswax, a user interface that is part of the Hue UI for working with Hadoop. For the examples in this chapter, though, the command line is used. NOTE Beeswax and Hue are not yet supported in HDInsight or the Hortonworks HDP 1.3 distribution, but it is under development. In the meantime, the CLI is the pri- mary means of interacting directly with Hive. You can launch the CLI by navigating to the Hive bin folder (c:\hdp\hadoop\ hive-\bin in the Hortonworks HDP 1.3 distribution with the default setup). Once there, run the CLI by executing the hive.cmd applica- tion. After the CLI has been run, you’ll notice that the prompt changes to hive (see Figure 6.1). Figure 6.1: Hive CLI Interface When executing Hive commands through the CLI, you must put a semicolon on each line of code that you want to execute. You can enter multiple lines, and the CLI will buffer them, until a semicolon is entered. At that point, the CLI executes all the proceeding commands. Chapter 6 ■ Adding Structure with Hive 117 A useful feature of the Hive CLI is the ability to run hadoop dfs commands without exiting. The Hive CLI uses an alias so that you can simply reference dfs directly, without the hadoop keyword, and it will execute the dfs command and return the results. For example, running the following code from the Hive prompt returns a recursive directory listing from the Hadoop fi le system: dfs -lsr The Hive CLI also supports basic autocomplete; that is, if you start typing a keyword or function and press the Tab key, it tries to complete the word. For example, if you type in cre and press Tab, it will complete it to create. If you press Tab on a new line, it prompts you if you want a list of all 427 possibili- ties. The CLI also maintains a history of previous commands entered. You can retrieve these values by pressing the up- and down-arrow keys. If you have a previously entered command selected, you can press the Enter key to run it again. Creating and Querying Basic Tables This section covers the basics of creating and organizing tables in Hive, as well as how to query them. If you are comfortable with SQL, you should fi nd the commands familiar. Creating Databases Hive databases are essentially a way to organize tables. They are similar to the concept of a schema in SQL Server. In fact, Hive supports SCHEMA as a synonym for the DATABASE keyword. Databases are most often used to organize tables when there are multiple groups using the Hive server. When a database is created, Hive creates a folder in the Hadoop fi le system, by default using the same name as specifi ed for the database, with.db appended to it. Any objects that are created using that database will be stored in the database’s directory. If you don’t specify a database when you create a table, it will be created in the default database. To create a database, use the CREATE DATABASE command, followed by the name of the database: CREATE DATABASE MsBigData; NOTE Many commands in Hive support IF EXISTS or IF NOT EXISTS clauses. Generally, you can use IF NOT EXISTS when creating objects, and IF EXISTS when removing them. These are used to check whether the target object is in the correct state before executing the command. For example, running a CREATE DATABASE foo; command when a foo database already exists will result in an error. However, if you use CREATE DATABASE IF NOT EXISTS foo;, no error will be produced, and the state of the database won’t be modifi ed. 118 Part III ■ Storing and Managing Big Data If you want to see the directories created in Hadoop for the databases, you can run this command: dfs -lsr /hive/warehouse;. /hive/warehouse is the default location for Hive in Hadoop storage. If you want to place the fi les in a different location, you can also directly specify the directory for the database using the LOCATION clause: CREATE DATABASE MsBigDataAlt LOCATION '/user/MyNewDb'; NOTE The default directory for Hive metadata storage can be changed in the hive-site.xml fi le, along with many of the properties that control how Hive behaves. This fi le is located in the Hive conf folder, located at c:\hdp\hadoop\ hive-\conf in a standard HDP setup. Be careful when making changes to this fi le, though; any errors can cause Hive not to start correctly. After creating a few databases, you may be wondering how to view what’s been created and how to remove databases you don’t need. The SHOW DATABASES command lists the databases, and DESCRIBE DATABASE provides the location of the database: SHOW DATABASES; DESCRIBE DATABASE MsBigData; DROP DATABASE removes a database. This also removes the directory associ- ated with the database. By default, Hive does not let you drop a database that contains tables. If you are sure that you want to remove the database and its tables, you can use the CASCADE keyword. This tells Hive to remove all contents of the directory. DROP DATABASE also supports the IF EXISTS clause: DROP DATABASE MsBigDataAlt; DROP DATABASE MsBigDataAlt CASCADE; Finally, you can use the USE command to control what database context will be used if you don’t specify one. You’ll fi nd this convenient if you work in a particular database most of the time: USE MsBigData; Creating Tables The basics of creating a table in Hive are similar to typical SQL, but there are a number of extensions, particularly for dealing with different fi le and record formats. A basic table can be created with the following: CREATE TABLE MsBigData.customer ( name STRING, city STRING, state STRING, Chapter 6 ■ Adding Structure with Hive 119 postalCode STRING, purchases MAP ); This table holds some basic customer information, including a list of the customer purchases in a MAP column, where the key is the product name, and value is the amount paid. To copy the schema for an existing table, you can use the LIKE keyword: CREATE TABLE IF NOT EXISTS MsBigData.customer2 LIKE MsBigData.customer; You can use the SHOW command to list the tables in either the current database or other databases. The DESCRIBE command can also be used with tables: SHOW TABLES; SHOW TABLES IN default; DESCRIBE MsBigData.customer; NOTE You might have noticed that no primary or foreign keys are defi ned on these tables, nor any NOT NULL or other column constraints. Hive doesn’t support these options because it doesn’t have any way to enforce constraints on the data. In a relational system, these constraints help enforce data quality and consistency and are generally enforced when the data is inserted into a table (schema on write). Hive doesn’t control the data, so it can’t enforce the constraints. Tables can be removed by using the DROP TABLE command, and renamed using the ALTER TABLE statement. Columns can be renamed or have their types changed using CHANGE COLUMN, and they can be added or deleted using ADD COLUMNS and REPLACE COLUMNS, respectively. Replacing columns deletes any column not included in the new column list: DROP TABLE MsBigData.customer2; ALTER TABLE customer RENAME TO customer_backup; ALTER TABLE customer_backup CHANGE COLUMN name fullname STRING; ALTER TABLE customer_backup ADD COLUMNS ( country STRING); ALTER TABLE customer_backup REPLACE COLUMNS ( name STRING, city STRING, state STRING, postalCode STRING, purchases MAP); WARNING Using ALTER TABLE to modify a table changes the metadata for the table. It does not modify the data in the fi les. This option is useful for correcting mis- takes in the schema for a table, but any data issues have to be cleaned up separately. 120 Part III ■ Storing and Managing Big Data As discussed in the “Custom File and Record Formats” section, Hive gives you control over the record format. In the preceding CREATE TABLE statement, the Hive defaults are used; it expects text fi les in delimited format, with Ctrl-A (octal 001) as a fi eld delimiter. To control that format, Hive supports explicitly declaring the format options. The preceding table, with explicit delimiters defi ned, would look like this: CREATE TABLE MsBigData.customer ( name STRING, city STRING, state STRING, postalCode STRING, purchases MAP ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' COLLECTION ITEMS TERMINATED BY '\002' MAP KEYS TERMINATED BY '\003' LINES TERMINATED BY '\n' STORED AS TEXTFILE; The fi le format is controlled by the STORED AS portion of the statement. To use the SEQUENCEFILE fi le format, you replace STORED AS TEXTFILE with STORED AS SEQUENCEFILE. To use custom fi le formats, you specify the INPUTFORMAT and OUTPUTFORMAT options directly. For example, here is the specifi cation for the RCFile format. The value in the string is the class name for the fi le format to be used: STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' The row format options are controlled by the ROW FORMAT portion. The delim- ited SerDe is the default. To specify a custom SerDe, use the SERDE keyword followed by the class name of the SerDe. For example, the RegexSerDe can be specifi ed as follows: ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe' Another important option in table creation is the EXTERNAL option. By default, when you create a table without specifying EXTERNAL, it is created as a managed table. This means that Hive considers itself the manager of the table, including any data created in it. The data for the table will be stored in a subdirectory under the database folder, and if the table is dropped, Hive will remove all the data associated with the table. However, if you use CREATE EXTERNAL TABLE to create the table, Hive creates the metadata for the table, and allows you to query it, but it doesn’t consider itself the owner of the table. If the table is dropped, the metadata for it will be Chapter 6 ■ Adding Structure with Hive 121 deleted, but the data will be left intact. External tables are particularly useful for data fi les that are shared among multiple applications. Creating the Hive table defi nition allows it to be queried using the power of Hive, but it makes it clear that the data is shared with other applications. When you use the EXTERNAL keyword, you must also use the LOCATION option: CREATE EXTERNAL TABLE MsBigData.customer ( name STRING, city STRING, state STRING, postalCode STRING, purchases MAP ) LOCATION 'user/MyCustomerTable'; You can the LOCATION option with managed tables, as well, but it’s not necessary unless you want a table that Hive manages that is also stored in a directory that Hive doesn’t manage. For clarity, it’s recommended that LOCATION be used only with external tables. WARNING Be aware that, regardless of whether the table is managed or exter- nal, the data is still accessible through the Hadoop fi le system. Files can be added or deleted by anyone with access to Hadoop. So, even for managed tables, Hive doesn’t really take full control of the data fi les. Adding and Deleting Data Remember from the earlier discussion about differences between Hive and rela- tion systems that Hive uses Hadoop for storage, so it does not support row-level operations. You can’t insert, update, or delete individual rows. However, because Hive is designed for big data, you would want to perform bulk operations in any case, so this isn’t a signifi cant restriction. Perhaps the simplest way to add data to a Hive table is to write or copy a prop- erly formatted fi le to the table’s directory directly, using HDFS. (Commands for copying fi les directly in HDFS are covered in Chapter 5, “Storing and Managing Data in HDFS.”) You can load data from existing fi les into a table using the LOAD DATA com- mand. This is similar to using a BULK INSERT statement in SQL Server. All the data in the specifi ed location will be loaded into the table. However, in SQL Server, BULK INSERT references a single data fi le. LOAD DATA is usually pointed at a directory, so that all fi les in the directory can be imported. Another important difference is that, while SQL Server verifi es the data in a bulk load, Hive only 122 Part III ■ Storing and Managing Big Data verifi es that the fi le format matches the table defi nition. It does not check that the record format matches what has been specifi ed for the table: LOAD DATA LOCAL INPATH 'C:/MsBigData/TestData/customers' OVERWRITE INTO TABLE MsBigData.customer; In the preceding statement, OVERWRITE indicates that any fi les in the table’s directory should be deleted before loading the new data. If it is left out, the data fi les will be added to the fi les in the directory. The LOCAL keyword indicates that the data will be copied from the local fi le system into the Hive directory. The original copy of the fi les will be left in the local fi le system. If the LOCAL keyword is not included, the path is resolved against the HDFS, and the fi les are moved to the Hive directory, rather than being copied. What if you want to insert data into one table based on the contents of another table? The INSERT statement handles that: INSERT INTO TABLE customer SELECT * FROM customer_import The INSERT statement supports any valid SELECT statement as a source for the data. (The format for the SELECT statement is covered in the next section.) The data from the SELECT statement is appended to the table. If you replace the INTO keyword with OVERWRITE, the contents of the table are replaced. NOTE Several variations of these statements can be used with partitioned tables, as covered in the section “Loading Partitioned Tables,” later in this chapter. There is also the option to create managed tables in Hive based on selecting data from another table: CREATE TABLE florida_customers AS SELECT * FROM MsBigData.Customers WHERE state = 'FL'; NOTE Hive doesn’t support temp tables. You can create tables and populate them easily using the CREATE TABLE .. AS syntax, but you must manage the lifetime of the table yourself. After a table has been loaded, you may want to export data from it. You can use the INSERT .. DIRECTORY command for this. OVERWRITE indicates that the target directory should be emptied before the new fi les are written, and LOCAL indicates that the target is a directory on the local fi le system. Omitting them has the same behavior as it had with the LOAD DATA command: INSERT OVERWRITE LOCAL DIRECTORY 'c:\MsBigData\export_customer' SELECT name, purchases FROM customer WHERE state = 'FL'; Chapter 6 ■ Adding Structure with Hive 123 You can also export to multiple directories simultaneously. Be aware that each record that meets the WHERE clause conditions will be exported to the specifi ed location, and each record is evaluated against every WHERE clause. It is possible, depending on how the WHERE clause is written, for the same record to be exported to multiple directories: FROM customer c INSERT OVERWRITE DIRECTORY '/tmp/fl_customers' SELECT * WHERE c.state = 'FL' INSERT OVERWRITE DIRECTORY '/tmp/ca_customers' SELECT * WHERE c.state = 'CA'; Querying a Table Writing queries against Hive is fairly straightforward if you are familiar with writing SQL queries. Instead of focusing on the everyday SQL, this section focuses on the aspects of querying Hive that differ from most relational databases. The basic SELECT statement is intact, along with familiar elements such as WHERE clauses, table and column aliases, and ORDER BY clauses: SELECT c.name, c.city, c.state, c.postalCode, c.purchases FROM MsBigData.customer c LIMIT 100 WHERE c.state='FL' ORDER BY c.postalCode; NOTE One useful diff erence to note is the LIMIT clause. This restricts the query to an upper limit of rows that it can return. If you are used to SQL Server, you might be familiar with the TOP clause. LIMIT works in the same way, but it doesn’t support per- centage based row limits. LIMIT can prove very handy when you are exploring data and don’t want to process millions or billions of rows in your Hive tables. When you run the SELECT statement, you’ll notice that the results are as expected, with the exception of the purchases column. Because that column represents a collection of values, Hive fl attens it into something that it can return as a column value. It does this using Java Script Object Notation (JSON), a standard format for representing objects: John Smith Jacksonville FL 32226 {"Food":456.98,"Lodging":1245.45} This might be useful to get a quick look at a table’s data, but in most instances you will want to extract portions of the value out. Querying individual ele- ments of complex types is fairly straightforward. For MAP types, you reference the key value: SELECT c.name, c.city, c.state, c.postalCode, c.purchases['Lodging'] 124 Part III ■ Storing and Managing Big Data If purchases were an ARRAY, you would use the index of the value you are interested in: SELECT c.name, c.city, c.state, c.postalCode, c.purchases[1] And if purchases were a STRUCT, you would use the fi eld name: SELECT c.name, c.city, c.state, c.postalCode, c.purchases.Lodging You can use this syntax in any location where you would use a regular column. Calculations and functions are used in the same way as you would in most SQL dialects. For example, this SELECT statement returns the sum of lodging purchases for any customer who purchased over 100 in food: SELECT SUM(c.purchases['Lodging']) FROM MsBigData.customer c WHERE c.purchases['Food'] > 100; NOTE One interesting feature of Hive is that you can use regular expressions in the column list of the SELECT. For example, this query returns the name column and all columns that start with “address” from the specifi ed table: SELECT name, 'address.*' FROM shipments; You can also use the functions RLIKE and REGEXP, which function in the same way as LIKE but allow the use of regular expressions for matching. Some functions that are of particular interest are those that deal with complex types, because these don’t have equivalent versions in many relational systems. For example, there are functions for determining the size of a collection. There are also functions that generate tables as output. These are the opposite of aggre- gating functions, such as SUM, which take multiple rows and aggregate them into a single result. Table generating functions take a single row of input and produce multiple rows of output. These are useful when dealing with complex types that need to be fl attened out. However, they must be used by themselves in SELECT column lists. Table 6.3 describes the table-generating functions, along with other functions that work with complex types. Table 6-3: Functions Related to Complex Types NAME DESCRIPTION size(MAP | ARRAY) Returns the number of elements in the MAP or ARRAY passed to the function map_keys(MAP) Returns the key values from a MAP as an ARRAY map_values(MAP) Returns the values from a MAP as an ARRAY array_contains(ARRAY, value) Returns true if the array contains the value, false if it does not Chapter 6 ■ Adding Structure with Hive 125 NAME DESCRIPTION sort_array(ARRAY) Sorts and returns the ARRAY by the natural order of the elements explode(MAP | ARRAY) Returns a row for each item in the MAP or ARRAY inline(ARRAY) Explodes an array of STRUCTs into a table NOTE There are also functions for parsing URLs and JSON objects into tables of information that can prove extremely useful if you need to deal with this type of data. For a complete, current list of Hive operators and functions, a good resource is the Hive wiki: https://cwiki.apache.org/confluence/display/Hive/ LanguageManual+UDF. Hive supports joining tables, but only using equi-join logic. This restriction is due to the distributed nature of the data, and because Hive has to translate many queries to MapReduce jobs. Performing non-equi-joins across distributed data sets is extremely resource intensive, and performance would often be unreasonably poor. For the same reason, ORs cannot be used in JOIN clauses. INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER JOINs are supported. These function like their SQL equivalents. When processing a SELECT with both JOIN and WHERE clauses, Hive evaluates the JOIN fi rst, then the WHERE clause is applied on the joined results. During join operations, Hive makes the assumption that the largest table appears last in the FROM clause. Therefore, it attempts to process the other tables fi rst, and then streams the content of the last table. If you keep this in mind when writing your Hive queries, you will get better performance. You can use a query hint to indicate which table should be streamed, too: SELECT /*+ STREAMTABLE(bt) */ bt.name, bt.transactionAmount, c.state FROM bigTable bt JOIN customer c ON bt.postalCode = c.PostalCode When you are using ORDER BY, be aware that this requires ordering of the whole data set. Because this operation cannot be distributed across multiple nodes, it can be quite slow. Hive offers the alternative SORT BY. Instead of sort- ing the entire data set, SORT BY lets each node that is processing results sort its results locally. The overall data set won’t be ordered, but the results from each node will be sorted: SELECT c.name, c.city, c.state, c.postalCode, c.purchases FROM MsBigData.customer c SORT BY c.postalCode; 126 Part III ■ Storing and Managing Big Data You can use SORT BY in conjunction with DISTRIBUTE BY to send related data to the same nodes for processing so that there is less overlap from sort- ing on multiple nodes. In the following example, the data is distributed to nodes, based on the state, and then the postal codes are sorted per state for each node: SELECT c.name, c.city, c.state, c.postalCode, c.purchases FROM MsBigData.customer c DISTRIBUTE BY c.state; SORT BY c.state, c.postalCode; Now that you have explored the basic operations in Hive, the next section will address the more advanced features, like partitioning, views, and indexes. Using Advanced Data Structures with Hive Hive has a number of advanced features. These are primarily used for perfor- mance and ease of use. This section covers the common ones. Setting Up Partitioned Tables Just like most relational databases, Hive supports partitioning, though the implementation is different. Partitioned tables are good for performance because they help Hive narrow down the amount of data it needs to process to respond to queries. The columns used for partitioning should not be included in the other col- umns for the table. For example, using the customer table example from earlier, a logical partition choice would be the state column. To partition the table by state, the state column would be removed from the column list and added to the PARTITIONED BY clause: CREATE TABLE MsBigData.customer ( name STRING, city STRING, postalCode STRING, purchases MAP ) PARTITIONED BY (state STRING); There can be multiple partition columns, and the columns in the PARTITIONED BY list cannot be repeated in the main body of the table, because Hive consid- ers those to be ambiguous columns. This is because Hive stores the partition column values separately from the data in the fi les. As discussed previously, Hive creates a directory to store the fi les for managed tables. When a managed Chapter 6 ■ Adding Structure with Hive 127 table is partitioned, Hive creates a subdirectory structure in the table directory. A subdirectory is created for each partition value, and only data fi les with that partition value are stored in those folders. The directory structure would look something like this: …/customers/state=AL …/customers/state=AK ….. …/customers/state=WI …/customers/state=WY You can also use the SHOW PARTITIONS command to see what the partitions look like for the table. The partitions are created automatically as data is loaded into the table. When this table is queried with a WHERE clause like state = 'AL', Hive only has to process fi les in the …/customers/state=AL folder. Partitioning can drastically impact performance by reducing the number of folders that need to be scanned to respond to queries. However, to benefi t from it, the queries have included the partition columns in the WHERE clause. One of the available options for Hive is a “strict” mode. In this mode, any query against a partitioned table must include partitioned columns in the WHERE clause. This can be enabled or disabled by setting the hive.mapred.mode to strict or nonstrict, respectively. External partitioned tables are managed a bit differently. The partitioned col- umns are still declared using the PARTITIONED BY clause. However, because Hive doesn’t manage the directory structure for external tables, you must explicitly set the available partitions and the directory it maps to using the ALTER TABLE statement. For example, if the customer example were an external table, adding a partition for the state of Alabama (AL) would look like this: ALTER TABLE MsBigData.customer ADD PARTITION(state = 'AL') LOCATION 'hdfs://myserver/data/state/AL'; Notice that in this mode you have complete fl exibility with the directory structure. This makes it useful when you have large amounts of data coming from other tools. You can get the performance benefi ts of partitioning while still retaining the original directory structure of the data. To remove a partition, you can use the DROP PARTITION clause: ALTER TABLE MsBigData.customer DROP PARTITION(state = 'AL') LOCATION 'hdfs://myserver/data/state/AL'; Moving a partition can be done using the SET LOCATION option: ALTER TABLE MsBigData.customer PARTITION(state = 'AL') SET LOCATION 'hdfs://myserver/data/new_state/AL'; 128 Part III ■ Storing and Managing Big Data Loading Partitioned Tables When loading data into a partitioned table, you must tell Hive what partition the data belongs to. For example, to load the AL partition of the customer table, you specify the target partition: LOAD DATA LOCAL INPATH 'C:/MsBigData/TestData/customers_al' OVERWRITE INTO TABLE MsBigData.customer PARTITION (state = 'AL'); If you want to insert data into a partition from an existing table, you must still defi ne the partition that is being loaded: INSERT INTO TABLE customer PARTITION (state = 'AL') SELECT * FROM customer_import ci WHERE ci.state_code = 'AL'; However, this may not work well if you have a large number of partitions. The INSERT INTO…SELECT statement for each partition would have to scan the source table for the data, which would be very ineffi cient. An alternative approach is to use the FROM…INSERT format: FROM customer_import ci INSERT INTO TABLE customer PARTITION (state = 'AL') SELECT * WHERE ci.state_code = 'AL' PARTITION (state = 'AK') SELECT * WHERE ci.state_code = 'AK' PARTITION (state = 'AZ') SELECT * WHERE ci.state_code = 'AZ' PARTITION (state = 'AR') SELECT * WHERE ci.state_code = 'AR'; When you use this format, the table is scanned only once. Each record in the source table is evaluated against each WHERE clause. If it matches, it is inserted into the associated partition. Because the record is compared against each clause (even if it’s already matched to a previous WHERE clause), records can be inserted into multiple partitions, or none at all if it doesn’t match any clauses. You can also do dynamic partitioning. This is based on matching the last columns in the SELECT statement against the partition. For example, in the fol- lowing FROM. . .INSERT INTO statement, the country code has a hard-coded value, meaning it is static. However, the state partition does not have a hard-coded value, which makes it dynamic. The state_code column is used to dynami- cally determine what partition the record should be placed in. This isn’t based on matching the column name; it’s based on ordinal position in the SELECT. In this case, there is one partition column, so the last column in the SELECT list Chapter 6 ■ Adding Structure with Hive 129 is used. If there were two partition columns, the last two columns would be used, and so on: FROM customer_import ci INSERT INTO TABLE customer PARTITION (country='US', state) SELECT name, city, postalCode, purchases, state_code; WARNING Be careful when using dynamic partitioning. It can be easy to inad- vertently create a massive number of partitions and impact performance negatively. By default, it operates in strict mode, which means at least some of the partition col- umns must be static. This can avoid runaway partition creation. Using Views Views are a way of persisting queries so that they can be treated like any other table in Hive. They behave similarly to views in a relational database. You can use CREATE VIEW to defi ne a view based on a query: CREATE VIEW customerSales AS SELECT c.name, c.city, c.state, c.postalCode, s.salesAmount, FROM MsBigData.customer c JOIN sales s ON c.name = s.customerName WHERE c.state='FL'; Selecting from the view works like selecting from any table, except that the logic of the original query is abstracted away: SELECT * FROM customerSales WHERE salesAmount > 50000; One of the most powerful uses of views in Hive is to handle complex data types. Often, these need to be fl attened out for consumption by users or other processes. If you are using a view, the purchases column in the customer table could be fl attened into two columns, and consumers of the view wouldn’t need to understand the collection structure: CREATE VIEW customerPurchases AS SELECT c.name, c.city, c.state, c.postalCode, c.purchases['Food'] AS foodPurchase, c.purchases['Lodging'] AS lodgingPurchase FROM MsBigData.customer c WHERE c.state='FL'; You can remove views by using the DROP VIEW statement: DROP VIEW customerPurchases; 130 Part III ■ Storing and Managing Big Data Creating Indexes for Tables As mentioned in the section on creating tables, Hive doesn’t support keys. Because traditional relational databases create indexes by creating stores of indexed values that link to the keys of records with those values, you might wonder how Hive can support them. The short answer is that indexes work differently in Hive. For the slightly longer answer, read on. When an index is created in Hive, it creates a new table to store the indexed values in. The primary benefi t of this is that Hive can load a smaller number of columns (and thus use less memory and disk resources) to respond to queries that use those columns. However, this benefi t in query performance comes at the cost of processing the index and the additional storage space required for it. In addition, unlike indexes in most relational systems, Hive does not automati- cally update the index when new data is added to the indexed table. You are responsible for rebuilding the index as necessary. To create an index, you use the CREATE INDEX statement. You must provide the table and columns to use for creating the index. You also need to provide the type of index handler to use. As with many parts of Hive, indexes are designed to be extensible, so you can develop your own index handlers in Java. In the following example, the COMPACT index handler is used: CREATE INDEX customerIndex ON TABLE customer (state) AS 'COMPACT' WITH DEFERRED REBUILD IN TABLE customerIndexTable; Another option for the index handler is BITMAP. This handler creates bitmap indexes, which work well for columns that don’t have a large number of distinct values. The index creation also specifi es the table where the index data will be placed. This is optional; however, it does make it easier to see what the index contains. Most of the standard options for CREATE TABLE can also be specifi ed for the table that holds the index. The WITH DEFERRED REBUILD clause tells Hive not to populate the index immediately. Rather, you tell it to begin rebuilding with the ALTER INDEX. . . REBUILD command: ALTER INDEX customerIndex ON TABLE customer REBUILD; You can show indexes for a table using the SHOW INDEX command, and drop one by using DROP INDEX: SHOW INDEX ON customer; DROP INDEX customerIndex ON TABLE customer; Chapter 6 ■ Adding Structure with Hive 131 Summary This chapter covered the basics of working with Hive. The commands for creating databases, tables, and views were covered. In addition, the commands for insert- ing data into those tables and querying it back out were reviewed. Some more advanced functionality around partitions and indexing was also highlighted. Hive has quite a bit of functionality, and not all the functionality could be covered here due to space constraints. In particular, administration, confi gu- ration, and extensibility could require a book unto themselves to cover fully. An excellent reference for this is Programming Hive (O’Reilly Media, Inc., 2012), by Edward Capriolo, Dean Wampler, and Jason Rutherglen. However, a good overview of setting up data for querying and implementing some common performance improvements has been provided. 134 Part III ■ Storing and Managing Big Data Hive to treat the data in a familiar relational architecture. It also permits easier exchange of data between the HDFS storage and client tools used to present the data for analysis using familiar data exchange application programming interfaces (APIs) such as Java Database Connectivity (JDBC) and Open Database Connectivity (ODBC). For example using HCatalog you can use the same schema for processing the data in Hive or Pig and then pull the data into a traditional data warehouse contained in SQL Server, where it can easily be combined with your traditional BI systems. Using HBase Although HDFS is excellent at storing large amounts of data, and although MapReduce jobs and tools such as Hive and Pig are well suited for reading and aggregating large amounts of data, they are not very effi cient when it comes to individual record lookups or updating the data. This is where HBase comes into play. HBase is classifi ed as a NoSQL database. Unlike traditional relational databases like SQL Server or Oracle, NoSQL databases do not attempt to provide ACID (atomicity, consistency, isolation, durability) transactional reliability. Instead, they are tuned to handle large amounts of unstructured data, providing fast key-based lookups and updates. As mentioned previously, HBase is a key/value columnar storage system. The key is what provides fast access to the value for retrieval and updating. An HBase table consists of a set of pointers to the cell values. These pointers are made up of a row key, a column key, and a version key. Using this type of key structure, the values that make up tables and rows are stored in regions across regional servers. As the data grows, the regions are automatically split and redistributed. Because HBase uses HDFS as the storage layer, it relies on it to supply services such as automatic replication and failover. Because HBase relies so heavily on keys for its performance, it is a very important consideration when defi ning tables. In the next section, you will look at creating HBase tables and defi ning appropriate keys for the table. Creating HBase Tables Because the keys are so important when retrieving or updating data quickly, it is the most important consideration when setting up an HBase table. The creation of the keys depends a great deal on how the data gets accessed. If data is accessed as a single-cell lookup, a randomized key structure works best. If you retrieve data based on buckets (for example, logs from a certain server), you should include this in the key. If you further look up values based on log event Chapter 7 ■ Expanding Your Capability with HBase and HCatalog 135 type or date ranges, these should also be part of the key. The order of the key attributes is important. If lookups are based primarily on server and then on event type, the key should be made up of Server–Event–Timestamp. Another factor to consider when creating tables in HBase is normalization versus denormalization. Because HBase does not support table joins, it is impor- tant to know how the data will be used by the clients accessing the data. Like most reporting-centric databases, it is a good idea to denormalize your tables somewhat depending on how the data is retrieved. For example, if sales orders are analyzed by customer location, this data should be denormalized into the same table. Sometimes it makes more sense to use table joins. If you need to perform table joins, it can be implemented in a MapReduce job or in the client application after retrieving the data. To interact with HBase, you can use the HBase shell. The HBase command- line tool is located in the bin directory of your HBase installation directory if you are using HDP for Windows. (Note: The shell is not currently available in HDInsight.) To launch the HBase shell using the Windows command prompt, navigate to the bin directory and issue the following command: hbase shell After the HBase shell is launched, you can view the help by issuing the help command. After issuing the help command, you will see a list of the various command groups (see Figure 7.1). Figure 7.1: Listing the various command groups in the HBase shell. 136 Part III ■ Storing and Managing Big Data There are command groups for data defi nition statements (DDL), data manipu- lation statements (DML), replication, tools, and security. To list help for a com- mand group, issue the help command followed by the group name in quotation marks. For example, Figure 7.2 shows the help for the DDL command group (only the Alter command is showing). Figure 7.2: Listing help for the DDL command group. To create a basic table named Stocks with a column family named Price and one named Trade, enter the following code at the command prompt: create 'Stocks', 'Price','Trade' To verify that the table has been created, you can use the describe command: describe 'Stocks' Figure 7.3 shows the output describing the table attributes. Notice the two column family groups. You can also set attributes for the table/ column groups (for example, the number of versions to keep and whether to keep deleted cells). Now that you’ve created your table, you’re ready to load data into it. Loading Data into an HBase Table You can load data into the table in two ways. To load a single value, you use the put command, supplying the table, column, and value. The column is prefi xed Chapter 7 ■ Expanding Your Capability with HBase and HCatalog 137 by the column family. The following code loads a row of stock data into the Stocks table: put 'Stocks', 'ABXA_12092009','Price:Open','2.55' put 'Stocks', 'ABXA_12092009','Price:High','2.77' put 'Stocks', 'ABXA_12092009','Price:Low','2.5' put 'Stocks', 'ABXA_12092009','Price:Close','2.67' put 'Stocks', 'ABXA_12092009','Trade:Volume','158500' Figure 7.3: Describing the table attributes. To verify that the values have been loaded in the table, you can use the scan command: scan 'Stocks' You should see the key/values listed as shown in Figure 7.4. Notice a timestamp has been automatically loaded as part of the key to keep track of versioning. Figure 7.4: Listing key/values. If the key is already in the table, HBase updates the value and creates a new timestamp for the cell. The old record is still maintained in the table. Using the HBase shell and the put command illustrates the process of add- ing a row to the table, but it is not practical in a production setting. You can 138 Part III ■ Storing and Managing Big Data load data into an HBase table in several other ways. You can use the HBase tools ImportTsv and CompleteBulkLoad. You can also write a MapReduce job or write a custom application using the HBase API. Another option is to use Hive or Pig to load the data. For example, the following code loads data into an HBase table from a CSV fi le: StockData = LOAD '/user/hue/StocksTest2.csv' USING PigStorage(',') as (RowKey:chararray,stock_price_open:long); STORE StockData INTO 'hbase://StocksTest' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('Price:Open'); Now that you know how to load the data into the table, it is now time to see how you read the data from the table. Performing a Fast Lookup One of HBase’s strengths is its ability to perform fast lookups. HBase supports two data-retrieval operations: get and scan. The get operation returns the cells for a specifi ed row. For example, the following get command is used to retrieve the cell values for a row in the Stocks table. Figure 7.5 shows the resulting output: get 'Stocks', 'ABXA_12092009' Figure 7.5: Getting the row cell values. You can also use the get command to retrieve the previous versions of a cell value. The following get command gets the last three versions of the high column. Figure 7.6 shows the output: get 'Stocks', 'ABXA_12092009', {COLUMN => 'Price:High', VERSIONS => 3} Figure 7.6: Retrieving previous versions of a cell. Chapter 7 ■ Expanding Your Capability with HBase and HCatalog 139 You can query the values in the HBase table using the scan command. The following command returns all columns in the Stocks table: scan 'Stocks' You can limit the output by passing fi lters with the scan command. For example, to retrieve just the high price column, you can use the following command: scan 'Stocks', {COLUMNS => ['Price:High']} You can also use value-based fi lters, prefi x fi lters, and timestamp fi lters. For example, this fi lter scans for values greater than 2.8: scan 'Stocks', {FILTER => "ValueFilter(>, 'binary:2.8')"} HBase provides for fast scanning and lets you create some complex query- ing through the use of fi lters. Unfortunately, it is based on the JRuby language and can be quite complex to master. You may want to investigate a tool such as HBase Manager (http://sourceforge.net/projects/hbasemanagergui/), which provides a simple graphical user interface (GUI) to the HBase database. Loading and Querying HBase To complete the exercise in this section, you need to download and install the Hortonworks Data Platform (HDP) for Windows from Hortonworks. (Note: At the time of this writing the HBase shell is not yet exposed in HDP for Windows or HDInsight. It is anticipated it will be available in early 2014.) You can set up HDP for Windows on a development server to provide a local test environment that supports a single-node deployment. (For a detailed discussion of installing the Hadoop development environment on Windows, see http://hortonworks. com/products/hdp-windows/.) In this exercise, you load stock data into an HBase table and query the data using scan fi lters. The fi le containing the data is a tab-separated value fi le named StocksTest.tsv that you can download from http://www.wiley.com/go/micro- softbigdatasolutions.com. The fi rst step is to load the fi le into the Hadoop fi le system. Open the Hadoop command-line interface and issue the following command to copy the fi le (although your paths may differ): hadoop fs -copyFromLocal C:\SampleData\StocksTest.tsv /user/test/StockTest.tsv You can verify the fi le was loaded by issuing a list command for the direc- tory you placed it in: hadoop fs -ls /user/test/ 140 Part III ■ Storing and Managing Big Data The next step is to create the table structure in HBase. Open the HBase shell and enter the following command to create a table named stock_test with three column families (info, price, and trade): create 'stock_test', 'info','price','trade' You can use Pig Latin to load the data into the HBase table. Open the Pig command shell and load the TSV fi le using the PigStorage function. Replace the path below with the path where you loaded StockTest.tsv into HDFS: StockData = LOAD '/user/test/StockTest.tsv' USING PigStorage() as (RowKey:chararray,stock_symbol:chararray,date:chararray, stock_price_open:double,stock_price_close:double, stock_volume:long); Now you can load the HBase stock_test table using the HBaseStorage func- tion. This function expects the row key for the table to be pasted fi rst, and then the rest of the fi elds are passed to the columns designated in the input string: STORE StockData INTO 'hbase://stock_test' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage ('info:symbol info:date price:open price:close trade:volume'); To test the load, you can run some scans with various fi lters passed in. The following scan fi lters on the symbol to look for the stock symbol ADCT: SingleColumnValueFilter filter = new SingleColumnValueFilter( info, symbol, CompareOp.EQUAL, Bytes.toBytes("ADCT") ); scan.setFilter(filter); Managing Data with HCatalog HCatalog creates a table abstraction layer over data stored on an HDFS cluster. This table abstraction layer presents the data in a familiar relational format and makes it easier to read and write data using familiar query language concepts. Originally used in conjunction with Hive and the Hive Query Language (HQL), it has expanded to support other toolsets such as Pig and MapReduce programs. There are also plans to increase the HCatalog support for HBase. Working with HCatalog and Hive HCatalog was developed to be used in combination with Hive. HCatalog data structures are defi ned using Hive’s data defi nition language (DDL) and the Chapter 7 ■ Expanding Your Capability with HBase and HCatalog 141 Hive metastore stores the HCatalog data structures. Using the command-line interface (CLI), users can create, alter, and drop tables. Tables are organized into databases or are placed in the default database if none are defi ned for the table. Once tables are created, you can explore the metadata of the tables using commands such as Show Table and Describe Table. HCatalog commands are the same as Hive’s DDL commands except that HCatalog cannot issue state- ments that would trigger a MapReduce job such as Create Table or Select or Export Table. To invoke the HCatalog CLI, launch the Hadoop CLI and navigate to the bin directory of the HCatalog directory. Enter the command hcat.py, which should result in the output shown in Figure 7.7. Figure 7.7: Invoking the HCatalog CLI. To execute a query from the command line, use the -e switch. For example, the following code lists the databases in the metastore: hcat.py -e "Show Databases" At this point, the only database listed is the default database. Defi ning Data Structures You can create databases in HCatalog by issuing a Create Database state- ment. The following code creates a database named fl ight to hold airline fl ight statistics tables: hcat.py -e "Create Database flight" To create tables using HCatalog, you use the Create Table command. When creating the table you need to defi ne the column names and data types. HCatalog supports the same data types and is similar to those supported by most database systems such as integer, boolean, fl oat, double, string, binary, and timestamp. Complex types such as the array, map, and struct are also 142 Part III ■ Storing and Managing Big Data supported. The following code creates an airport table in the fl ight database to hold airport locations: Create Table flight.airport (code STRING, name STRING, country STRING, latitude double, longitude double) Once a table is created, you can use the Describe command to see a list of the columns and data types. If you want to see a complete listing of the table metadata, include the Extended keyword, as follows: Describe Extended flight.airport; Figure 7.8 shows this command output. Figure 7.8: Extended description of the airport table. Once a table is created, you can issue ALTER TABLE statements to do things like changing the table name, alter table properties, change column names, and change column data types. For example, the following command changes the table name: ALTER TABLE airport RENAME TO us_airports To add a new column to the table, you use the following code: ALTER TABLE us_Airports ADD COLUMNS (city String) You can also drop and truncate a table. HCatalog supports the creation of views to fi lter a table. For example, you can create a Canadian airport view using the following command: Create View canadian_airports as Select * from airport where country = 'Canada' Chapter 7 ■ Expanding Your Capability with HBase and HCatalog 143 Views are not materialized and are only logical structures (although there are plans to eventually support materialized views). When the view is referenced in a query, the view’s defi nition is used to generate the rows. Just as with tables, you can alter views and drop views. Creating Indexes When joining tables or performing lookups, it is a good idea for performance to create indexes on the keys used. HCatalog supports the creation of indexes on tables using the CREATE INDEX statement. The following code creates an index on the code column in the us_airports table. Notice that it is passing in a refer- ence to the index handler. Index handlers are pluggable interfaces, so you can create your own indexing technique depending on the requirements: CREATE INDEX Airport_IDX_1 ON TABLE us_airports (code) AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild The deferred rebuild command is used to defer rebuilding the index until the table is loaded. After the load, you need to issue an ALTER statement to rebuild the index to refl ect the changes in the data: ALTER INDEX Airport_IDX_1 ON us_airports REBUILD As with most data structures, you can delete an index by issuing the DROP command. The CompactIndexHandler creates an index table in the database to hold the index values. When the index is dropped, the underlying index table is also dropped: DROP INDEX Airport_IDX_1 ON us_airports Along with indexing, another way to improve query performance is through partitioning. In the next section, you will see how you can partition tables in HCatalog. Creating Partitions When working with large data stores, you can often improve queries by par- titioning the data. For example, if queries are retrieved using a date value to restrict the results, a partition on dates will improve performance. To create a partition on a table, you use the partitioned-by command as part of the Create Table statement. The following statement creates a table parti- tioned by the date column: Create Table flightData_Partitioned (airline_cd int,airport_cd string, delay int,dep_time int) Partitioned By(flight_date string); 144 Part III ■ Storing and Managing Big Data Using the describe command, you can view partition information for the table, as shown in Figure 7.9. Figure 7.9: Viewing partition information. You can create partitions on multiple columns, which results in a separate data directory for each distinct combination of values from the partition columns. For example, it may be benefi cial to partition the fl ight data table by both the fl ight date and airport code depending on the amount of data and the types of queries. Another option is to further bucket the partitions using Clustered By and order the data in the buckets with a Sorted By command. The follow- ing statement creates a fl ight data table partitioned by fl ight date, bucketed by airport code, and sorted by departure time: Create Table flightData_Bucketed (airline_cd int,airport_cd string, delay int,dep_time int) Partitioned By(flight_date string) Clustered By(airport_cd) Sorted By(dep_time) into 25 buckets; When loading data into a partitioned table, it is up to you to ensure that the data is loaded into the right partition. You can use Hive to load the tables directly from a fi le. For example, the following statement is used to load a fi le containing daily fl ight data into a table partitioned by date: LOAD DATA INPATH '/flight/data/flightdata_2013-01-01.txt' INTO TABLE flightdata PARTITION(date='2013-01-01') You can also use Hive to load data from one table into another table. For example, you may want to load data from a staging table into the partitioned tables, as illustrated by the following statement: FROM flightdata_stg fds INSERT OVERWRITE TABLE flightdata PARTITION(flight_date='2013-01-01') SELECT fds.airline_cd, fds.airport_cd, fds.delay, fds.dep_time WHERE fds.flight_date = '2013-01-01' Chapter 7 ■ Expanding Your Capability with HBase and HCatalog 145 To load into multiple partitions, you can create a multi-insert statement as follows: FROM flightdata_stg fds INSERT OVERWRITE TABLE flightdata PARTITION(flight_date='2013-01-01') SELECT fds.airline_cd, fds.airport_cd, fds.delay, fds.dep_time WHERE fds.flight_date = '2013-01-01' INSERT OVERWRITE TABLE flightdata PARTITION(flight_date='2013-01-02') SELECT fds.airline_cd, fds.airport_cd, fds.delay, fds.dep_time WHERE fds.flight_date = '2013-01-02' If you have a table partitioned on more than one column, HCatalog supports dynamic partitioning. This allows you to load more effi ciently without the need to know all the partition values ahead of time. To use dynamic partitioning, you need the top-level partition to be static, and the rest can be dynamic. For example, you could create a static partition on month and a dynamic partition on date. Then you could load the dates for the month in one statement. The following statement dynamically creates and inserts data into a date partition for a month’s worth of data: FROM flightdata_stg fds INSERT OVERWRITE TABLE flightdata PARTITION(flight_month='01',flight_date) SELECT fds.airline_cd, fds.airport_cd, fds.delay, fds.dep_time, fds.flight_date WHERE fds.flight_month = '01' One caveat to note is that the dynamic partition columns are selected by order and are the last columns in the select clause. Integrating HCatalog with Pig and Hive Although originally designed to provide the metadata store for Hive, HCatalog’s role has greatly expanded in the Hadoop ecosystem. It integrates with other tools and supplies read and write interfaces for Pig and MapReduce. It also integrates with Sqoop, which is a tool designed to transfer data back and forth between Hadoop and relational databases such as SQL Server and Oracle. HCatalog also exposes a REST interface so that you can create custom tools and applica- tions to interact with Hadoop data structures. In addition, HCatalog contains a notifi cation service so that it can notify workfl ow tools such as Oozie when data has been loaded or updated. Another key feature of HCatalog is that it allows developers to share data and structures across internal toolsets like Pig and Hive. You do not have to explicitly type the data structures in each program. This allows us to use the right tool for the right job. For example, we can load data into Hadoop using HCatalog, perform some ETL on the data using Pig, and then aggregate the 146 Part III ■ Storing and Managing Big Data data using Hive. After the processing, you could then send the data to your data warehouse housed in SQL Server using Sqoop. You can even automate the process using Oozie. To complete the following exercise, you need to download and install the HDP for Windows from Hortonworks. You can set up HDP for Windows on a development server to provide a local test environment that supports a single- node deployment. (For a detailed discussion of installing the Hadoop devel- opment environment on Windows, see http://hortonworks.com/products/ hdp-windows/.) In this exercise, we analyze sensor data collected from HVAC systems moni- toring the temperatures of buildings. You can download the sensor data from http://www.wiley.com/go/microsoftbigdatasolutions. There should be two fi les, one with sensor data (HVAC.csv) and a fi le containing building informa- tion (building.csv). After extracting the fi les, load the data into a staging table using HCatalog and Hive: 1. Open the Hive CLI. Because Hive and HCatalog are so tightly coupled, you can write HCatalog commands directly in the Hive CLI. As a matter of fact, you may recall that HCatalog actually uses a subset of Hive DDL statements. Create the sensor staging table with the following code: CREATE TABLE sensor_stg(dt String, time String, target_tmp Int, actual_tmp Int, system Int, system_age Int,building_id Int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; 2. Load the data into the staging table: LOAD DATA Local INPATH 'C:\SampleData\HVAC.csv' INTO TABLE sensor_stg; 3. Use the following statement to view the data to verify that it loaded. Your data should look similar to Figure 7.10: Select * from sensor_stg Limit 10; Figure 7.10: Sample sensor data. Chapter 7 ■ Expanding Your Capability with HBase and HCatalog 147 4. Using the same process, load the building data into a staging table: CREATE TABLE building_stg(building_id Int, mgr String, building_age Int, hvac_type String, country String) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA Local INPATH 'C:\SampleData\building.csv' INTO TABLE building_stg; Select * from building_stg Limit 10; 5. Create tables to hold the processed data. The sensor table is partitioned by the date: CREATE TABLE sensor(time String, target_tmp Int, actual_tmp Int,delta_tmp Int, building_id Int) PARTITIONED BY (dt String); CREATE TABLE building(building_id Int, building_age Int, hvac_type String); 6. Because you are going to join the tables using the building ID, you are going to create indexes for the tables using this column: CREATE INDEX Building_IDX_1 ON TABLE sensor (building_id) AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild; CREATE INDEX Building_IDX_2 ON TABLE building (building_id) AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild; Now you can use Pig to extract, transform, and load the data from staging tables into the analysis tables. 7. Open the Hadoop CLI and browse to the bin directory of the Pig installa- tion folder. Issue the following command to launch the Pig CLI, passing in the switch to use HCatalog: pig.cmd -useHCatalog; This will launch the Grunt, the Pig CLI. 8. Issue the following Pig Latin script to load the data from the staging table: SensorData = load 'sensor_stg' using org.apache.hcatalog.pig.HCatLoader(); You should see an output message indicating that a connection to the metastore is established. 148 Part III ■ Storing and Managing Big Data 9. To ensure that the data is loaded, you can issue a Dump command to output the data to the screen (which will take a few minutes): Dump SensorData; 10. Use the following code to fi lter out nulls and calculate the temperature deltas: FilteredData = Filter SensorData By building_id is not null; ProcessedData = Foreach FilteredData Generate dt, time, target_tmp, actual_tmp,target_tmp - actual_tmp as delta_tmp,building_id; 11. The fi nal step is to load the processed data into the sensor table: STORE ProcessedData INTO 'sensor' USING org.apache.hcatalog.pig.HCatStorer(); Once the table loads, you can close the CLI. 12. Open the Hive CLI and issue the following query to verify that the sensor table has been loaded. Your output should look similar to Figure 7.11. Notice that the data is all coming from the same partition (date): Select * from sensor Limit 10; Figure 7.11: Selecting sensor data. 13. Load the building table with the following query: From building_stg bs Insert Overwrite Table building select bs.building_id, bs.building_age, bs.hvac_type; 14. Now that the tables are loaded, you need to build the indexes: ALTER INDEX Building_IDX_1 ON sensor REBUILD; ALTER INDEX Building_IDX_2 ON building REBUILD; After the index has been built, the data is now ready to analyze. Chapter 7 ■ Expanding Your Capability with HBase and HCatalog 149 You can use Hive to query and aggregate the data. The following query determines the maximum temperature difference between target and actual temperatures for each day and HVAC type: select max(s.delta_tmp), s.dt, b.hvac_type from sensor s join building b on (s.building_id = b.building_id) Group By s.dt, b.hvac_type; In Chapter 11, “Visualizing Big Data with Microsoft BI,” you will see how you can use the Hive ODBC connector to load and analyze the data in Microsoft’s BI toolset. Using HBase or Hive as a Data Warehouse Although both HBase and Hive are both considered data warehouse structures, they differ signifi cantly as to how they store and query data. Hive is more like a traditional data warehouse reporting system. It structures the data in a set of tables that you can join, aggregate, and query on using a query language (Hive Query Language [HQL]) that is very similar to the SQL, which most database developers are already used to working with. This relieves you from having to write MapReduce code. The downside to Hive is it can take a long time to process through the data and is not intended to give clients instant results. Hive is usually used to run processing through scheduled jobs and then load the results into a summary type table that can be queried on by client appli- cations. One of the strengths of Hive and HCatalog is the ability to pass data between traditional relational databases such as SQL Server. A good use case for Hive and HCatalog is to load large amounts of unstructured data, aggre- gate the data, and push the results back to SQL Server where analysts can use the Microsoft BI toolset to explore the results. Another point to consider is that Hive and HCatalog do not allow data updates. When you load the data, you can either replace or append. This makes loading large amounts of data extremely fast, but limits your ability to track changes. HBase, however, is a key/value data store that allows you to read, write, and update data. It is designed to allow quick reads of random access data from large amounts of data based on the key values. It is not designed to provide fast loading of large data sets, but rather quick updates and inserts of single sets of data that may be streaming in from a source. It also is not designed to perform aggregations of the data. It has a query language based on JRuby that is very unfamiliar to most SQL developers. Having said that, HBase will be your tool of choice under some circumstances. Suppose, for example, that you have a huge store of e-mail messages and you need to occasionally pull one for auditing. You may also tag the e-mails with identifying fi elds that may occasionally need updating. This is an excellent use case for HBase. 150 Part III ■ Storing and Managing Big Data If you do need to aggregate and process the data before placing it into a summary table that needs to be updated, you can always use HBase and Hive together. You can load and aggregate the data with Hive and push the results to a table in HBase, where the data summary statistics can be updated. Summary This chapter examined two tools that you can use to create structure on top of your big data stored in HDFS. HBase is a tool that creates key/value tuples on top of the data and stores the key values in a columnar storage structure. Its strength is that it enables fast lookups and supports consistency when updating the data. The other tool, HCatalog, offers a relational table abstraction layer over HDFS. Using the HCatalog abstraction layer allows query tools such as Pig and Hive to treat the data in a familiar relational architecture. It also permits easier exchange of data between the HDFS storage and relational databases such as SQL Server and Oracle. 154 Par t IV ■ Working with Your Big Data Pig enables users to analyze large data sets. It supports a number of built-in transformations for the data, and additional transformations can be added as user-defi ned functions through custom coding. It was originally developed as a way to reduce the complexity of writing MapReduce jobs, but it has evolved into a fully featured transformation tool for Hadoop data. Because each of these tools has strengths and weaknesses, the fi nal part of this chapter focuses on helping you decide which tool is the best choice for dif- ferent scenarios you may encounter when moving your data. Combining Big Data and SQL Server Tools for Better Solutions As with many platforms, both Hadoop and SQL Server have strengths and weaknesses. By using the tools together, you can help leverage the strengths of each platform while mitigating the weaknesses. The Hadoop platform’s strengths derive from its capabilities to scale out eas- ily to accommodate growing amounts of data. It also can handle a wide variety of data formats with little up front transformation effort. However, it has few restrictions or validations on the data it stores, and it doesn’t have the same end-user capabilities and ecosystem that SQL Server has developed. SQL Server, however, handles enforcing data quality and consistency much better than Hadoop, and a wide variety of tools and clients enable you to per- form different analytic, reporting, and development tasks with SQL Server. Challenges with SQL Server include scaling it to handle massive amounts of data and also support for easily storing loosely formatted data. Why Move the Data? When your environment includes both Hadoop and SQL Server, you will some- times need to move the data between the two systems. One example of this is importing data into Hadoop from an online transactional processing (OLTP) system using SQL Server, such as sales data from an order-processing system. There are a number of reasons to move data from SQL Server into Hadoop: ■ Archival and data retention: Hadoop storage can be less expensive than SQL Server, and the costs of scaling up are generally less. In addition, keeping extensive, infrequently accessed historical data in an OLTP data- base creates overhead that negatively impacts performance. Moving this data to Hadoop can save the physical resources on your SQL Server for processing transactions quickly and effi ciently. ■ Analytics: Hadoop offers tools like Mahout for data mining and machine learning that can be applied to your Hadoop data. These tools leverage Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 155 Hadoop capabilities for processing large amounts of data. Moving data from SQL Server to Hadoop enables the use of these tools. ■ Transformation and aggregation: Data can be transformed in a number of ways on both the SQL Server and Hadoop platforms. However, the Hadoop architecture enables you to distribute data transformations over a cluster, which can dramatically speed up transformation of large amounts of data. The reverse is also true. You will fi nd the need to extract data from Hadoop and place it in SQL Server. Common scenarios for this include the following: ■ Business analysis and reporting: SQL Server has more options and more robust end-user tools for doing data exploration, analysis, and reporting. Moving the data into SQL Server enables the use of these tools. ■ Integration: The results of your Hadoop analytics, transformations, and aggregation may need to be integrated with other databases in your organization. ■ Quality/consistency: SQL Server, as a relational database, offers more capabilities to enforce data quality and consistency rules on the data it stores. It does this by enforcing the rules when the data is added to the databases, giving you confi dence that the data already conforms to your criteria when you query it. NOTE SQL Server has more tools available today, but this is changing quickly. More vendors are adding the ability to interact with Hadoop directly into their tools, and the quality of the end-user experience is getting better as the competition in this space increases. Transferring Data Between Hadoop and SQL Server One key consideration for moving data between Hadoop and SQL Server is the time involved. With large data volumes, the data transfers often need to be scheduled for times when other workloads on the relevant servers are light. Hadoop is optimized for batch data processing. Generally, when writing data to Hadoop, you will see the best performance when you set up the processing to handle large batches of data for import, instead of writing single or small numbers of rows to Hadoop each time. Remember that, by default, Hadoop uses a 64MB block size for fi les, and it functions best when the fi le size exceeds the block size. If you need to process smaller numbers of rows, consider storing them in a temporary table in SQL Server or a temporary fi le and only writing them to Hadoop when the data size is large enough to make it an effi cient operation. When writing data back to SQL Server, you generally want to make sure that the data is aggregated. This will let you write a smaller amount of data to the SQL Server environment. Another concern when writing data to SQL Server is 156 Par t IV ■ Working with Your Big Data how much parallel write activity you want to allow. Depending on your method of doing the transfer, you could enable writing to the SQL Server from a large number of Hadoop nodes. SQL Server can handle parallel clients inserting data, but having too many parallel streams of insert activity can actually slow down the overall process. Finding the right amount of parallelism can involve some tuning, and requires you to understand the other workloads running on your SQL Server at the same time. Fortunately, you can control the amount of paral- lelism when moving data to SQL Server in a number of ways, as are covered for each technology. Working with SSIS and Hive SSIS doesn’t currently support direct connectivity to Hadoop. However, using Hive and Open Database Connectivity (ODBC), you can leverage data in your Hadoop system from SSIS. This involves a few steps: 1. Making sure that Hive is confi gured properly 2. Verifying that you can access Hive from the computer running SSIS 3. Verifying that the data you want to access in Hadoop has a table defi ned for it in Hive After going through this setup, you gain the ability to query your Hadoop data in SSIS (and other tools) as if it resides in a relational database. This offers the lowest-friction approach to using your Hadoop data in SSIS. Writing data to Hadoop from SSIS is a little more challenging. In Chapter 6, “Adding Structure with Hive,” we discussed that Hive only supports bulk insert operations, in keeping with the Hadoop approach of “write-once” large fi les. Unfortunately, Hive uses some nonstandard SQL to handle these bulk inserts, and the available ODBC drivers don’t fully support it. Therefore, writing to Hadoop from Integration Services is best accomplished by writing a fi le out to the fi le system and then moving it into the Hadoop fi le system. The next sections cover these operations in detail, so that you can confi gure your SSIS packages to both retrieve data from Hadoop and move data into it. The instructions assume the use of the Hortonworks Hadoop distribution. Hadoop can be installed on the same computer where you have SSIS installed. However, in a production environment, these will likely be on two different machines. This does present a few additional constraints because SSIS cannot currently interact directly with Hadoop, and without a local installation of Hadoop, it cannot access the Hadoop tools. The approaches to work around these constraints are covered next. Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 159 However, you will need to set it up twice, once as 32-bit and once as 64-bit, so that it’s available to both types of applications. Using a connection string for connections eliminates the need for this. Confi guring the Hive ODBC Connection You can confi gure your Hive ODBC connection in two ways. One option is to confi gure a DSN. This stores the connection parameter information locally on the computer and results in a simpler connection confi guration in SSIS. The other approach is to use a connection string that contains all the parameters for the connection. This option is a little more complex to set up, but it makes your SSIS packages more portable because they don’t rely on a DSN being set up on the local computer, and you don’t have to worry about setting up both 64-bit and 32-bit versions of the DSN. The process for setting up either is fairly similar: 1. You start by opening up the ODBC Administrator application, using the 32-bit or 64-bit version. 2. Select the DSN type from the tabs across the top of the administrator interface (see Figure 8.1). 3. Choose the System DSN if you want to store the connection information on the computer, and choose the File DSN if you want a portable con- nection string. The process differs only in that with a File DSN you have to select a location to store the fi le. This example will proceed with a fi le DSN setup because that is a more common scenario. Figure 8.1: ODBC Administrator interface. 160 Par t IV ■ Working with Your Big Data 4. Add the new connection by using the Add button, and choose the Hortonworks Hive ODBC driver from the resulting wizard. 5. At the next step, choose a location to store the File DSN. You will be opening this fi le in just a moment, so use an easily accessible location, like the desktop. 6. Finish walking through the wizard, and at the end, you will be presented with the Hive ODBC driver’s property screen (see Figure 8.2). Figure 8.2: Hortonworks ODBC driver properties. 7. Replace the Host entry with the name or IP address of the server hosting your Hive installation. NOTE If you choose Test at this point, it should connect successfully. 8. The Advanced Options dialog box also contains a few items of interest, though the default will work in most cases. The two properties that you are most likely to need to change under Advanced Options are Rows Fetched per Block and Default String Column Length. Rows Fetched per Block tells Hive how many rows to return in a batch. The default of 10,000 will work in most scenarios; but if you are having performance issues, you can adjust that value. The Default String Column Length property tells the driver what to report back as the string length to SSIS. As noted in Chapter 6, Hive doesn’t inherently track the length of string columns. However, SSIS is much more particular about this and requires the driver to report 162 Par t IV ■ Working with Your Big Data into Hive by accessing the Hadoop Distributed File System (HDFS) and moving fi les in the appropriate format to the appropriate directory. NOTE In diff erent SSIS versions, you work with Hive in diff erent ways. SSIS 2008 R2 and earlier support ODBC through an ADO.NET wrapper, whereas SSIS 2012 and later let you access ODBC directly. Generally, direct access results in better performance, so 2012 is recommended for use with Hive, but it’s not required. Another diff erence between SSIS 2008 R2 and 2012 is that the develop- ment environment has changed names. In 2008 R2 and earlier, it is known as Business Intelligence Developer Studio (BIDS). In 2012 and later, it is named SQL Server Data Tools - Business Intelligence (SSDT-BI). It works essentially the same in both versions, so instructions provided for one will apply to the other, unless specifi cally noted otherwise. Confi guring a Connection Manager for Hive To extract data from Hive, you can set up an ODBC connection to the Hive data- base, using the connection string created in the section “Connecting to Hive.” The fi rst step in this process is creating a package and setting up a connection manager. If you are using SSIS 2008 R2 or earlier versions, you must use an ADO.NET connection manager to use the ODBC driver: 1. To create a package, choose New Connection from the SSIS menu in Business Intelligence Developer Studio (BIDS). 2. Choose the ADO.NET item from the list. 3. In the resulting screen, make sure that you change the Provider option at the top of the screen to .NET Providers\ODBC Data Provider. This uses the ODBC provider through an ADO.NET wrapper. 4. After you have selected the ODBC Data Provider, you can provide either the DSN or input the connection string you created in the section “Connecting to Hive.” NOTE If you are using SSIS 2012 or later, you can take advantage of native support for ODBC by creating an ODBC connection manager. This will present the same inter- face for selecting the DSN or connection string to use. However, it bypasses the ADO .NET provider selection, and generally performs better at run time, thanks to the skip- ping of the additional ADO.NET provider layer. To use this, select the ODBC item from the list of connections. Note that the ODBC connection type will not be displayed when you right- click in the connection manager area of the package designer. You must choose New Connection to display the full list of connections in order to select the ODBC connec- tion type. Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 163 After selecting the DSN or adding the connection string to the connection manager, you will have the opportunity to provide the username and password. By default, Hive does not need a password, only the username (hadoop, by default). If you do specify a password, remember that SSIS treats it as a sensitive property, so you will need to encrypt the password or set it when the package is loaded. You can do this through a confi guration in 2008 R2 and earlier, or by using a parameter in SSIS 2012 or later. Extracting Data from Hive To retrieve data from Hive in SSIS 2008 R2, you can add an ADO.NET Source component to a data fl ow task (see Figure 8.3). When confi guring the source component, select the Hive connection manager, and specify a SELECT statement for your Hive table. You can preview the data here as well to confi rm that the results are what you expect. Figure 8.3: ADO.NET Source. TIP If you are using SSIS 2012, you can use the ODBC Source instead of the ADO.NET Source to gain a little performance. The interface is almost identical, and setting it up 164 Par t IV ■ Working with Your Big Data is exactly the same as the ADO.NET Source except that you will pick an ODBC connec- tion manager rather than an ADO.NET connection manager. For the best SSIS experience, make sure that the query being run against Hive returns the smallest number of rows and columns possible. Hive tables can be quite large, so it’s important for package performance that you minimize the amount of data retrieved by SSIS. One way to do this is to not use SELECT * FROM table in the queries; instead, list the specifi c columns that you need to use. Also, you will generally want to apply some sort of WHERE clause to the query to reduce the number of rows returned. Making the Query Dynamic You may have noticed that neither the ADO.NET Source nor the ODBC Source offers the ability to parameterize the queries. In many cases, you will need the WHERE clause to be dynamic at run time. To achieve this, you can use expres- sions on the SQLCommand properties exposed by the source components. An SSIS expression enables you to create a formula that is evaluated at run time. One common scenario with Hive that requires the use of expressions is run- ning a query that fi lters the results by a specifi ed time period. For an example of this, do the following: 1. Add two DateTime variables to your package, and confi gure them with an appropriate date range. 2. Select the background of the Data Flow Task containing your Hive source component. 3. In the Properties window, select the Expressions property and click the ellipsis button to bring up the Property Expression dialog. 4. Create a new entry for the property named [NameOfYourSource]. [SqlCommand]. 5. In the expression, enter the following: "SELECT * FROM Customer WHERE LastSaleDate BETWEEN \"" + (DT_WSTR, 50) @[User::FromDate] + "\" AND \"" + (DT_WSTR, 50) @[User::ToDate] + "\"" This creates an expression that will set the SqlCommand property dynami- cally at run time using the values stored in the FromDate and ToDate variables. If you take this approach, the resulting SSIS package is more fl exible and can incorporate dynamic fi ltering of the Hive source. WARNING The use of expressions in SSIS packages can be done in any ver- sion of SSIS. However, you want to be aware of an important pre-SSIS 2012 restric- tion. In earlier versions, expressions could not interact with any string longer than Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 165 4,000 characters, and couldn’t return a result longer than 4,000 characters. When creating long SQL statements using expressions, it is possible to exceed this limit. Unfortunately, in this case, the only solution is to reduce the length of your SQL. Fortunately, SSIS 2012 has removed that restriction, making it much easier to work with if you need to use long SQL queries. After your Hive data has been retrieved into SSIS, you can use it as you would any other data source, applying any of the SSIS transformations to it, and send it to any destination supported by SSIS. However, data retrieved from Hive often contains null values, so it can be worthwhile to make sure that your SSIS package handles null values appropriately. NOTE Hive tends to return null values because it applies schema on read rather than on write. That is, if Hive reads a value from one of the underlying fi les that makes up a table, and that value doesn’t match the expected data type of the column, or doesn’t have appropriate delimiters, Hive will return a null value for the column. This is good in that it doesn’t stop your query from processing, but it can result in some strange behavior in SSIS if you don’t have appropriate null handling in your package. Loading Data into Hadoop As noted earlier, SSIS cannot write to Hive directly using ODBC. The alterna- tive is to create a fi le with the appropriate fi le format and copy it directly to the Hadoop fi le system. If it is copied to the directory that Hive uses for managing the table, your data will show up in the Hive table. If your SSIS environment has the Hadoop tools loaded on it, loading data into Hadoop can be as simple as calling the dfs -put command from an Execute Process task: hadoop dfs -put \\HDP1-3\LandingZone\MsBigData\Customer1.txt /user/MsBigData/Customer1.txt This moves the fi le from the local fi le system to the distributed fi le system. However, it can be a little more complex if you do not have a Hadoop installa- tion on your SSIS server. In this case, you need a way to execute the dfs -put command on the remote server. Fortunately, several tools enable you to execute remote processes. The appro- priate tool depends on what operating system is running on your Hadoop cluster. If you are using Linux, you can use the SSH shell application to execute the remote process. To run this from your SSIS package, you can install a tool called puTTy on your SSIS server. This tool enables you to run SSH commands on the remote computer from an Execute Process task. If your Hadoop environment is hosted on a Windows platform, using the Hortonworks distribution, you can use PsExec, a tool from Microsoft that enables 166 Par t IV ■ Working with Your Big Data you to execute remote processes on other servers. To use this in SSIS, you call it from an Execute Process task. NOTE Security issues with PsExec are one of the more common challenges when using it. Make sure that the command line you are sending to PsExec is valid by test- ing it on the target computer fi rst. Then ensure the user account you are running the PsExec command under has permissions to run the executable on the remote com- puter. One easy way to do this is to log in to the target computer as the specifi ed user and run the executable. Finally, ensure that the account running the package matches the account you tested with. Setting up a package to implement this process is relatively straightforward. You set up a data fl ow task as normal, with a source component retrieving data from your choice of sources. Any transformations that need to be applied to the data can be performed. As the last step of the data fl ow, the data needs to be written to a fi le. The format of the fi le is determined by what the Hive system expects. The easiest format to work with from SSIS is a delimited format, with carriage return / line feeds delimiting rows, and a column delimiter like a comma (,) or vertical bar (|) separating column values. The SSIS Flat File Destination is designed to write these types of fi les. NOTE The default Hive column delimiter for fl at fi les is Ctrl-A (0x001). Unfortunately, this isn’t supported for use from SSIS. If at all possible, use a column delimiter that SSIS supports. If you must use a non-standard column delimiter, you will need to add a post-processing step to your package to translate the column delimiters after the fi le is produced. NOTE If Hive is expecting another format (see Chapter 6 for some of the possibili- ties), you might need to implement a custom destination using a script component. Although a full description of this is beyond the scope of this chapter, a custom des- tination lets you fully control the format of the fi le produced, so you can match any- thing that Hive is expecting. Once the fi le is produced, you can use a fi le system task to copy it to a network location that is accessible to both your SSIS server and your Hadoop cluster. The next step is to call the process to copy the fi le into the HDFS. This is done through an Execute Process task. Assuming that you are executing the Hadoop copy on a remote system using PsExec, you confi gure the task with the following property settings. (You might need to adjust your fi le locations): ■ Executable: C:\Sysinternals\PsExec.exe ■ Arguments: \\Your_Hadoop_Server C:\hdp\hadoop\hadoop-\bin\hadoop.cmd dfs -put \\CommonNetworkLocation\LandingZone\Customer1.txt /user /MsBigData/Customer1.txt Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 167 The Execute Process task can be confi gured to use expressions to make this process more dynamic. In addition, if you are moving multiple fi les, it can be used inside a For Each loop in SSIS to repeat the process a specifi ed number of times. Getting the Best Performance from SSIS As touched on earlier, one way to improve SSIS performance with big data is to minimize the amount of data that SSIS actually has to process. When querying from Hive, always minimize the number of rows and columns you are retriev- ing to the essential ones. Another way of improving performance in SSIS is by increasing the parallel activity. This has the most benefi t when you are writing to Hadoop. If you set up multiple, parallel data fl ows, all producing data fi les, you can invoke multiple dfs -put commands simultaneously to move the data fi les into the Hadoop fi le system. This takes advantage of the Hadoop capability to scale out across multiple nodes. Increasing parallelism for packages reading from Hive can have mixed results. You get a certain amount of parallelism when you query from Hive in the fi rst place because it spreads the processing out across the cluster. You can attempt to run multiple queries using different ODBC source components in SSIS simul- taneously, but generally it works better to issue a single query and let Hive determine how much parallelism to use. SSIS is a good way to interact with Hadoop, particularly for querying infor- mation. It’s also a familiar tool to those in the SQL Server space. Thanks to the number of sources and destinations it supports, it can prove very useful when integrating your big data with the rest of your organization. Transferring Data with Sqoop Sqoop is a tool designed to import and export data from Hadoop systems to other data stores, particularly relational databases. This can prove very useful for easily moving data from a SQL Server database into Hadoop or for retrieving data from Hadoop and storing it in SQL Server. Sqoop uses MapReduce to do the actual data processing, so it takes full advantage of the parallel processing capabilities of Hadoop. One of the reasons that Sqoop is easy to use is that it infers the schema from the relational data store that it is interacting with. Because of this, you don’t have to specify a lot of information to use it. Instead, it determines column names, types, and formats from the relational defi nition of the table. 168 Par t IV ■ Working with Your Big Data Behind the scenes, Sqoop is creating logic to read and write the relational data through generated code classes. This means that most operations are performed on a row-by-row basis, so it may not deliver the most optimal performance. Certain databases, like MySQL, do have options to use bulk interfaces with Sqoop, but currently, SQL Server does not. Sqoop uses Java Database Connectivity components to make connections to relational databases. These components need to be installed on the computer where Sqoop is run. Microsoft provides a JDBC driver archive for SQL Server at http://msdn.microsoft.com/en-us/sqlserver/aa937724.aspx. After down- loading the archive, you need to extract the appropriate .jar fi le in your Sqoop lib directory (on a Hortonworks default installation, C:\hdp\hadoop\sqoop-\lib) so that Sqoop can locate the driver. Copying Data from SQL Server To move data from SQL Server to Hadoop, you use the sqoop -import com- mand. A full example is shown here: sqoop import --connect "jdbc:sqlserver://Your_SqlServer;database=MsBigData; Username=demo;Password=your_password;" --table Customers --m 1 --target-dir /MsBigData/Customers The fi rst argument of the command, --connect, determines what type of driver you will use for connecting to the relational database. In this case, the command is specifying that Sqoop will use the SQL Server JDBC driver to connect to the database. NOTE When specifying the connection to the database, you should use the server name or IP address. Do not use localhost, because this connection string will be sent to all the cluster nodes involved in the job, and they will attempt to make their own connections. Because localhost refers to the local computer, each node will attempt to connect to the database as if it exists on that node, which will likely fail. You may notice that the --connect argument contains the full connection string for the database. Ideally, you will use Windows Authentication in the connection string so that the password doesn’t have to be specifi ed. You can also use the --password-file argument to tell Sqoop to use a fi le that stores the password, instead of entering it as part of the command. The --table argument tells Sqoop which table you intend to import from the specifi ed database. This is the table that Sqoop will derive its metadata from. Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 169 By default, all columns within the table are imported. You can limit the column list by using the --columns argument: --columns "FirstName,LastName,City,State,PostalCode" You can also fi lter the rows returned by Sqoop by using the --where argu- ment, which enables you to specify a where clause for the query: --where "State='FL'" If you need to execute a more complex query, you can replace the --table, --columns, and --where arguments with a --query argument. This lets you specify an arbitrary SELECT statement, but some constraints apply. The SELECT statement must be relatively straightforward; nested tables and common table expressions can cause problems. Because Sqoop needs to split up the data to process it in parallel, you must also provide some additional information: --query 'SELECT customer.*, sales.* FROM customer JOIN sales on (customer.id == sales.customerId) WHERE $CONDITIONS' --split-by customer.id The WHERE $CONDITIONS portion of the query provides a placeholder for the criteria Sqoop uses to split up processing. The --split-by argument tells Sqoop which column to use when determining how to split up the data from the input query. By default, if the import is referencing a table instead of a query, the table’s primary key is used as the split column. The --m argument controls how many parallel activities are created by Sqoop. The default value for this is 4. Setting it to 1, as in this example, means that the Sqoop process will be single threaded. WARNING Although increasing the parallel activities can improve performance, you must be careful not to increase it too much. Increasing the --m argument past the number of nodes in your cluster will adversely impact performance. Also, the more parallel activities, the higher the load on the database server. Finally, the --target-dir argument determines what folder the data will be written into on the Hadoop system. You can control whether the new data is added to an existing directory by using the --append argument. And you can import using Hive rather than a directory by specifying the --hive-import and --hive-table arguments: sqoop import --connect "jdbc:sqlserver://Your_SqlServer;database=MsBigData; Username=demo;Password=your_password;" --table Customers --m 1 --target-dir /MsBigData/Customers --append 170 Pa r t IV ■ Working with Your Big Data sqoop import --connect "jdbc:sqlserver://Your_SqlServer;database=MsBigData; Username=demo;Password=your_password;" --table Customers --m 1 --hive-import --hive-table CustomerImport NOTE There is also a Sqoop import-all-tables command. This imports all tables and all their columns from the specifi ed database. It functions well only if all the tables have single column primary keys. Although you can specify a list of tables to exclude with this command, it has less fl exibility and control than importing indi- vidual tables. Because of this, it is recommended that you import tables one at a time in most cases. Copying Data to SQL Server The Sqoop export command enables you to export data from Hadoop to rela- tional databases. As with the import command, it uses the table defi nition in the relational database to derive metadata for the operation, so it requires that the database table already exists before you can export data to the database: sqoop export --connect "jdbc:sqlserver://Your_SqlServer;database=MsBigData; Username=demo;Password=your)password;" --table Customers --export-dir /MsBigData/Customers The arguments for the export command are similar to the import command. However, you have fewer options with the export. --export-dir indicates the folder in the Hadoop fi le system that will be used as the source for records to load into the database. The --table argument indicates the relational table that will be populated from Hadoop. Alternatively, you can use the --call argument to indicate that a stored procedure should be called for each row of information found in the Hadoop system. If you do not specify the --call argument, by default Sqoop generates an INSERT statement for each record found in the Hadoop directory. By specifying the --update-key argument and indicating a key column or columns, you can modify this behavior to generate UPDATE statements rather than INSERTs. You can use the --update-mode argument to indicate rows that don’t already exist in the target table should be inserted, and rows that do exist should be updated: sqoop export --connect "jdbc:sqlserver://Your_SqlServer;database=MsBigData; Username=demoPassword=your)password;" --table Customers --export-dir /MsBigData/Customers --update-key ID --update-mode allowinsert Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 171 Exports done using Sqoop commit to the target database every 10,000 rows. This prevents excessive resources from being tied up on the database server managing large transactions. However, it does mean that the exports are not atomic and that a failure during execution may leave a partial set of rows in the target database. The --m argument controls the amount of parallel activity, just as it does with the import. The same warnings and caveats apply to its use with export. Particularly in the case of exports, because Sqoop does its operations on a row- by-row basis, running a large number of parallel nodes can have a very negative impact on the target database. Sqoop is a useful tool for quickly moving data in and out of Hadoop, particu- larly if it is a one-time operation or the performance is not particularly important. Using Pig for Data Movement Pig was originally developed for much the same reasons as Hive. Users needed a way to work with MapReduce without becoming Java developers. Pig solves that problem by providing a language, Pig Latin, that is easy to understand and allows the developer to express the intent of the data transformation, instead of having to code each step explicitly. Another major benefi t of Pig is its ability to scale, so that large data transfor- mation processes can be run across many nodes. This makes processing large data sets much more feasible. Because Pig uses MapReduce under the covers, it benefi ts from MapReduce’s ability to scale across the nodes in your Hadoop cluster. Pig does come with some downsides. It cannot natively write to other data stores, so it is primarily useful for transforming data inside the Hadoop eco- system. Also, because there is some overhead in preparing and executing the MapReduce jobs, it’s not an ideal choice for data transformations that are trans- actional in nature. Instead, it does best when processing large amounts of data in batch operations. Transforming Data with Pig Pig can be run in a batch or interactive mode. To run it in batch, simply save your Pig commands to a fi le and pass that fi le as an argument to the Pig execut- able. To run commands interactively, you can run the Pig executable from the command prompt. Pig uses a language, Pig Latin, to defi ne the data transformations that will be done. Pig Latin statements are operators that take a relation and produces another relation. A relation, in Pig Latin terms, is a collection of tuples, and a 172 Pa r t IV ■ Working with Your Big Data tuple is a collection of fi elds. One way to envision this is that a relation is like a table in a database. The table has a collection of rows, which is analogous to the tuples. The columns in the row are analogous to the fi elds. The primary dif- ference between a relation and a database table is that relations do not require that all the tuples have the same number or type of fi elds in them. An example Pig Latin statement follows. This statement loads information from Hadoop into a relation. Statements must be terminated with semicolons, and extra whitespace is ignored: source = LOAD '/MsBigData/Customer/' USING PigStorage() AS (name, city, state, postalcode, totalpurchaseamount); In this case, the result of the LOAD function is a relation that is being assigned to the alias of source. The alias allows the relation to be referred in later state- ments. Also, while this example declares the fi elds that will be retrieved, it is not required to defi ne them. In fact, you may have noticed that there is no type defi nition. Pig can reference fi elds by ordinal position or name, if provided, and data values will be implicitly converted as needed. The LOAD function is using the PigStorage() function. This is the default storage function, which allows access to Hadoop fi les and supports delimited text and the standard binary formats for Hadoop. Additional storage functions can be developed to allow Pig to communicate with other data stores. To reduce the number of tuples (rows) in the relation, you can apply a fi lter to it using the FILTER function. In this case, the FILTER is being applied to the source alias created in the previous statement: filtered = FILTER source BY state = 'FL'; The relation produced by this statement is assigned to an alias of filtered. You can also group the data using a GROUP function. The following statement results in a new relation that contains a tuple for each distinct city, with one fi eld containing the city value and another fi eld containing a collection of tuples for the rows that are part of the group: grouped = GROUP filtered BY city; You can look at this as producing a new table, with any rows belonging to the same grouped value being associated with that row: grouped | filtered Jacksonville | (John Smith, FL, 32079, 10000), (Sandra James, FL, 32079, 8000) Tampa | (Robert Betts, FL, 32045, 6000) | (Tim Kerr, FL, 32045, 1000) Miami | (Gina Jones, FL, 32013, 7000) Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 173 When you need to operate on columns, you can use the FOREACH function. It is used when working with data like that shown here, because it runs the associ- ated function for each value in the specifi ed column. If you want to produce an average totalpurchaseamount for each city, you can use the following statement: averaged = FOREACH grouped GENERATE group, AVG(filtered.totalpurchaseamount); To order the results, you can use the ORDER function. In this case, the $2 indicates that the statement is using the ordinal column position, rather than addressing it by name: ordered = ORDER averaged BY $2 DESC; To store the results, you can call the STORE function. This lets you write the values back to Hadoop using the PigStorage() functionality: STORE ordered INTO 'c:\SampleData\PigOutput.txt' USING PigStorage(); If you take this entire set of statements together, you can see that Pig Latin is relatively easy to read and understand. These statements could be saved to a fi le as a Pig script and then executed as a batch fi le: source = LOAD '/MsBigData/Customer/' USING PigStorage() AS (name, city, state, postalcode, totalpurchaseamount); filtered = FILTER source BY state = 'FL'; grouped = GROUP filtered BY city; averaged = FOREACH grouped GENERATE group, AVG(filtered.totalpurchaseamount); ordered = ORDER averaged BY $2 DESC; STORE ordered INTO 'c:\SampleData\PigOutput.txt' USING PigStorage(); NOTE Pig scripts are generally saved with a .PIG extension. This is a convention, but it is not required. However, it does make it easier for people to fi nd and use your scripts. Another key aspect of Pig Latin is that the statements are declarative rather than imperative. That is, they tell Pig what you intend to do, but the Pig engine can determine the best way accomplish the operation. It may rearrange or combine certain operations to produce a more effi cient plan for accomplishing the work. This is similar to the way SQL Server’s query optimizer may rewrite your SQL queries to get the results in the fastest way possible. Several functions facilitate debugging Pig Latin. One useful one is DUMP. This will output the contents of the specifi ed relation to the screen. If the relation contains a large amount of data, though, this can be time-prohibitive to execute: DUMP source; 174 Pa r t IV ■ Working with Your Big Data DESCRIBE outputs the schema of a relation to a console. This can help you understand what the relation looks like after various transformations have been applied: DESCRIBE grouped; EXPLAIN shows the planned execution model for producing the specifi ed rela- tion. This outputs the logical, physical, and MapReduce plans to the console: EXPLAIN filtered; ILLUSTRATE shows the data steps that produce a given relation. This is different from the plan, in that it actually displays the data in each relation at each step: ILLUSTRATE grouped; A large number of other functions are available for Pig. Unfortunately, space constraints do not allow a full listing here. You can fi nd complete documenta- tion on Pig at http://pig.apache.org. Using Pig and SSIS Together Pig’s primary advantage over SSIS is its ability to scale the workload across multiple nodes in a cluster. When you are doing data transformations over large amounts of data, being able to scale out is a signifi cant advantage. Data operations that might otherwise take days to run on a single node may take just hours when spread across multiple nodes. A primary concern for many business users is timeliness. If producing the data they are requesting takes multiple days, that data might no longer be useful. A proven useful pattern is to offl oad parts of the data transformation that involve large amounts of data to Pig and then consume the results in SSIS for any fi nal transformation and land the data in the appropriate data store. You can execute the Pig scripts from an Execute Process task, in the same manner used to move fi les into Hadoop storage from SSIS. The Pig scripts should produce a delimited text fi le because this is easy for SSIS to consume. After the output fi le has been created, you can read the text from the Hadoop fi le system, or you can access it via a Hive connection in SSIS. From that point, SSIS can use the data as if it came from any other source. This lets you take advantage of parallel processing in Hadoop while still using SSIS capabilities to land the data in any SSIS supported data store. NOTE If the data is supposed to remain in Hadoop, in most cases it won’t make sense to send it through SSIS, because doing so introduces some overhead. Unless you have need of some special transformation in SSIS, using Pig is probably a better option if the source and target are both Hadoop. Even if there is a unique transforma- tion in SSIS, remember that Pig can be extended with custom functions as well. Using Chapter 8 ■ Effective Big Data ETL with SSIS, Pig, and Sqoop 175 the streaming model for the Pig transformation will even allow you to build these transformations using .NET. Choosing the Right Tool You have a variety of options for integrating your SQL Server and Hadoop environments. As with any set of tools, each option has pros and cons too, and the best one to use will vary depending on the use case and specifi c require- ments that you have. In addition, existing skillsets can impact tool choice. It is worthwhile being familiar with the different strengths of each tool, because some scenarios may be much easier to accomplish in one tool or another. The following sections lay out some of the advantages and disadvantages of each tool, as well as scenarios where you might want to use them. Use Cases for SSIS SSIS works well in cases where you have the following: ■ Staff trained on SSIS ■ The need to do additional transformations on the data after reading it from Hive or prior to writing it to Hadoop ■ Performance tuning is important SSIS is the best fi t for shops that are already invested in SSIS and that need to incorporate Hadoop data into existing data-integration processes. However, SSIS does not have an inherent ability to scale out. In cases where there is sig- nifi cant data processing to be done, the best results can come from a hybrid solution leveraging both SSIS and Pig. SSIS delivers the integration with other data sources and destinations, and Pig delivers the ability to scale transforma- tion of data across a Hadoop cluster. Use Cases for Pig Pig is best used in the following cases: ■ The amount of data to be processed is too much to be handled in SSIS. ■ You need to take advantage of the scalability of Hadoop. ■ Your IT staff is comfortable learning a new language and tool. ■ Your Hadoop data is stored in standard Hadoop binary fi le formats. Pig proves quite useful when you need the data transformation to happen on your Hadoop cluster so that the process scales and conserves resources on 176 Pa r t IV ■ Working with Your Big Data your SSIS systems. Using it along with SSIS can deliver the best of both worlds: a solution that scales with Hadoop and that has the extensive integration capa- bilities of SSIS. In addition, if the data doesn’t need to leave Hadoop storage, Pig is a natural fi t. Use Cases for Sqoop Sqoop proves most useful in the following cases: ■ There is little need to transform the data being moved between SQL Server and Hadoop. ■ The IT staff isn’t comfortable with SSIS or Pig. ■ Ease of use is a higher priority than performance. ■ Your Hadoop data is stored in standard Hadoop binary fi le formats. Sqoop primarily comes into play for either simple table replication scenarios or for one-time data import and export from Hadoop. Because of the reduced control over transformations and lack of fi ne-grained tuning capability, it generally doesn’t work as well in production-level data integration unless the integration is limited to replicating tables. Summary This chapter reviewed multiple methods of integrating your existing SQL Server environment with your big data environment, along with the pros and cons of each. SSIS was discussed, along with how to set it up for communication with Hive via ODBC and how to get the best performance from it. Sqoop was also covered, as a useful tool for handling bulk data import and export from Hadoop. A third option, Pig, was discussed, with a description of how you can leverage it to take advantage of Hadoop scalability and how it can be part of an SSIS solution to create a better solution overall. The chapter concluded by looking at when each tool is most applicable. Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 179 designed and tuned to process large data sets involving a number of steps. As such, it is primarily an extraction transform load (ETL) tool. In addition, like all Hadoop processing, it relies on map-reduce jobs that can be run in paral- lel on separate chunks of data and combined after the analysis to arrive at a result. For example, it would be ideal to look through massive amounts of data measurements like temperatures, group them by days, and reduce it to the max temperature by day. Another factor to keep in mind is the latency involved in the batch processing of the data. This means that Pig processing is suitable for post-processing of the data as opposed to real-time processing that occurs as the data is collected. You can run Pig either interactively or in batch mode. Typically interactive mode is used during development. When you run Pig interactively, you can easily see the results of the scripts dumped out to the screen. This is a great way to build up and debug a multi-step ETL process. Once the script is built, you can save it to a text fi le and run it in batch mode using scheduling or part of a workfl ow. This generally occurs during production where scripts are run unattended during off-peak hours. The results can be dumped into a fi le that you can use for further analysis or as an input fi le for tools, such as PowerPivot, Power View, and Power Map. (You will see how these tools are used in Chapter 11, “Visualizing Big Data with Microsoft BI.”) Taking Advantage of Built-in Functions As you saw in Chapter 8, “Effective Big Data ETL with SSIS, Pig, and SQOOP,” Pig scripts are written in a script language called Pig Latin. Although it is a lot easier to write the ETL processing using Pig Latin than it is to write the low level map-reduce jobs, at some point the Pig Latin has to be converted into a map-reduce job that does the actual processing. This is where functions come into the picture. In Pig, functions process the data and are written in Java. Pig comes with a set of built-in functions to implement common processing tasks such as the following: ■ Loading and storing data ■ Evaluating and aggregating data ■ Executing common math functions ■ Implementing string functions For example, the default load function PigStorage is used to load data into structured text fi les in UTF-8 format. The following code loads a fi le containing fl ight delay data into a relation (table) named FlightData: FlightData = LOAD 'FlightPerformance.csv' using PigStorage(','); 180 Par t IV ■ Working with Your Big Data Another built-in load function is JsonLoader. This is used to load JSON (JavaScript Object Notation)-formatted fi les. JSON is a text-based open standard designed for human-readable data interchange and is often used to trans- mit structured data over network connections. The following code loads a JSON-formatted fi le: FlightData = LOAD ' FlightPerformance.json' using JsonLoader(); NOTE For more information on JSON, see http://www.json.org/. You can also store data using the storage functions. For example, the following code stores data into a tab-delimited text fi le (the default format): STORE FlightData into 'FlightDataProcessed' using PigStorage(); Functions used to evaluate and aggregate data include IsEmpty, Size, Count, Sum, Avg, and Concat, to name a few. The following code fi lters out tuples that have an empty airport code: FlightDataFiltered = Filter FlightData By IsEmpty(AirportCode); Common math functions include ABS, CEIL, SIN, and TAN. The following code uses the CEIL function to round the delay times up to the nearest minute (integer): FlightDataCeil = FOREACH FlightData GENERATE CEIL(FlightDelay) AS FlightDelay2; Some common string functions are Lower, Trim, Substring, and Replace. The following code trims leading and trailing spaces from the airport codes: FlightDataTrimmed = FOREACH FlightData GENERATE TRIM(AirportCode) AS AirportCode2; Executing User-defi ned Functions In the preceding section, you looked at some of the useful built-in functions available in Pig. Because these are built-in functions, you do not have to register the functions or use fully qualifi ed naming to invoke them because Pig knows where the functions reside. It is recommended that you use the built-in func- tions if they meet your processing needs. However, these built-in functions are limited and will not always meet your requirements. In these cases, you can use user-defi ned functions (UDFs). Creating your own functions is not trivial, so you should investigate whether a publicly available UDF could meet your needs before going to the trouble of creating your own. Two useful open source libraries containing prebuilt UDFs are PiggyBank and DataFu, discussed next. Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 181 PiggyBank PiggyBank is a repository for UDFs provided by the open source community. Unlike with the built-in UDFs, you need to register the jar to use them. The jar fi le contains the compiled code for the function. Once registered, you can use them in your Pig scripts by providing the function’s fully qualifi ed name or use the define statement to provide an alias for the UDF. The following code uses the reverse function contained in the piggybank.jar fi le to reverse a string. The HCatLoader loads data from a table defi ned using HCatalog (covered in Chapter 7, “Expanding Your Capability with HBase and HCatalog”): REGISTER piggybank.jar; define reverse org.apache.pig.piggybank.evaluation.string.Reverse(); FlightData = LOAD 'FlightData' USING org.apache.hcatalog.pig.HCatLoader(); FlightDataReversed = Foreach FlightData Generate (origin, reverse(origin)); PiggyBank functions are organized into packages according to function type. For example, the org.apache.pig.piggybank.evaluation package contains functions for custom evaluation operations like aggregates and column trans- formations. The functions are further organized into subgroups by function. The org.apache.pig.piggybank.evaluation.string functions contain custom functions for string evaluations such as the reverse seen earlier. In addition to the evaluation functions, there are functions for comparison, fi ltering, group- ing, and loading/storing. DataFu DataFu was developed by LinkedIn to aid them in analyzing their big data sets. This is a well-tested set of UDFs containing functions for data mining and advanced statistics. You can download the jar fi le from www.wiley.com/go/ microsoftbigdatasolutions. To use the UDFs, you complete the same process as you do with the PiggyBank library. Register the jar fi le so that Pig can locate it and defi ne an alias to use in your script. The following code fi nds the median of a set of measures: REGISTER 'C:\Hadoop\pig-0.9.3-SNAPSHOT\datafu-0.0.10.jar'; DEFINE Median datafu.pig.stats.Median(); TempData = LOAD '/user/test/temperature.txt' using PigStorage() AS (dtstamp:chararray, sensorid:int, temp:double); TempDataGrouped = Group TempData ALL; MedTemp = ForEach TempDataGrouped { TempSorted = ORDER TempData BY temp; GENERATE Median(TempData.temp);}; 182 Par t IV ■ Working with Your Big Data Using UDFs You can set up Hortonworks Data Platform (HDP) for Windows on a develop- ment server to provide a local test environment that supports a single-node deployment. (For a detailed discussion of installing the Hadoop development environment on Windows, see Chapter 3, “Installing HDInsight.”) NOTE To complete the following activity, you need to download and install the HDP for Windows from Hortonworks or the HDInsight Emulator from Microsoft. After you have installed the environment, you should see three icons on the desktop for interacting with the Hadoop service. The NameNode maintains the directory fi les in the Hadoop Distributed File System (HDFS). It also keeps track of where the data fi les are kept in a Hadoop cluster. Figure 9.1 shows the information displayed when you click the NameNode icon. Figure 9.1: Displaying the NameNode information Additional links appear on the NameNode information page for browsing the fi le system and log fi les and for additional node information. Figure 9.2 shows the fi les in the /user/Dan directory. Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 183 Figure 9.2: Exploring the directory file listing You can also drill in to see the contents of a data fi le, as shown in Figure 9.3. Figure 9.3: Drilling into a data file The Hadoop Map-Reduce Status icon launches the Map/Reduce Administration page. This page provides useful information about the map-reduce jobs cur- rently running, scheduled to run, or having run in the past. Figure 9.4 shows summary information for a job that was run on the cluster. 184 Par t IV ■ Working with Your Big Data Figure 9.4: Viewing job summary stats The third link is a shortcut to the Hadoop command-line console window displaying the Hadoop command prompt. Using this console, you can build and issue map-reduce jobs and issue Hadoop File System (FS) commands. You can also use this console to administer the Hadoop cluster. Figure 9.5 shows the Hadoop console being used to list the fi les in a directory. Figure 9.5: Using the Hadoop command-line console After installing and setting up the environment, you are now ready to imple- ment an ETL process using Pig. In addition you will use UDFs exposed by PiggyBank and DataFu for advanced processing. The four basic steps contained in this activity are: 1. Loading the data. 2. Running Pig interactively with Grunt. 3. Using PiggyBank to extract time periods. 4. Using DataFu to implement some advanced statistical analysis. Loading Data The fi rst thing you need to do is load a data fi le. You can download a sample highway traffi c data fi le at www.wiley.com/go/microsoftbigdatasolutions. Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 185 Once the data fi le is downloaded, you can load it into HDFS using the Hadoop command-line console. Open the console and create a directory for the fi le using the following code: hadoop fs –mkdir /user/test Next, use the following command to load the traffic.txt fi le into the directory: Hadoop fs –copyFromLocal C:\SampleData\traffic.txt /user/test Once the fi le is copied into the Hadoop directory, you should be able to browse to the directory and view the data using the Hadoop NameNode link. Figure 9.6 shows the fi le open in the NameNode browser. Figure 9.6: Viewing the traffic.txt data Running Pig Interactively with Grunt From the bin folder of the Pig install folder (hdp\hadoop\pig\bin), open the Pig command-line console to launch the Grunt shell. The Grunt shell enables you to run Pig Latin interactively and view the results of each step. Enter the following script to load and create a schema for the traffi c data: 186 Par t IV ■ Working with Your Big Data SpeedData = LOAD '/user/test/traffic.txt' using PigStorage() AS (dtstamp:chararray, sensorid:int, speed:double); Dump the results to the screen: DUMP SpeedData; By doing so, you can run a map-reduce job that outputs the data to the console window. You should see data similar to Figure 9.7, which shows the tuples that make the set of data. Figure 9.7: Dumping results to the console window Using PiggyBank to Extract Time Periods The next step in analyzing the data is to group it into different date/time buckets. To accomplish this, you use functions defi ned in the piggybank.jar fi le. If that fi le is not already installed, you can either download and compile the source code or download a compiled jar fi le from www.wiley.com/go/microsoftbig- datasolutions. Along with the piggybank.jar fi le, you need to get a copy of the joda-time-2.2.jar fi le from www.wiley.com/go/microsoftbigdatasolu- tions, which is referenced by the piggybank.jar fi le. Place the jar fi les in a directory accessible to the Pig command-line console; for example, you can place it in the same directory as the pig.jar fi le. Now you can register and alias the PiggyBank functions in your Pig Latin scripts. The fi rst function you use here is the CustomFormatToISO. This function converts the date/time strings in the fi le to a standard ISO format: REGISTER 'C:\hdp\hadoop\pig-\piggybank.jar'; REGISTER 'C:\hdp\hadoop\pig-\joda-time-2.2.jar'; Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 187 DEFINE Convert org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO; Use the following code to load and convert the date/time values: SpeedData = LOAD '/user/test/traffic.txt' using PigStorage() AS (dtstamp:chararray, sensorid:int, speed:double); SpeedDataFormat = FOREACH SpeedData Generate dtstamp, Convert(dtstamp,'MM/dd/YYYY hh:mm:ss a') as dtISO; Dump SpeedDataFormat; After the job completes, you should see data similar to the data shown in Figure 9.8. Figure 9.8: Reformatted date/times Now that you have the dates in ISO format, you can easily strip out the day and hour from the date. Use the following code to create the day and hour fi elds. The output should match Figure 9.9: REGISTER 'C:\hdp\hadoop\pig-\piggybank.jar'; REGISTER 'C:\hdp\hadoop\pig-\joda-time-2.2.jar'; DEFINE Convert org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO; DEFINE SubString org.apache.pig.piggybank.evaluation.string.SUBSTRING; SpeedData = LOAD '/user/test/trafic.txt' using PigStorage() AS (dtstamp:chararray, sensorid:int, speed:double); SpeedDataFormat = FOREACH SpeedData Generate dtstamp, Convert(dtstamp,'MM/dd/YYYY hh:mm:ss a') as dtISO, speed; SpeedDataHour = FOREACH SpeedDataFormat Generate dtstamp, SubString(dtISO,5,7) as day, SubString(dtISO,11,13) as hr, speed; Dump SpeedDataHour; 188 Par t IV ■ Working with Your Big Data Figure 9.9: Splitting day and hour from an ISO date field Now you can group the data by hour and get the maximum, minimum, and average speed recorded during each hour (see Figure 9.10): SpeedDataGrouped = Group SpeedDataHour BY hr; SpeedDataAgr = FOREACH SpeedDataGrouped GENERATE group, MAX(SpeedDataHour.speed), MIN(SpeedDataHour.speed), AVG(SpeedDataHour.speed); Dump SpeedDataAgr; Figure 9.10: Speed data aggregated by hour Using DataFu for Advanced Statistics Even though Pig contains some rudimentary statistical UDFs you can use to analyze the data, you often need to implement advanced statistical techniques to accurately process the data. For example, you might want to eliminate outli- ers in your data. To determine the outliers, you can use the DataFu Quantile function and compute the 10th and 90th percentile values. To use the DataFu UDFs, download the datafu.jar fi le from www.wiley .com/go/microsoftbigdatasolutions and place it in the same directory as the Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 189 piggybank.jar fi le. You can now reference the jar fi le in your script. Defi ne an alias for the Quantile function and provide the quantile values you want to calculate: REGISTER 'C:\hdp\hadoop\pig-\datafu-0.0.10.jar'; DEFINE Quantile datafu.pig.stats.Quantile('.10','.90'); Load and group the data: SpeedData = LOAD '/user/test/traffic.txt' using PigStorage() AS (dtstamp:chararray, sensorid:int, speed:double); SpeedDataGrouped = Group SpeedData ALL; Pass sorted data to the Quantile function and dump the results out to the command-line console (see Figure 9.11). Using this data, you can then write a script to fi lter out the outliers: QuantSpeeds = ForEach SpeedDataGrouped { SpeedSorted = ORDER SpeedData BY speed; GENERATE Quantile(SpeedData.speed);}; Dump QuantSpeeds; Figure 9.11: Finding the 10th and 90th percentile Now that you know how to use UDFs to extend the functionality of Pig, it is time to take it a step further and create your own UDF. Building Your Own UDFs for Pig Unless you are an experienced Java programmer, writing your own UDF is not trivial, as mentioned earlier. However, if you have experience in another object- oriented programming language such as C#, you should be able to transition to 190 Par t IV ■ Working with Your Big Data writing UDFs in Java without too much diffi culty. One thing you may want to do to make things easier is to download and install a Java interface development environment (IDE) such as Eclipse (http://www.eclipse.org/). If you are used to working in Visual Studio, you should be comfortable developing in Eclipse. You can create several types of UDFs, depending on the functionality. The most common type is the eval function. An eval function accepts a tuple as an input, completes some processing on it, and sends it back out. They are typi- cally used in conjunction with a FOREACH statement in HiveQL. For example, the following script calls a custom UDF to convert string values to lowercase: Register C:\hdp\hadoop\pig-\SampleUDF.jar; Define lcase com.BigData.hadoop.pig.SampleUDF.Lower; FlightData = LOAD '/user/test/FlightPerformance.csv' using PigStorage(',') as (flight_date:chararray,airline_cd:int,airport_cd:chararray, delay:int,dep_time:int); Lower = FOREACH FlightData GENERATE lcase(airport_cd); To create the UDF, you fi rst add a reference to the pig.jar fi le. After doing so, you need to create a class that extends the EvalFunc class. The EvalFunc is the base class for all eval functions. The import statements at the top of the fi le indicate the various classes you are going to use from the referenced jar fi les: import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple; public class Lower extends EvalFunc { } The next step is to add an exec function that implements the processing. It has an input parameter of a tuple and an output of a string: public String exec(Tuple arg0) throws IOException { if (arg0 == null || arg0.size() == 0) return null; try { String str = (String)arg0.get(0); return str.toLowerCase(); } catch(Exception e) { throw new IOException("Caught exception processing input row ", e); } } Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 191 The fi rst part of the code checks the input tuple to make sure that it is valid and then uses a try-catch block. The try block converts the string to lowercase and returns it back to the caller. If an error occurs in the try block, the catch block returns an error message to the caller. Next, you need to build the class and export it to a jar fi le. Place the jar fi le in the Pig directory, and you are ready to use it in your scripts. Another common type of function is the fi lter function. Filter functions are eval functions that return a Boolean result. For example, the IsPositive func- tion is used here to fi lter out negative and zero-delay values (integers): Register C:\hdp\hadoop\pig-\SampleUDF.jar; Define isPos com.BigData.hadoop.pig.SampleUDF.isPositive; FlightData = LOAD '/user/test/FlightPerformance.csv' using PigStorage(',') as (flight_date:chararray,airline_cd:int,airport_cd:chararray, delay:int,dep_time:int); PosDelay = Filter FlightData BY isPos(delay); The code for the isPositive UDF is shown here: package com.BigData.hadoop.pig.SampleUDF; import java.io.IOException; import org.apache.pig.FilterFunc; import org.apache.pig.data.Tuple; public class isPositive extends FilterFunc { @Override public Boolean exec(Tuple arg0) throws IOException { if (arg0 == null || arg0.size() != 1) return null; try { if (arg0.get(0) instanceof Integer) { if ((Integer)arg0.get(0)>0) return true; else return false; } else return false; } catch(Exception e) { throw new IOException ("Caught exception processing input row ", e); } } } 192 Par t IV ■ Working with Your Big Data It extends the FilterFunc class and includes an exec function that checks to confi rm whether the tuple passed in is not null and makes sure that it has only one member. It then confi rms whether it is an integer and returns true if it is greater than zero; otherwise, it returns false. Some other UDF types are the aggregation, load, and store functions. The functions shown here are the bare-bones implementations. You also need to consider error handling, progress reporting, and output schema typing. For more information on custom UDF creation, consult the UDF manual on the Apache Pig wiki (http://wiki.apache.org/pig/UDFManual). Using Hive Another tool available to create and run map-reduce jobs in Hadoop is Hive. One of the major advantages of Hive is that it creates a relational database layer over the data fi les. Using this paradigm, you can work with the data using traditional querying techniques, which is very benefi cial if you have a SQL background. In addition, you do not have to worry about how the query is translated into the map-reduce job. There is a query engine that works out the details of what is the most effi cient way of loading and aggregating the data. In the following sections you will gain an understanding of how to perform advanced data analysis with Hive. First you will look at the different types of built-in Hive functions available. Next, you will see how to extend Hive with custom map-reduce scripts written in Python. Then you will go one step further and create a UDF to extend the functionality of Hive. Data Analysis with Hive One strong point of HiveQL is that it contains a lot of built-in functions that assist you in your data analysis. There are a number of mathematical, collection, type conversion, date, and string functions. Most of the functions that are in the SQL language have been included in HiveQL. For example, the following HiveQL counts the fl ights and fi nds the maximum delay at each airport from the flightdata table. Figure 9.12 shows the output in the Hive console: Select airport_cd, count(*), max(delay) from flightdata group by airport_cd; Types of Hive Functions Hive has several fl avors of functions you can work with, including the following: ■ UDFs ■ UDAFs (user-defi ned aggregate functions) Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 193 ■ UDTFs (user-defi ned table-generating functions) Figure 9.12: Flight counts and maximum delays UDFs work on single rows at a time and consist of functions such as type conversion, math functions, string manipulation, and date/time functions. For example, the following Hive query uses the date function to get the day from the flightdate fi eld: Select day(flightdate), airport_cd, delay from flightdata where delay > 100; Whereas UDFs work on single rows at a time and processing occurs on the map side of the processing, UDAFs work on buckets of data and are implemented on the reduce side of the processing. For example, you can use the built-in UDAF count and max to get the delay counts and maximum delays by day: Select day(flightdate), count(*), max(delay) from flightdata group by day(flightdate); Another type of function used in Hive is a UDFT. This type of function takes a single-row input and produces multiple-row outputs. These functions are useful for taking a column containing an array that needs to be split out into multiple rows. Hive’s built-in UDFTs include the following: ■ The explode function, which takes an array as input and splits it out into multiple rows. ■ The json_tuple function, which is useful for querying JSON-formatted nodes. It takes the JSON node and splits out the child nodes into separate rows into a virtual table for better processing performance. ■ The parse_url_tuple function, which takes a URL and extracts parts of a URL string into multiple rows in a table structure. 194 Par t IV ■ Working with Your Big Data The fi le shown in Figure 9.13 contains student data in JSON format. Figure 9.13: JSON formatted data To parse the values from the JSON nodes, you use the json_tuple function in combination with the lateral view operator to create a table: Select b.studentid, b.studentdata from studentdata a lateral view json_tuple(a.jstring,'StudentId','StudentData') b as studentid, studentdata; Figure 9.14 shows the query output. Figure 9.14: Parsing JSON data Notice that this query did not parse out the nested JSON data from each row. To query the nested data, you need to add an additional lateral view: Select b.studentid, c.name, c.major from studentdata a lateral view json_tuple(a.jstring,'StudentId','StudentData') b as studentid, studentdata lateral view json_tuple(b.studentdata,'Name','Major') c as name, major; Figure 9.15 shows the output with the nested JSON data parsed out into separate columns. Figure 9.15: Parsing out nested data Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 195 Now that you have expanded out the nested data, you can analyze the data using reduce functions such as counting the number of students in each major. Extending Hive with Map-reduce Scripts There are times when you need to create a custom data-processing transforma- tion that is not easy to achieve using HiveQL but fairly easy to do with a script- ing language. This is particularly useful when manipulating if the result of the transform produces a different number of columns or rows than the input. For example, you want to split up an input column into several output columns using string-parsing functions. Another example is a column containing a set of key/value pairs that need to be split out into their own rows. The input values sent to the script will consist of tab-delimited strings, and the output values should also come back as tab-delimited strings. Any null values sent to the script will be converted to the literal string \N to differentiate it from an empty string. Although technically you can create your script in any scripting language, Pearl and Python seem to be the most popular. The code shown in Figure 9.16 is an example Python script that takes in a column formatted as hh:mm:ss and splits it into separate columns for hour, minute, and second. Figure 9.16: Python script for splitting time To call this script from HiveQL, you use the TRANSFORM clause. You need to provide the TRANSFORM clause, the input data, output columns, and map-reduce script fi le. The following code uses the previous script. It takes an input of a time column and a log level and parses the time. Figure 9.17 shows the output: add file c:\sampledata\split_time.py; SELECT TRANSFORM(l.t4, l.t2) USING 'python split_time.py' AS (hr,loglevel,min,sec,fulltime) from logs l; 196 Par t IV ■ Working with Your Big Data Figure 9.17: Splitting the time into hours, minutes, and seconds The preceding script was a mapping script. You can also create reduce scripts for your custom processing. A reduce script takes tab-delimited columns from the input and produces tab-delimited output columns just like a map script. The difference is the reduce script combines rows in the process, and the rows out should be less than the rows put in. To run a reduce script, you need to have a mapping script. The mapping script provides the key/value pairs for the reducer. The Python script in Figure 9.18 is a mapping script that takes the input and checks whether it starts with a [ character. If it does, it outputs it to a line and gives it a count of one. Figure 9.18: Mapping logging levels Figure 9.19 shows a sample output from the script. The output from the map script is fed into the reduce script, which counts the occurrence of each log level and returns the total count for each log level on a new line. Figure 9.20 shows the code for the reduce script. Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 197 Figure 9.19: Mapping output Figure 9.20: Reduce script to aggregate log level counts You combine the map and reduce script into your HiveQL where the output from the mapper is the input for the reducer. The cluster by statement is used to partition and sort the output of the mapping by the loglevel key. The fol- lowing code processes the log fi les through the custom map and reduce scripts: add file c:\sampledata\map_loglevel.py; add file c:\sampledata\level_cnt.py; from (from log_table SELECT TRANSFORM(log_entry) USING 'python map_loglevel.py' 198 Par t IV ■ Working with Your Big Data AS (loglevel,cnt) cluster by loglevel) map_out Select Transform(map_out.loglevel,map_out.cnt) using 'python level_cnt.py' as level, cnt; Another way to call the scripts is by using the map and reduce statements, which are an alias for the TRANSFORM statement. The following code calls the scripts using the map and reduce statements and loads the results into a script_ test table: add file c:\sampledata\map_loglevel.py; add file c:\sampledata\level_cnt.py; from (from log_table Map log_entry USING 'python map_loglevel.py' AS loglevel, cnt cluster by loglevel) map_out Insert overwrite table script_test Reduce map_out.loglevel, map_out.cnt using 'python level_cnt.py' as level, cnt; Figure 9.21 shows the aggregated counts of the different log levels. Figure 9.21: Output from the map-reduce scripts Creating a Custom Map-reduce Script In this exercise, we create a custom mapping script that takes a set of four measure- ments and returns the maximum value of the four. Figure 9.22 shows the input fi le, which you can download from www.wiley.com/go/microsoftbigdatasolutions. Open your favorite text editor and enter the following code. Make sure to pay attention to the indenting: #!/usr/bin/env python import sys for line in sys.stdin.readlines(): line = line.strip() fields = line.split('\t') time = fields[0] sensor= fields[1] maxvalue = max(fields[2:5]) print time,"\t",sensor,"\t",maxvalue Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 199 Figure 9.22: Input traffic data Save the fi le as get_maxValue.py in a reachable folder (for example, C:\ SampleData). In the Hive command-line console, create a speeds table and load the data from traffic.txt into it: CREATE TABLE speeds(recdate string, sensor string, v1 double, v2 double, v3 double, v4 double) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'; LOAD DATA LOCAL INPATH 'c:\sampledata\traffic.txt' OVERWRITE INTO TABLE speeds; Add a reference to the fi le and the TRANSFORM statement to call the script: add file C:\SampleData\get_maxValue.py; SELECT TRANSFORM(s.recdate,s.sensor,s.v1,s.v2,s.v3,s.v4) USING 'python get_maxValue.py' AS (recdate,sensor,maxvalue) FROM speeds s; The data output should look similar to Figure 9.23. Creating Your Own UDFs for Hive As mentioned previously, Hive contains a number of function types depending on the processing involved. The simplest type is the UDF, which takes a row in, processes it, and returns the row back. The UDAF is a little more involved 200 Part IV ■ Working with Your Big Data because it performs an aggregation on input values and reduces the number of rows coming out. The other type of function you can create is the UDTF, which takes a row in and parses it out into a table. If you followed along in the earlier section on building custom UDFs for Pig, you will fi nd that building UDFs for Hive is a similar experience. First, you cre- ate a project in your favorite Java development environment. Then, you add a reference to the hive-exec.jar and the hive-serde.jar fi les. These are located in the hive folder in the lib subfolder. After you add these references, you add an import statement to the org.apache.hadoop.hive.ql.exec.UDF class and extend it with a custom class: package Com.BigData.hadoop.hive.SampleUDF; import org.apache.hadoop.hive.ql.exec.UDF; public class OrderNums extends UDF{ } Figure 9.23: Output of the get_maxValue.py script The next step is to add an evaluate function that will do the processing and return the results. The following code processes the two integers passed in and returns the larger value. If they are equal it returns null. Because you are using the IntWritable class, you need to add an import for pointing to the org,apache .hadoop.io.IntWritable class in the Hadoop-core.jar fi le: package Com.BigData.hadoop.hive.SampleUDF; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.hive.ql.exec.UDF; public class GetMaxInt extends UDF{ public IntWritable evaluate(IntWritable x, IntWritable y) Chapter 9 ■ Data Research and Advanced Data Cleansing with Pig and Hive 201 { if (x.get()>y.get()) return x; else if (x.get()



需要 8 金币 [ 分享文档获得金币 ] 3 人已下载