
Introduction to Hadoop - The Key Components of Hadoop (Part 3 of 3)

Regardless of whether you use Apache Hadoop itself, a distribution or a big data suite, Hadoop is made up of a large number of different components. That is why people often speak of a framework or of the Hadoop ecosystem: a set of freely combinable libraries which, through their combination and interlocking, keep the wheels of the big data machinery turning.

The development of Hadoop is driven by an open source community consisting of a healthy mix of private individuals and employees of distributors, vendors and companies working with Hadoop and big data. It began in 2002 with Doug Cutting's work on the Nutch search engine, got its theoretical foundation from the papers Google published in 2003 and 2004 on the Google File System and MapReduce, and really picked up speed in 2006 when Doug Cutting moved to Yahoo!, which saw the enormous potential behind it and today operates and uses some of the world's largest Hadoop clusters. Other internet giants such as Facebook, Twitter and eBay also joined the development, and the two distributors Cloudera and Hortonworks emerged as spin-offs from the Yahoo! environment. All of this has ensured that Hadoop is rock solid today, that the ecosystem is very extensive and continues to grow, and that work is currently underway on the next major release, Hadoop 2.0 (or YARN), expected at the end of the year. In short: Hadoop has long been ready for prime time and powers some of the world's largest companies and business models.

The most important components of Hadoop and their placement in the ecosystem are shown in the following figure.

An overview of Core Hadoop and the Hadoop ecosystem

 

Core Hadoop

The two central components, referred to as the core of Hadoop, are the Hadoop Distributed File System (HDFS) and MapReduce. They are responsible for the two core tasks: storing and processing large amounts of data. Even though these two components are closely interlinked, they are independent, so it would also be possible to use a different file system than HDFS with MapReduce. In practice, however, the combination of these two central elements is used almost everywhere (the MapR distribution, for example, is an exception and uses its own file system instead of HDFS).

 

Hadoop Distributed File System (HDFS)

HDFS is a Java-based distributed file system that allows reliable and persistent storage of, as well as fast access to, large volumes of data. It follows a "write once, read many" paradigm, i.e. it is designed so that data is ideally written to HDFS only once and then read from there many times. Modifying data after it has been written is essentially not possible; data can only be appended to existing files, and even that is rarely used in practice because the operation is inefficient.

In order to store the data in a distributed manner, it is split into blocks (usually 64 or 128 MB) and distributed over the nodes of the Hadoop cluster. Given this block-based design, it follows that HDFS is meant to be used with large files; storing many small files is an antipattern. For each of these blocks, 3 copies are usually kept on different servers in the cluster in order to guarantee fault tolerance. HDFS manages the data completely automatically; for the user, access feels like a virtual file system. The commands are also very similar to their *nix counterparts, e.g. hadoop fs -ls to view the properties of a file or hadoop fs -put to copy a file from the local file system (such as ext3/4) into HDFS. There is also a permission system that every Linux user will be familiar with, for example hadoop fs -chmod 777 to allow full access to a file.
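For programmatic access there is also a Java API on top of the same virtual file system. The following is a minimal sketch (the path is made up for illustration) that writes a file to HDFS, sets its permissions and reads it back, roughly mirroring the shell commands above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class HdfsExample {
    public static void main(String[] args) throws Exception {
        // Picks up the cluster configuration (core-site.xml etc.) from the classpath
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path file = new Path("/user/demo/hello.txt"); // hypothetical path

        // "Write once": create the file and write some data
        try (FSDataOutputStream out = fs.create(file)) {
            out.writeUTF("Hello HDFS");
        }

        // Equivalent of: hadoop fs -chmod 777 /user/demo/hello.txt
        fs.setPermission(file, new FsPermission((short) 0777));

        // "Read many": open the file and read the data back
        try (FSDataInputStream in = fs.open(file)) {
            System.out.println(in.readUTF());
        }

        fs.close();
    }
}
```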

On the architecture side, HDFS is designed like a "classic" master-slave system with the following components:

An overview of the HDFS architecture

  • The NameNode is the central master component and manages the metadata for all data stored on the DataNodes. For this purpose it maintains a block map with the current storage locations of the blocks of all directories and files, as well as a journal of the current file operations. Based on these two files it can answer requests for files at any time, but it does not store any file data itself.
  • The DataNodes are the slave components of the architecture and are solely responsible for storing the data. They are therefore the "simple" building blocks, and scaling the cluster is achieved by adding more of them.
  • The Secondary NameNode is not, as the name suggests, a backup of the NameNode. It is merely a helper component that periodically merges the NameNode's block map and journal and writes the result back. This relieves the NameNode and lets the cluster start faster after a failure.

If a file is to be written to HDFS, the Client first asks the NameNode whether it is authorized to write the file at all. If it receives the OK, it sends details of the file to the NameNode and receives from it a list of DataNodes on which it may store the file. The rest of the communication then takes place directly between the Client and the DataNodes, and the Client writes the file block by block to the DataNodes it was given. In the background, HDFS then takes care of the automatic replication of the blocks for fault tolerance. The default replication level is 3, and the placement algorithm puts the second and third copy of a block on two different DataNodes in a different rack.

Reading a file from HDFS works analogously: the Client first checks with the NameNode whether it is authorized to read the file. If so, it receives from the NameNode the list of blocks and DataNodes and then reads the blocks directly from the DataNodes. In this way read and write access can be parallelized, which, especially for large files, increases throughput many times over compared to sequential access.

 

MapReduce

MapReduce is also Java-based and serves as a framework for the distributed and parallel processing of large amounts of structured and unstructured data, which Hadoop typically stores in HDFS. It is based on a divide-and-conquer approach and thus allows distributed calculations over large computer clusters in a very reliable and fault-tolerant manner.

How it works and what the various phases do can best be illustrated with the classic introductory example WordCount, the "Hello World" of the MapReduce world. The task is to produce a list of word frequencies from a collection of input texts. The distributed algorithm that solves this problem works as follows:

Hadoop’s MapReduce using WordCount as an example

MapReduce generally operates on key-value pairs. For the WordCount example, the input consists of the documents, with a key (such as the file name) and the actual text as the value. These key-value pairs are distributed to individual processes on the cluster nodes, and the following phases then run in a distributed and parallel fashion (a compact code sketch follows after the list):

  • In the Map phase the text is broken down into individual words and each occurrence of a word is counted individually. For this purpose, a key-value pair is emitted for each word, with the word as the key and 1 as the value.
  • The Combine phase is optional and acts as a kind of local mini-reducer. Here, key-value pairs with the same key are already combined locally into a single key-value pair, as happens in the example on Mapper #2.
  • So far we have only counted locally on the cluster nodes, and a global view still has to be established. First, the intermediate results are partitioned; in this example all key-value pairs with the key a end up on Reducer #1, all with b on Reducer #2 and all with c on Reducer #3.
  • The Shuffle & Sort phase ensures that the key-value pairs actually end up on the right reducer. Here the partial results are distributed over the network to the corresponding reducer, which is usually a different cluster node than the mapper. In addition, the partial results for each reducer are locally pre-sorted before the reduce phase starts.
  • In the Reduce phase all values with the same key are now located at the same reducer, which only has to add up the frequencies: a and b each occur twice and c three times, and these totals are again recorded as key-value pairs. At the end, each reducer writes its results to a file in HDFS, so that we end up with 3 files which can then be combined, e.g. via cat, into the desired final result.
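Expressed in code, the Map and Reduce phases correspond to two small Java classes. The following is a condensed sketch using Hadoop's standard MapReduce API; note that with the standard TextInputFormat the mapper receives the byte offset of the line as its key rather than the file name used in the figure.

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map phase: emit (word, 1) for every word in the input text
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);   // e.g. ("a", 1)
        }
    }
}

// Reduce phase (also usable as combiner): sum up all counts for one word
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum)); // e.g. ("c", 3)
    }
}
```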

On the architecture side we have a déjà vu: the components of MapReduce also form a master-slave system with elements comparable to those of HDFS, which is why we look at the components directly in the combined architecture:

The combined architecture of MapReduce and HDFS

  • The JobTracker is the master component; it manages all jobs and resources in the Hadoop cluster and is to MapReduce what the NameNode is to HDFS.
  • The TaskTrackers are the slave components that run on every server in the cluster. They are responsible for the actual execution of MapReduce jobs, including periodically reporting their status to the JobTracker.
  • Tasks are managed locally per cluster node by the TaskTracker and are essentially Java Virtual Machines (JVMs) in which a mapper or reducer runs as a process.

But how do you start a MapReduce job? First you program the logic of your MapReduce job, typically in Java; other languages such as Python can also be used via Hadoop's streaming support. This source code is packaged together with the job-specific configuration into a JAR file and made available to the cluster. The Client asks the NameNode for the storage locations of the input data and receives them - remember the principle of data locality. With this information the Client submits the job to the JobTracker, which delegates the tasks to the affected TaskTrackers. The TaskTrackers then obtain all job-relevant information from the NameNode and start the Tasks, which execute the program code and do the "real" work. During execution they periodically send status information to the JobTracker. Should a Task fail, the JobTracker coordinates the correct completion of the job with different measures depending on the phase. In the end the job is guaranteed to run to completion, and the results are stored in HDFS, where they can be viewed or used for further processing.
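A minimal driver that wires the mapper, combiner and reducer from the sketch above together and submits the job could look roughly like this; input and output paths are passed as arguments.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");

        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class); // optional local mini-reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // input directory in HDFS
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // must not exist yet

        // Submit the job to the JobTracker and wait until it has finished
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Packaged into a JAR, such a driver would typically be started on the cluster with hadoop jar wordcount.jar WordCountDriver /input /output.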

 

Other important Hadoop components

In addition to the two core components for storing and processing data, many other components have been added in the course of the development of Hadoop, which make up the flexibility and power of the Hadoop ecosystem. In the following we want to give a brief overview of the most important of these projects.

 

Apache Pig

Writing MapReduce jobs in Java is time-consuming and requires programming knowledge. Wouldn't it be nice if there were an abstraction layer? That is exactly what the people at Yahoo thought, and the result is called Apache Pig. Pig consists of an abstract scripting language (Pig Latin) that allows the data flow of a MapReduce job to be described at a higher level. From this, the Pig compiler generates already optimized MapReduce jobs, which are then executed in the usual way via MapReduce - but without the user having to implement all the details. You give up fully fine-grained control over the jobs, but in practice this is rarely needed, and accordingly Pig is used very often.
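As an illustration, a WordCount expressed in Pig Latin fits into a few lines. The sketch below embeds it via Pig's PigServer API so that it runs as ordinary MapReduce jobs; the file paths and the field name line are made up for the example.

```java
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class PigWordCount {
    public static void main(String[] args) throws Exception {
        // Run the Pig Latin statements as regular MapReduce jobs on the cluster
        PigServer pig = new PigServer(ExecType.MAPREDUCE);

        pig.registerQuery("lines   = LOAD '/user/demo/input' AS (line:chararray);");
        pig.registerQuery("words   = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;");
        pig.registerQuery("grouped = GROUP words BY word;");
        pig.registerQuery("counts  = FOREACH grouped GENERATE group, COUNT(words);");

        // Triggers compilation into MapReduce jobs and writes the result to HDFS
        pig.store("counts", "/user/demo/wordcount-pig");

        pig.shutdown();
    }
}
```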

 

Apache Hive

Apache Hive is also an abstraction layer on top of the MapReduce framework and is often described as the data warehouse system for Hadoop. Hive brings with it a SQL-like language (HiveQL) that makes it possible to run aggregations, analyses and other queries on data stored in HDFS (or other Hadoop-compatible file systems). In the end, HiveQL is also translated into MapReduce jobs and executed that way. The great advantage here, too, is the abstraction and the fact that SQL knowledge is widespread, so that non-technicians, e.g. in the specialist departments of large companies, can work with it well - an important factor in its widespread use in the Hadoop environment.
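Queries can be submitted, for example, via Hive's JDBC driver. The following sketch assumes a running HiveServer2 on a hypothetical host and an already existing table access_log; both are assumptions for illustration only.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveQlExample {
    public static void main(String[] args) throws Exception {
        // HiveServer2 JDBC driver; host, port and table are made up for the example
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection(
                "jdbc:hive2://hive-host:10000/default", "demo", "");

        Statement stmt = con.createStatement();

        // HiveQL looks like SQL, but is compiled into MapReduce jobs under the hood
        ResultSet rs = stmt.executeQuery(
                "SELECT status, COUNT(*) AS hits FROM access_log GROUP BY status");

        while (rs.next()) {
            System.out.println(rs.getString("status") + "\t" + rs.getLong("hits"));
        }

        rs.close();
        stmt.close();
        con.close();
    }
}
```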

 

Apache HCatalog

HCatalog connects worlds - in practice mainly Pig and Hive. It provides a central table and metadata management service for describing the structure of data stored in Hadoop. Once described in this way, the data can be used conveniently from both Pig and Hive, and entire chains of MapReduce jobs can be set up much more easily. For example, Pig can be used for ETL processes, i.e. importing and cleansing the data, which is then analyzed with Hive - a pattern that is often encountered in practice.

 

Apache Oozie

Building such processing chains and executing them automatically and on a schedule is exactly the task of the workflow component Oozie. It fills the gap that MapReduce itself offers no mechanism for defining dependencies between jobs. With Oozie, for example, a simple XML syntax can be used to specify that two dependent MapReduce jobs are started automatically once a predecessor has completed successfully, that a synchronization point then waits for both jobs to finish successfully, and that finally a last job exporting the collected results is started. You can see the potential behind it...
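The workflows themselves are described in workflow.xml files stored in HDFS; from Java they can be started via Oozie's client API, roughly as sketched below. The Oozie URL, HDFS paths and host names are assumptions for this sketch.

```java
import java.util.Properties;

import org.apache.oozie.client.OozieClient;

public class OozieSubmitExample {
    public static void main(String[] args) throws Exception {
        // The Oozie server URL is an assumption for this sketch
        OozieClient oozie = new OozieClient("http://oozie-host:11000/oozie");

        Properties conf = oozie.createConfiguration();
        // Directory in HDFS that contains the workflow.xml describing the job chain
        conf.setProperty(OozieClient.APP_PATH, "hdfs://namenode:8020/user/demo/workflows/etl");
        conf.setProperty("nameNode", "hdfs://namenode:8020");
        conf.setProperty("jobTracker", "jobtracker-host:8021");

        // Submit and start the workflow; Oozie takes care of the job dependencies
        String jobId = oozie.run(conf);
        System.out.println("Started workflow " + jobId + ", status: "
                + oozie.getJobInfo(jobId).getStatus());
    }
}
```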

 

Apache HBase

As we have learned, Hadoop is optimized for writing data once and reading it many times, and it works in a batch-oriented manner. While these are the best prerequisites for many big data use cases, there are also plenty of use cases that require the manipulation of data and are very write-intensive. This is where the NoSQL database system HBase comes into play. It is based on the BigTable approach, i.e. it stores data in a column-oriented manner. HBase builds on HDFS and thus offers fault-tolerant storage combined with fast access to large, distributed amounts of data. HBase also extends the Hadoop system with transactional operations on individual rows: user-defined updates as well as insert and delete operations.
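Access from Java goes through HBase's client API. The following sketch uses the classic HTable interface of that era and assumes an already existing table users with a column family info; table, column family and row key are made up for the example.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseExample {
    public static void main(String[] args) throws Exception {
        // Reads hbase-site.xml (ZooKeeper quorum etc.) from the classpath
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "users"); // table and column family are assumptions

        // Random write: update a single cell identified by row key, family and qualifier
        Put put = new Put(Bytes.toBytes("user-4711"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes("Hamburg"));
        table.put(put);

        // Random read: fetch the cell back by its row key
        Get get = new Get(Bytes.toBytes("user-4711"));
        Result result = table.get(get);
        System.out.println(Bytes.toString(
                result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city"))));

        // Deletes work the same way via the Delete class
        table.close();
    }
}
```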

 

Apache ZooKeeper

The many distributed processes of a Hadoop cluster also have to be coordinated, and that is exactly what ZooKeeper does. Distributed applications can use ZooKeeper to store, distribute and update critical configuration information. ZooKeeper is kept very generic and is not limited to the Hadoop universe; Twitter's Storm, for example, also uses ZooKeeper to build a shared-nothing architecture.
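At its core, ZooKeeper exposes a small, file-system-like API of so-called znodes. A minimal sketch for storing and reading back a piece of configuration could look like this; the connection string, the znode path and the stored value are assumptions.

```java
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperExample {
    public static void main(String[] args) throws Exception {
        final CountDownLatch connected = new CountDownLatch(1);

        // Connection string and znode path are assumptions for this sketch
        ZooKeeper zk = new ZooKeeper("zk-host:2181", 3000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        connected.await(); // wait until the session is established

        // Store a small piece of configuration as a persistent znode
        String path = "/demo-config";
        zk.create(path, "jdbc:mysql://db-host/app".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Any client in the cluster can read (and watch) the same znode
        byte[] data = zk.getData(path, false, null);
        System.out.println(new String(data));

        zk.close();
    }
}
```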

 

Apache Ambari

One of the challenges with Hadoop is certainly the installation, administration and monitoring of the cluster, which can consist of thousands of nodes. For a long time this was the domain of the Cloudera distribution with its commercial Cloudera Manager; with Apache Ambari there is now a mature open source system that handles exactly these tasks via an intuitive web interface.

 

Apache Sqoop

What good is Hadoop if you cannot easily integrate existing data? Since most companies nowadays keep their existing data in relational databases, Apache Sqoop was created, designed for the efficient import and export of large amounts of data between relational databases and Hadoop.

 

Apache Flume

In addition to relational data, you will of course also want to process other data in Hadoop. This is where Apache Flume is very helpful: it was designed for the distributed and reliable collection, aggregation and movement of log data. Flume does not strictly require Hadoop, but it provides connectors to write data from a wide variety of sources to, among other targets, HDFS, using generic transport formats such as Thrift or Avro.

Apache Mahout

Mahout is a scalable, very powerful library for machine learning and data mining developed for Hadoop. The project includes implementations of various machine learning algorithms, including classification, clustering and collaborative filtering. On the basis of Mahout, for example, powerful recommendation systems can be built - you know them as the "customers who bought this also bought..." features in online shops, with which Amazon, for example, nowadays generates a considerable part of its revenue.
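As a small taste, a user-based recommender built with Mahout's "Taste" API needs only a few lines. The sketch below is a toy setup on a local CSV file of ratings (lines of userId,itemId,preference) rather than the distributed Hadoop variant; the file name and user ID are assumptions.

```java
import java.io.File;
import java.util.List;

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class RecommenderExample {
    public static void main(String[] args) throws Exception {
        // ratings.csv with lines like "userId,itemId,preference" is an assumption
        DataModel model = new FileDataModel(new File("ratings.csv"));

        // "Customers who bought this also bought...": collaborative filtering
        UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
        UserNeighborhood neighborhood = new NearestNUserNeighborhood(10, similarity, model);
        Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);

        // Top 3 recommendations for user 42
        List<RecommendedItem> items = recommender.recommend(42, 3);
        for (RecommendedItem item : items) {
            System.out.println(item.getItemID() + " -> " + item.getValue());
        }
    }
}
```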

 

Hadoop 2.0

As you have seen, the Hadoop community is very active and lively, and of course the wheels do not stand still there. This concerns the various projects of the ecosystem, of which you have just got to know the most important, as well as the core functionality itself. Over time the architecture of Hadoop has aged somewhat and is reaching its limits - in short, a more radical overhaul is needed. Hadoop version 1 hits its scalability limits at a cluster size of around 4,500 nodes, where the NameNode and its constant communication become a bottleneck. Architecturally the NameNode is also a single point of failure for HDFS, which in practice is not quite as bad as it sounds, since you can get by with virtualization solutions, for example. Nevertheless, more elegant solutions are known by now.

Because of these and other considerations, a whole bundle of innovations is currently being worked on to create the next generation of the Hadoop framework, collectively known as Hadoop 2.0. Covering all the innovations would go beyond the scope of this article, and a lot is still in flux (a release is expected around the end of 2013), but we want to at least briefly sketch the most important features so that you can gauge the future development:

 

YARN (or MapReduce 2)

So far the paradigm for processing data in Hadoop has been MapReduce, and it is currently the only way to process data. However, other processing models should be possible as well, and the solution is called YARN (Yet Another Resource Negotiator). The idea is to separate resource management from the actual processing of the data, so that MapReduce becomes just one of many "plugins" for Hadoop. Architecturally, the JobTracker is split into a ResourceManager and an ApplicationMaster. The various plugins can then request resources in the cluster via YARN, and YARN takes over their automatic management. The first plugin remains MapReduce, of course, but it too has to be adapted to the new architecture and is therefore often referred to as MapReduce 2; the data, however, is still processed in the familiar way. In addition, there is already broad support for other processing paradigms in this early phase: a YARN version of the near-realtime library Twitter Storm already exists, and Apache Tez also promises to become a near-realtime framework for big data. Completely new use cases will be enabled by the graph library Apache Giraph or by Apache Drill for ad hoc queries with low latency. As you can see, there is still a lot going on here, but we are sure that YARN will open up many exciting new possibilities for processing data - and with them completely new business models.

 

HDFS Federation

The measures to improve HDFS are summarized under the name HDFS Federation. Essentially the goal is to resolve the NameNode bottlenecks mentioned above and thereby make HDFS even more scalable and robust. To this end, namespaces are introduced that are independent of each other and require no coordination among themselves. Furthermore, a Block Storage Service is introduced, which consists of two components: block management on the NameNode side and a storage service on the DataNode side. The previous HDFS architecture allowed only one namespace; the change now makes several NameNodes per cluster possible, which significantly increases scalability and, of course, high availability. At Yahoo, clusters with significantly more than 10,000 nodes are already running on the new version, with no recognizable limitations so far.

 

Stinger initiative

The Stinger Initiative has set itself the goal of making Apache Hive 100 times faster and of establishing full ANSI SQL-92 compatibility. To achieve this ambitious goal, a whole range of measures is planned, from the introduction of the ORC file format for storing the data, through integration with YARN, to significantly improved buffering and more intelligent execution plans.