• No se han encontrado resultados

5.3 Fase de construcción

5.3.3 Construcción del Data Lake

Desplegados y disponibles todos los contenedores Docker de la capa de datos, es el momento de desarrollar los programas necesarios para construir el DataLake. La estrucura es sencilla, un programa recopilará los tweets y los colocará en un canal de Kafka mientras que otro programa accederá a este canal para poder recopilar estos tweets y los escribirá en la base de datos con un

formato clave valor.

5.3.3.1 La fuente. Twitter

Recordemos que se decide durante la fase de elaboración tomar como fuente única de datos Big Data a la red social Twitter. Como se detalla en la sección de tecnologías del capítulo 4 de este trabajo, no es necesario inventar una manera de recopilar los tweets, ya que Twitter posee una API a la que

se puede tener acceso gratuito de forma limitada. El primer paso, por tanto, será conseguir acceso a dicha API. Para ello, debemos registrarnos en developer.twitter.com dando datos como nuestra cuenta de Twitter, un email y contraseña, nombre de usuario y explicar el uso que se le dará a la API. Una vez solicitado, hemos de esperar a que Twitter acepte nuestra solicitud.

Una vez disponemos de una cuenta de desarrollador, creamos una nueva aplicación dentro de la página web anteriormente mencionada. Una aplicación no es otra cosa que una instancia de la API. Cada aplicación se distingue por cuatro claves cifradas, únicas, y secretas que se generan automáticamente al crear la aplicación:

• API key

• API secret key

• Access Token

• Access Token secret

Las dos primeras claves se utilizan para identificar la aplicación mientras que las dos últimas claves

se utilizan para identificar al usuario que intenta acceder a la misma. En la figura 5.4 se pueden ver las claves generadas para este trabajo:

66 5.3. FASE DE CONSTRUCCIÓN

Figura 5.4: Claves API Twitter

Con este paso, ya disponemos de todo lo necesario para construir un programa que puedaminar Tweets. Conviene también en esta sección explicar cuál es la estructura de un Tweet, ya que es muy distinto de lo que se puede ver en la red social como usuario. En esencia, un Tweet es un texto (el

tweet en sí) que viene acompañado por un gran número de metadatos como la localización donde fue escrito, desde que dispositivo, los datos de la cuenta del autor, y un largo etcétera. La mejor forma es ver un ejemplo directamente tomado de Twitter:

