Post on 02-Jul-2015
Hadoop y Mapreduce (y otras yerbas)
Cómo analizar petabytes de datos y no morir en el intento
domingo 6 de noviembre de 2011
Quién soy yo?
• Ezequiel Golub
• Desarrollador en Smowtion
• Antes en wixi.com
• Hace 3 meses implementamos Hadoop y estamos contentos!
domingo 6 de noviembre de 2011
Primero lo primero
domingo 6 de noviembre de 2011
Qué es Hadoop?
• Framework para aplicaciones distribuidas en miles de nodos
• Ahora familia de productos, pero en su core son 2:
• HDFS
• Map/Reduce
• Proyecto de apache.org
domingo 6 de noviembre de 2011
Historia
• Inspirado por los papers de Google Mapreduce y bigtable del 2004
• Creado por Doug Cutting para ser usado por Nutch, un buscador de codigo libre
• BTW, Doug Cutting invento Lucene
• Hadoop era el elefante de juguete de su hijo
domingo 6 de noviembre de 2011
Limitaciones de RDBMS
• Datasets de petabytes
• no escalan horizontalmente
• Escalar horizontalmente = chau ACID
• Particionar y shardear es dificil.
• Rendimiento de lecturas o escrituras: Elegí una.
• Usar hardware especializado = $$$!!
domingo 6 de noviembre de 2011
Por qué Hadoop?
• Escalabilidad horizontal
• Confiabilidad
• Apoyo de comunidad
• Map/Reduce
• Orientado a procesos batch para datos “grandes”
• AWS
domingo 6 de noviembre de 2011
Qué no es Hadoop?
• Una base de datos relacional
• Un sistema de almacenamiento de datos estructurado
• Online (Con excepciones!)
domingo 6 de noviembre de 2011
Quienes lo usan?domingo 6 de noviembre de 2011
Cuándo usar Hadoop?
• Demasiados datos para que entren en algun tipo de datastore comodamente
• Datos desestructurados
• Y necesitas analizarlos!
• Y necesitas alta disponibilidad!
• Suficientes datos para que analizarlos en una sola maquina sea dificil
domingo 6 de noviembre de 2011
Donde se usa?
• Log File & Web Analytics
• Ad Targeting
• Scientific Applications
• Financial Analysis
• Search
• Natural Language Processing
• Image processing
• Data warehousing
domingo 6 de noviembre de 2011
Componentes
domingo 6 de noviembre de 2011
HDFS
• Sistema de archivos distribuido, redundante y con alta disponibilidad.
• Soporta archivos muy grandes.
• Pensado para commodity hardware
• Acceso streaming de baja latencia y alta transferencia.
domingo 6 de noviembre de 2011
HDFS
• Integridad y compresión soportadas nativamente
• N copias de cada bloque del archivo distribuidas
• 1 namenode para N datanodes
• Location aware
• Interfaz tipo linux (mv, cp, rm, ls, etc)
domingo 6 de noviembre de 2011
Ventajas
• Tolerancia a los fallos
• Autoregenerativo
• Escalable
• Confiabilidad
• Soporte
domingo 6 de noviembre de 2011
HDFS
NamenodeBackupnode
Datanode Datanode Datanode
• Mantiene metadata
• Ubicación de bloques
• No tiene los datos!
• SPOF
• Contiene los datos
• No tiene metadata
• Sirve los datos a los clientes
domingo 6 de noviembre de 2011
Leer
Namenode Backupnode
Datanode Datanode Datanode Datanode
Cliente HDFS
Ubicación de los bloques, metada
Transferencia de datos
domingo 6 de noviembre de 2011
Escribir
Namenode Backupnode
{node1,node2,node3}
Transferencia de datos
foo.bar
Datanode Datanode Datanode Datanode
Cliente HDFS
domingo 6 de noviembre de 2011
Escribir
Namenode Backupnode
{node1,node2,node4}
Transferencia de datos
foo.bar
Datanode Datanode Datanode Datanode
Cliente HDFS
domingo 6 de noviembre de 2011
Escribir
Namenode Backupnode
{node2,node3,node4}
Transferencia de datos
foo.bar
Cliente HDFS
Datanode Datanode Datanode Datanode
domingo 6 de noviembre de 2011
Escribir
Namenode Backupnode
{node1,node3,node4}
Transferencia de datos
foo.bar
Datanode Datanode Datanode Datanode
Cliente HDFS
domingo 6 de noviembre de 2011
Fault tolerance
Namenode Backupnode
Datanode Datanode Datanode Datanode
El namenode detecta un datanode caido
domingo 6 de noviembre de 2011
Fault tolerance
Namenode Backupnode
Datanode Datanode Datanode
El namenode releva los bloques perdidos y los recupera de los nodos sanos, manteniendo el nivel de replicacion
domingo 6 de noviembre de 2011
Fault tolerance
Namenode Backupnode
Datanode Datanode Datanode
El namenode releva los bloques perdidos y los recupera de los nodos sanos, manteniendo el nivel de replicacion
domingo 6 de noviembre de 2011
Fault tolerance
Namenode Backupnode
Datanode Datanode Datanode
El namenode releva los bloques perdidos y los recupera de los nodos sanos, manteniendo el nivel de replicacion
domingo 6 de noviembre de 2011
Escalamiento horizontal dinamico y rebalanceo
Namenode Backupnode
Datanode Datanode Datanode Datanode
Se agrega un nuevo datanode al cluster
domingo 6 de noviembre de 2011
Escalamiento horizontal dinamico y rebalanceo
Namenode Backupnode
Datanode Datanode Datanode Datanode
El namenode rebalancea el nuevo cluster, removiendo las copias extras que no se necesitan
domingo 6 de noviembre de 2011
Escalamiento horizontal dinamico y rebalanceo
Namenode Backupnode
Datanode Datanode Datanode Datanode
El namenode rebalancea el nuevo cluster, removiendo las copias extras que no se necesitan
domingo 6 de noviembre de 2011
Escalamiento horizontal dinamico y rebalanceo
Namenode Backupnode
Datanode Datanode Datanode Datanode
El namenode rebalancea el nuevo cluster, removiendo las copias extras que no se necesitan
domingo 6 de noviembre de 2011
Escalamiento horizontal dinamico y rebalanceo
Namenode Backupnode
Datanode Datanode Datanode Datanode
El namenode rebalancea el nuevo cluster, removiendo las copias extras que no se necesitan
domingo 6 de noviembre de 2011
Map/Reduce
• Paradigma de programación distribuida
• Basada en un paper de Google (http://bit.ly/gXZbsk)
• Modelada a partir de las ideas de programación funcional
• Distribuible en N nodos
• map() -> reduce()
• La etapa de reduce se inicia cuando todos los mappers terminan.
domingo 6 de noviembre de 2011
Map/Reduce
• Dos pasos secuenciales
• Map: Toma todas las lineas de un input, y por cada una, las procesa y devuelve un par de key valor
• Reduce: Recibe secuencialmente un key valor, los procesa y emite un resultado (ej: otro k-v)
domingo 6 de noviembre de 2011
import hadoopy
def mapper(key, value): for word in value.split(): yield word, 1
def reducer(key, values): accum = 0 for count in values: accum += int(count) yield key, accum
if __name__ == "__main__": hadoopy.run(mapper, reducer, doc=__doc__)
M/R: WC en Python
$ echo "a b a a b c" | python wc.py map | sort | python wc.py reducea 3b 2c 1
wc.py
Probandolo localmente!
** usando el modulo Hadoopy para usar python con Hadoop!
domingo 6 de noviembre de 2011
Ejemplo: wordcount
Muchos archivos
mapper
mapper
mapper
mapper<arch2.parte2>
<arch2.parte1>
<arch1.part2>
<arch1.part1>{‘foo’:12,‘bar’:13,‘baz’:19}
{‘foo’:33,‘bar’:23,‘baz’:42}
{‘foo’:1,‘bar’:0,‘baz’:99}
{‘foo’:55,‘bar’:43,‘baz’:65}
Reducer
Reducer
Reducer
Ordenar y
agrupar por key
{‘foo’:[12,33,1,55]}
{‘bar’:[13,23,0,43]}
{‘baz’:[19,42,99,65]}
count()
count()
count()
count()
sum()
sum()
sum()
{‘foo’:101,‘bar’:79,‘baz’:218}
domingo 6 de noviembre de 2011
M/R en Hadoop
• Las tareas de M/R son entregadas como un “job”
• Los “jobs” se asignan a una cola especifica de trabajo
• Los jobs son “rack-aware”: aprovecharse del “data locality”
• Cada “job” es divido en N “tasks”
• Streaming: No usar Java para M/R
• Las tareas que fallen son reiniciadas automaticamente.
domingo 6 de noviembre de 2011
Esquema conceptualJobtracker
TaskTracker TaskTracker TaskTracker TaskTracker
Datos temporales en HDFS
Cliente M/RCliente M/RCliente M/R
domingo 6 de noviembre de 2011
Fase inicialJobtracker
TaskTracker TaskTracker TaskTracker TaskTracker
Datos temporales en HDFS
Cliente M/RCliente M/RCliente M/R
mapper mapper mapper mapper
Envia jobs
Los maps() son asignados a los TaskTracker(teniendo en cuenta la localidad de la data)Cada mapper es
ejecutado en una JVM
Lee los archivos de input y graba los archivos intermedios
domingo 6 de noviembre de 2011
Fase reduceJobtracker
TaskTracker TaskTracker TaskTracker TaskTracker
Datos temporales en HDFS
Cliente M/RCliente M/RCliente M/R
reducer reducer reducer reducer
Envia “jobs”Envia jobs
Comienza la fase de Reduce
Lee los archivos temporales y graba los resultados
domingo 6 de noviembre de 2011
M/R: Implementación
• M/R es excelente para los problemas donde los ‘sub-problemas’ no son interdepientes
• Nada se comparte entre mappers y reducers, ni siquiera si corren en el mismo nodo
• X ej: La salida de un mapper no puede depender de la salida o comunicación con otro mapper
domingo 6 de noviembre de 2011
HBASE
• Key/Value store montado sobre HDFS
• Rapido (Finalmente!)
• Soporta range scan de keys
• Soporta nocion de tablas, pero usando column families para agrupar columnas
• Soporta M/R sobre las tablas
domingo 6 de noviembre de 2011
HBASE no es
• Un reemplazo de un RDBMS
• Un reemplazo de un datawarehouse
• No Joins, no query engine, no datatypes, no sql
• No acid
• No Schema
• No es excelente para guardar datos pequeños
• No es excelente para almacenar grandes datos binarios
domingo 6 de noviembre de 2011
HBASE es• Excelente para escrituras rapidas/streaming
• Tolerante a fallos
• Buena escalando horizontalmente de manera lineal
• Eficiente manejando billones de filas y millones de columnas
• Buena manteniendo la historia de una fila
• Autobalance
• Excelente para data no normalizada
• Un complemento excelente entre la RDBMS y el Datawarehouse (Hadoop)
domingo 6 de noviembre de 2011
HBASE
• Escrito en Java
• Almacenamiento orientado a columnas = schemas flexibles
• Se puede alterar el schema simplemente agregando el nombre de la columna.
• No hay migraciones de schema!
• Cada columna tiene un timestamp asociado
• La misma columna con el timestamp más reciente gana
domingo 6 de noviembre de 2011
Hive
• Simula datos estructurados usando archivos en HDFS
• HiveQL: Query language similar a SQL
• Traduce HiveQL a Map/Reduce
• O sea: No es realtime, no reemplaza RDBMS
• Auto-particionado
domingo 6 de noviembre de 2011
Ejemplo de Hivehive> select key, count(1) from kv1 where key > 100 group by key;
vs.
$ cat > /tmp/reducer.shuniq -c | awk '{print $2"\t"$1}‘
$ cat > /tmp/map.shawk -F '\001' '{if($1 > 100) print $1}‘
$ bin/hadoop jar contrib/hadoop-0.19.2-dev-streaming.jar -input /user/hive/warehouse/kv1 -mapper map.sh -file /tmp/reducer.sh -file /tmp/map.sh -reducer reducer.sh -output /tmp/largekey -numReduceTasks 1
$ bin/hadoop dfs –cat /tmp/largekey/part*
domingo 6 de noviembre de 2011
Otros componentesLibreria para implementar machine learning sobre hadoop
Zookeeper: Servicio que mantiene un K-V store consistente usando N nodos. Se usa para coordinar servicios distribuidos
HUE: Hadoop User Experience. Una linda WEB-UI sobre Hadoop.
domingo 6 de noviembre de 2011
En Smowtion?
• 250.000.000 de hits x dia
• Solucion con PHP + MySQL =
• Lo reemplazamos por algo asi:
domingo 6 de noviembre de 2011
Cómo seguir?
• Cloudera.com
• Cloudera.com
• Hadoop.apache.org
• IRC: #hadoop en freenode.org
• http://developer.yahoo.com/hadoop/tutorial/
domingo 6 de noviembre de 2011
Gracias!
• Twitter: @ezegolub
• egolub@smowtion.com
• http://www.linkedin.com/in/ezegolub
domingo 6 de noviembre de 2011
Trabajá en Smowtion
• Estamos buscando perfiles tecnicos (Developers/SysAdmins)
• Nos gustan los problemas dificiles
• Nos gustan las tecnologias nuevas
• Buen ambiente de trabajo y todo eso
• jobs@smowtion.com
domingo 6 de noviembre de 2011