Doporučovací systém se Sparkem

Protože se učím PySpark, respektive Spark jako takovej, dostal jsem se k základnímu zadání: napsat doporučovací systém. Ten na základě hodnocení filmů vybere takové filmy, které se uživateli budou nejpravděpodobněji líbit. Zde popsaná implementace je hodně jednoduchá, pro učební účely ale dostatečná. A pochází z tutorialu na Codementor.io.

Podobný systém jsem již jednou tvořil. Vznikal v rámci kurzu analýzy pomocí R od Duke university. Celý výstup je zde: https://rpubs.com/chajim/movie_prediction

Doporučovací systém na Sparku je veskrze jednoduchý, je dokonce kratší, než implementace v R, ale tam je nejdelší stejně hledání závislých proměnných. Ve Sparku jsem rovnou skočil do machine learningového algoritmu, takže jsem ušetřil dost řádků. Navíc implementace v R mi skončila vlastně jen závěrem, že jsem našel řešení. V rámci tutorialu Sparku jsem ale nepokračoval v implementaci online rozhraní, které by mi umožnilo udělat interaktivní aplikaci (nebylo to ani mým záměrem, chtěl jsem jen vyzkoušet manipulaci s daty a trénování na datasetu).

Tato implementace využívá předpokladu, že když dva uživatelé sdílí názor na jednu věc, budou mít podobný názor i na jinou věc. Takže se hledají průniky k rozhodnutí, zda ten nebo onen film doporučit a to podle toho, co se líbilo danému uživateli a jiným uživatelům.

Fascinující na celé věci je, že implementace byla strašně rychlý a přímočará. Zatímco mé předchozí experimenty s machine learningovými aplikacemi, ať už šlo o štítkování emocí titulků u prezidenta, nebo jiné experimenty s rozpoznáváním čísel, apod (vyloženě školní úkoly, pro demonstraci principu, nic wow), obnášela vždy o něco víc řádků, tak ve Sparku se to dalo pořešit prakticky během jednoho příkazu. To je docela podobný i R. Mám z toho velkou radost, protože tohle umožňuje se víc soustředit na samotné principy, namísto implementace.

Ještě si projedu pár věcí, než se pustím do vlastního vymýšlení, ale ze Sparku jsem nadšenej podobně, jako z Unity:D

 

Zdrojový kód, který jsem pustil na Databricks.com:


# Databricks notebook source

# COMMAND ----------

'''
RATINGS.CSV
'''
#načíst dataset - ratings
small_ratings_raw_data = sc.textFile("/FileStore/tables/bn9lterq1493545343133/ratings.csv")
small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]

# zprocesovat do RDD
'''
vezmi raw data, filtruj je podle podmínky: pokud řádek není hlavička, tak vykonej mapování - rozděl data na čárce a vytvoř tokeny pro každou takto rozdělenou hodnotu
dataset ratings.csv má formát: userId,movieId,rating,timestamp => a[0],a[1],a[2],a[3]
'''
small_ratings_data = small_ratings_raw_data.filter(lambda line: line!=small_ratings_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

#ukaž výsledek pro kontrolu
#small_ratings_data.take(3)

# COMMAND ----------

#/FileStore/tables/bn9lterq1493545343133/links.csv
#/FileStore/tables/bn9lterq1493545343133/tags.csv

'''
MOVIES.CSV
'''

#načíst dataset
small_movies_raw_data = sc.textFile("/FileStore/tables/bn9lterq1493545343133/movies.csv")
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]
# zprocesovat do RDD
'''
vezmi raw data, filtruj je podle podmínky: pokud řádek není hlavička, tak vykonej mapování - rozděl data na čárce a vytvoř tokeny pro každou takto rozdělenou hodnotu
dataset movies.csv má formát: movieId,title,genres => a[0],a[1],a[2]
'''
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

#ukaž výsledek pro kontrolu
#small_movies_data.take(3)

# COMMAND ----------

#/FileStore/tables/bn9lterq1493545343133/links.csv
#/FileStore/tables/bn9lterq1493545343133/tags.csv

'''
MOVIES.CSV
'''

#načíst dataset
small_movies_raw_data = sc.textFile("/FileStore/tables/bn9lterq1493545343133/movies.csv")
small_movies_raw_data_header = small_movies_raw_data.take(1)[0]
# zprocesovat do RDD
'''
vezmi raw data, filtruj je podle podmínky: pokud řádek není hlavička, tak vykonej mapování - rozděl data na čárce a vytvoř tokeny pro každou takto rozdělenou hodnotu
dataset movies.csv má formát: movieId,title,genres => a[0],a[1],a[2]
'''
small_movies_data = small_movies_raw_data.filter(lambda line: line!=small_movies_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()

#ukaž výsledek pro kontrolu
small_movies_data.take(3)

# COMMAND ----------

'''
Připravit data pro machine-learning doporučovací systém
'''
# training dataset

# vyber náhodný vzorek
training_RDD, validation_RDD, test_RDD = small_ratings_data.randomSplit([6, 2, 2], seed=0L)

# tuple
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

# COMMAND ----------

'''
Trénuj data

Máme strukturu UserID, MovieID a Rating. Rating je předpovídám tímto ALS modelem.
'''
from pyspark.mllib.recommendation import ALS
import math

# parametry
seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1
for rank in ranks:
model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)

predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))

# sluč predikce s aktuálním hodnocením
rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
# vypočítej chybovost/přesnost
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
errors[err] = error
err += 1
print 'Pro hodnocení %s je střední kvadratická odchylka %s' % (rank, error)
if error < min_error:
min_error = error
best_rank = rank

print 'Nejlepší model je pro hodnocení %s' % best_rank

predictions.take(3)
rates_and_preds.take(3)

# COMMAND ----------

#/FileStore/tables/khxozpad1493547888211/reduced_ratings.csv
'''
soubor měl 0.7GB, takže jsem ho promazal
'''

ratings_raw_data = sc.textFile("/FileStore/tables/khxozpad1493547888211/reduced_ratings.csv")
ratings_raw_data_header = ratings_raw_data.take(1)[0]

# Parse
ratings_data = ratings_raw_data.filter(lambda line: line!=ratings_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

print "Celkem je kompletních %s doporučení." % (ratings_data.count())
# COMMAND ----------

'''
Teď můžeme natrénovat dataset
'''

training_RDD, test_RDD = ratings_data.randomSplit([7, 3], seed=0L)

complete_model = ALS.train(training_RDD, best_rank, seed=seed,
iterations=iterations, lambda_=regularization_parameter)

# otestuj

test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print 'V testu vychází střední kvadr.odchylka jako %s' % (error)

# COMMAND ----------

'''
větší datasety
/FileStore/tables/f3il8gs71493549413754/movies.csv
/FileStore/tables/f3il8gs71493549413754/links.csv
'''

complete_movies_raw_data = sc.textFile("/FileStore/tables/f3il8gs71493549413754/movies.csv")
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
.map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))

print "Cewlkem v datasetu je %s filmů" % (complete_movies_titles.count())

# COMMAND ----------

'''
teď je třeba vybrat filmy s minimálním počtem hodnocení
'''

def get_counts_and_averages(ID_and_ratings_tuple):
nratings = len(ID_and_ratings_tuple[1])
return ID_and_ratings_tuple[0], (nratings, float(sum(x for x in ID_and_ratings_tuple[1]))/nratings)

movie_ID_with_ratings_RDD = (ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

# COMMAND ----------

'''
Nový uživatel hodnotí - ID 0, protože toto id není přiřazeno
'''

new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
(0,260,4), # Star Wars (1977)
(0,1,3), # Toy Story (1995)
(0,16,3), # Casino (1995)
(0,25,4), # Leaving Las Vegas (1995)
(0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
(0,335,1), # Flintstones, The (1994)
(0,379,1), # Timecop (1994)
(0,296,3), # Pulp Fiction (1994)
(0,858,5) , # Godfather, The (1972)
(0,50,4) # Usual Suspects, The (1995)
]
new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print 'Nová hodnocení: %s' % new_user_ratings_RDD.take(10)

complete_data_with_new_ratings_RDD = ratings_data.union(new_user_ratings_RDD)

# COMMAND ----------

'''
Nový trénink
'''

from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print "Nový model byl natrénován v %s vteřinách" % round(tt,3)

# COMMAND ----------

'''
top rating
'''

new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

# COMMAND ----------

# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

# COMMAND ----------

new_user_recommendations_rating_title_and_count_RDD = \
new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))

# méně než 25 hodnocení? nechceme!
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('NEJlepší doporučení (s více, jak 25 hodnoceními):\n%s' %
'\n'.join(map(str, top_movies)))

# COMMAND ----------

'''
Nejlepší film pro uživatele
'''

my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD.take(1)

&nbsp;

Reklamy

Zanechat Odpověď

Vyplňte detaily níže nebo klikněte na ikonu pro přihlášení:

WordPress.com Logo

Komentujete pomocí vašeho WordPress.com účtu. Odhlásit / Změnit )

Twitter picture

Komentujete pomocí vašeho Twitter účtu. Odhlásit / Změnit )

Facebook photo

Komentujete pomocí vašeho Facebook účtu. Odhlásit / Změnit )

Google+ photo

Komentujete pomocí vašeho Google+ účtu. Odhlásit / Změnit )

Připojování k %s