1 { 2 " text ": " string ", 3 " truncated ": true, 4 " in_reply_to_user_id ":" string ", 5 " in_reply_to_status_id ":" string ", 6 " favorited ": false,

7 " source ": "<a href =\" http:// twitter .com /\"

rel =\" nofollow \"> Twitter for iPhone </a >", 8 " in_reply_to_screen_name ": null, 9 " in_reply_to_status_id_str ": null, 10 " id_str ": " string ", 11 " entities ": { 12 " user_mentions ": [ 13 { 14 " indices ": [ 15 3, 16 19 17 ], 18 " screen_name ": " string ", 19 " id_str ":" string ", 20 " name ": " string ", 21 "id": 271572434 22 } 23 ], 24 " urls ": [ ], 25 " hashtags ": [ ] 26 }, 27 " contributors ": null,

CAPÍTULO 5. RESULTADOS 67

28 " retweeted ": false,

29 " in_reply_to_user_id_str ": null, 30 " place ": null,

31 " retweet_count ": 4,

32 " created_at ": "Sun Apr 03 23:48:36 +0000 2011", 33 " retweeted_status ": { 34 " text ": " string ", 35 " truncated ": false, 36 " in_reply_to_user_id ": null, 37 " in_reply_to_status_id ": null, 38 " favorited ": false, 39 " source ": "<a

href =\" http:// www. hootsuite .com \" rel =\" nofollow \"> HootSuite </a >", 40 " in_reply_to_screen_name ": null, 41 " in_reply_to_status_id_str ": null, 42 " id_str ":" string ", 43 " entities ": { 44 " user_mentions ": [ ], 45 " urls ": [ ], 46 " hashtags ": [ 47 { 48 " text ": "PGP", 49 " indices ": [ 50 130, 51 134 52 ] 53 } 54 ] 55 }, 56 " contributors ": null, 57 " retweeted ": false, 58 " in_reply_to_user_id_str ": null, 59 " place ": null, 60 " retweet_count ": 4,

61 " created_at ": "Sun Apr 03 20:24:49 +0000 201

1", 62 " user ": { 63 " notifications ": null, 64 " profile_use_background_image ": true, 65 " statuses_count ": 31, 66 " profile_background_color ": "C0DEED ", 67 " followers_count ": 3066, 68 " profile_image_url ":

" http://a3.com / themes / theme1/bg.jpg", 69 " listed_count ": 6,

70 " profile_background_image_url ":

" http://a3.com / themes / theme1/bg.jpg", 71 " description ": "",

72 " screen_name ": " PostGradProblem ", 73 " default_profile ": true,

68 5.3. FASE DE CONSTRUCCIÓN 74 " verified ": false, 75 " time_zone ": null, 76 " profile_text_color ": "333333", 77 " is_translator ": false, 78 " profile_sidebar_fill_color ": " DDEEF6", 79 " location ": "", 80 " id_str ": "271572434", 81 " default_profile_image ": false, 82 " profile_background_tile ": false, 83 " lang ": "en", 84 " friends_count ": 21, 85 " protected ": false, 86 " favourites_count ": 0,

87 " created_at ": "Thu Mar 24 19:45:44

+0000 2011", 88 " profile_link_color ": "0084B4", 89 " name ": " PostGradProblems ", 90 " show_all_inline_media ": false, 91 " follow_request_sent ": null, 92 " geo_enabled ": false, 93 " profile_sidebar_border_color ": "C0DEED ", 94 "url": null, 95 "id": 271572434, 96 " contributors_enabled ": false, 97 " following ": null, 98 " utc_offset ": null 99 }, 100 "id": 54640519019642880, 101 " coordinates ": null, 102 "geo": null 103 }, 104 "id": 54691802283900930, 105 " coordinates ": null, 106 "geo": null 107 } 5.3.3.2 Ingesta de datos

Con las claves ya generadas y una aplicación creada en la API de Twitter, podemos comenzar el desarrollo del programa con el que recopilaremos los datos y los enviaremos a un canal de Kafka que previamente crearemos. Este programa se desarrolla en Python ya que en él se hace uso de la librería

Tweepy. Sin entrar en detalle, Tweepy implementa funciones que permiten al usuario conectar con la API de Twitter de manera rápida y, más importante, detectar cada tweet recolectado para permitir al usuario realizar una acción en el momento en el que lo detecta.

En la primera sección de código se importan los paquetes necesarios para conectar con Kafka así como todas las funciones que se necesitan de Tweepy. Inmediatamente después, se declaran como variables globales las claves que obtuvimos al registrarnos como desarrolladores en Twitter.

CAPÍTULO 5. RESULTADOS 69

función que actúa de forma similar a un agente que realiza web scrapping, solo que es interno a la Time Line de Twitter. Es decir, cuando se ejecute este programa se comenzará a recopilar tweets en directo, y se ignoran los tweets cuya fecha sea inferior al momento en el que se lanza el programa. Esta clase constará de tres funciones:

• init: Es el constructor de la clase. En este caso, añadimos como argumento un producer de Kafka, que previamente hemos importado, y lo inicializamos.

• on_data: Es la función llamada cada vez que se detecta un nuevo tweet recopilado. El condi- cional que contiene la función sirve como filtro para detectar tweets de los cuáles se pueda

extraer todo el contenido. Esta operación es necesaria porque la propia API de Twitter divide los tweets en tres categorías: tweet, retweet, y mención; y solo la primera de las tres opciones devuelve el texto del tweet totalmente completo, lo cuál es vital ya que este texto es la materia

prima para el análisis de sentimientos. Una vez encontrado, se añade el tweet al topic de Kafka que creamos codificado en UTF-8.

• on_error: Función que se dispara si se produce un error en el método on_data.

Por último, se desarrolla el método main de esta clase. Para ello, el primer paso es conectar a la API de Twitter a través de los credenciales que asignamos previamente en forma de variables globales. Primero se crea un objeto de la clase StdOutListener al que se le especifica como argumento el puerto

donde se aloja el canal de Kafka en el que depositará los tweets que recopile. Después, se utiliza el método de la OAuthHandler de la librería Tweepy para utilizar los credenciales y crear una intancia del streaming de Twitter. En resumen, en este punto del código ya se ha definido la codificación que

tendrán los datos almacenados, su lugar, y su modo de almacenamiento.

Finalmente, solo queda hacer uso del métodofilter que permite especificar características relativas a los metadatos del tweet para actuar a modo de filtro de los tweets a recopilar. En este caso, por

motivos del poco volumen de contenido que genera el tema educación en castellano, se especifica que solo se busquen tweets de habla inglesa. Es importante indicar que este filtro solo se limita al idioma por lo que la localización del usuario que escriba el tweet no es importante. Finalmente, se añaden

una serie de keywords relacionadas con el campo de la educación, especialmente universitaria y la codificación en UTF-8 para asegurar una correcta decodificación a la hora de almacenarlos en la base de datos.

Listado 5.1: Reader.py

 

1 #Import de las librerias necesarias

2 from kafka import KafkaConsumer , KafkaProducer 3 from tweepy . streaming import StreamListener 4 from tweepy import OAuthHandler

5 from tweepy import Stream 6

7 #Variables globales con los credenciales de acceso a la API de Twitter

8 consumer_key = " cnwuws9XWkaT1aQOGDpAiRMRw " 9 consumer_secret = " u1Ce8Lou0rZckm5ZdY4FVp8MK2G48WwGpZ4wYGj4se8fqlZOKm " 10 access_token = " 839122483169144832 - UvNQprUoWC1VFS5MdFzRISd2qHd1Rde " 11 access_token_secret = " cP83mgRGg5gr07BKciJC3aCh4KfchrU3AUOnMJOmioZQk " 12 13

14 #Clase Listener de la libreria Tweepy. Imprime los tweets recopilados en pantalla

15 class StdOutListener ( StreamListener ): 16

17 def __init__ (self , kafkaProducer ):

18 super( StdOutListener , self ). __init__ () 19 self . producer = kafkaProducer

70 5.3. FASE DE CONSTRUCCIÓN

20

21 def on_data (self , data ):

22 #print(data)

23 if(not(" retweeted_status " in data )):

24 if(" extended_tweet " in data ):

25 print( data )

26 self . producer . send ('my - topic ',

data . encode ('UTF -8 '))

27 return True

28

29 def on_error (self , status ): 30 print( status )

31

32 if __name__ == '__main__ ': 33

34 #Autentificacion del usuario y conexion a la API de Twitter

35 l =

StdOutListener ( KafkaProducer ( bootstrap_servers ='kafka :9092 '))

36 auth = OAuthHandler ( consumer_key , consumer_secret )

37 auth . set_access_token ( access_token , access_token_secret ) 38 stream = Stream (auth , l)

39

40 #Filtros aplicados para la busqueda de tweets que contengan palabras clave.

41 stream .filter( languages =["en"], track =[" university ",

" college ", " education ", " subject ",

" professor "," teacher "," learning ", " tuition ",

" scholar "," scholarship ",

42 " degree "," student "," dropout "," higher

education "], encoding ='utf8 ')

 

5.3.3.2 Escritura en Base de Datos Oracle NoSQL

Una vez se ha comprobado el correcto funcionamiento del programa de ingesta de datos, es el

momento de desarrollar el programa que tome los datos del topic de Kafka y los introduzca en la base de datos con un formato clave-valor. Este programa se desarrolla en Java, ya que es un lenguaje desarrollado por Oracle y al ser la base de datos de la misma empresa proveen al usuario con una

librería que facilita la conexión con la base de datos, así como el resto de operaciones CRUD [ref ] En primer lugar, es necesario declarar las variables globales que identifican el canal de Kafka de donde el programa tendrá que tomar los mensajes. Para ello, debemos indicar el nombre del topic, el servidor

donde funciona Kafka y su host. De forma adicional se declara una variable global con el nombre del KV Store al que se deberá acceder para almacenar los datos. Después. el programa consta de dos métodos estácticos donde se realizan las funcionalidades del programa, y un método main donde se

hace uso de los dos métodos anteriores al ejecutar el programa.

El método createConsumer() se utiliza para construir un Consumidor Kafka. En el capítulo cuatro de este trabajo ya se introdujo la tecnología de Kafka y se explicó como en todo evento de comunicación se necesita mínimo un Consumidor, y un Productor. El productor fue creado en el programa anterior, por lo que en este programa solo será necesario crear el Consumidor. Para ello basta con crear un objeto de la clase Properties y asignar como atributos las variables globales que

definimos previamente. Este objeto de la clase Properties actúa como argumento del constructor del Consumidor Kafka. Una vez creado este objeto, antes de devolverlo es necesario suscribirlo al mismo canal o topic al que el programa de carga está enviando los datos.

Por su parte, el métodoload() se utiliza para escribir los mensajes retirados del topic de Kafka por parte del Consumidor a nuestra base de datos no relacional Oracle NoSQL. Este método necesita

como argumento un objeto de la clase KV Store, que le indica en qué KV Store se deben almacenar los datos ya que como se estudió en la sección anterior, es posible la existencia de varias “stores” dentro

CAPÍTULO 5. RESULTADOS 71

de la misma base de datos. Antes de continuar con el algoritmo, es necesario profundizar en cómo

funciona el sistema de almacenamiento de pares clave-valor en esta base de datos para comprender la necesidad de utilizar dos claves primarias ya que no es nada intuitivo, especialmente si se toma como referencia el modelo relacional que es el más común.

De forma sencilla y resumida, una clave es la concatenación de unamajor key y una minor key ambas especificadas por el usuario. Todos los registros de una colección comparten la misma major key, y además se almacenan en la misma ubicación para optimizar la localización de los datos. Por su

parte, la minor key se suele utilizar como identificativo de cada registro dentro de la misma colección. En este trabajo se almacenan tweets, y todos ellos comparten la misma procedencia: el streaming de la API; por tanto se declara como major key la cadena “TweeterStream” para identificar inequívocamente

esta colección. En segundo lugar, es necesario identificar cada registro, para ello se asigna como minor key el atributo ID de cada tweet, que es único e irrepetible. Para conseguirlo se hace uso de la librería Jackson [ref ] que implementa un Parser de Jackson que utilizamos exclusivamente para

obtener el ID del largo objeto que es el tweet en sí.

Finalmente, el método escribe en la base de datos los tweets recogidos del canal de Kafka. Para

ello basta con crear un objeto Key formado por ambas claves, y un objeto Value que será el propio tweet en formato cadena JSON. Para escribirlo, se mantiene la codificación UTF-8 ya que es la misma codificación que tienen los datos en su origen. Una vez escritos, cuándo el programa termina se cierra

la conexión al topic de Kafka para evitar un exceso de gasto de recursos computacionales.

Listado 5.2: Writter.java

 

1 public class writer_v01 { 2

3 private final static String TOPIC = "my - topic ";

4 private final static String BOOTSTRAP_SERVERS = " kafka :9092 "; 5 private final static String defaulthost = " localhost :5000 "; 6 private final static String storename = " kvstore ";

7

8 public static void main ( String [] args ) {

9 / / Creacion del Handle a la BD

10 KVStoreConfig kconfig = new KVStoreConfig ( storename ,

defaulthost );

11 KVStore kvstore = KVStoreFactory . getStore ( kconfig );

12 / / Introduccion de los datos

13 load ( kvstore ); 14 System . exit (0) ;

15 }

16

17 / / Metodo para crear el consumer de kafka, se usa en readNwrite

18 private static Consumer <Long , String > createConsumer () { 19 final Properties props = new Properties ();

20 props . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG ,

BOOTSTRAP_SERVERS );

21 props . put ( ConsumerConfig . GROUP_ID_CONFIG , "my - group "); 22 props . put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG ,

LongDeserializer .class. getName ());

23 props . put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG ,

StringDeserializer .class. getName ());

24

25 / / Create the consumer using props.

26 final Consumer <Long , String > consumer = new

KafkaConsumer <>( props );

27

28 / / Subscribe to the topic.

29 consumer . subscribe ( Collections . singletonList ( TOPIC )); 30 return consumer ;

31 }

72 5.3. FASE DE CONSTRUCCIÓN

33

34 private static void load ( KVStore kvstore ){ 35

36 final Consumer <Long , String > consumer = createConsumer (); 37 long timeout = 1000;

38

39 while (true) {

40 final ConsumerRecords <Long , String > consumerRecords =

consumer . poll ( timeout );

41 String majorComp = " TweeterStream "; 42

43 for ( ConsumerRecord <Long , String > msg :

consumerRecords ) {

44 try {

45 final ObjectNode node = new

ObjectMapper (). readValue ( msg . value () , ObjectNode .class);

46 / / Componente mayor "fija"

47 String minorComp = node . get ("id"). toString ();

48 / / Creamos la key, coponente menor única

49 Key myKey = Key . createKey ( majorComp , minorComp );

50 / / Creamos el value serializado

51 Value myValue =

Value . createValue ( msg . value (). getBytes ("UTF -8"));

52 / / Anadimos el record

53 kvstore . put (myKey , myValue );

54 } catch ( Exception e) {

55 System . out . print (" Exception : " +

e. getMessage ()); 56 consumer . close (); 57 System . exit (1) ; 58 } 59 } 60 consumer . commitAsync (); 61 } 62 } 63 }