Developed a full-stack movie recommendation system with:
- RESTful API that provides clients with 20 recommended movies. The hit rate is determined by collecting clients' watch data to see whether they watch the recommended movies afterwards.
- Continuous integration for pipeline code. Automated daily model quality evaluation and system supervision with Jenkins.
- Designed and built infrastructure that can incrementally deploy new versions of the recommendation service through canary releases and A/B testing.
- Integrated a feedback-loop mechanism that detects potential positive or negative feedback loops and thereby helps identify potential adversarial attacks. Implemented the monitoring and detection with a lambda architecture that combines stream and batch processing results to flag problematic behaviors.
- Comprehensive data quality control on raw data received from the Kafka stream, with a focus on data schema issues, missing data, and duplicated data.
- Monitoring dashboard UI built with D3.js; the whole web server is developed with Flask.
- Containerized the whole service with Docker.
Our recommendation system consists of five modules.
The Kafka Processing Module receives and processes the Kafka stream and stores the data in MongoDB.
The Feature Extraction Module retrieves data from MongoDB and transforms it into features according to the feature extraction configuration. It then stores the features in the feature store folder.
These two modules inspect the data format and semantics to ensure data quality.
The Offline Recommend Training Module retrieves features from the feature store folder and trains the model. It also evaluates the model performance offline and stores the model, together with its evaluation results, in the model management folder. The hyperparameters and model paths are all set in the offline training configuration.
The Online Prediction Module provides online recommendation results for a given user_id. It lives inside the web server and responds to the external API. It loads and manages the trained model according to the online deployment configuration.
The Production Test Module obtains data from both the Kafka stream and the Online Prediction Module.
All five modules are tested for infrastructure quality.
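To illustrate how the Online Prediction Module serves recommendations through the web server, here is a minimal sketch, assuming a Flask app, an `online_deployment_config.json` with a `model_path` key, and a hypothetical `load_model` helper exposing `predict_top_k`; the real route, configuration keys, and model interface may differ.

```python
import json

from flask import Flask

from offline_training.model import load_model  # hypothetical helper from the training module

app = Flask(__name__)

# Hypothetical: the deployed model path comes from the online deployment configuration.
with open("online_deployment_config.json") as f:
    config = json.load(f)
model = load_model(config["model_path"])


@app.route("/recommend/<int:user_id>")
def recommend(user_id):
    # Return the top 20 movie ids for this user as a comma-separated string.
    movie_ids = model.predict_top_k(user_id, k=20)  # assumed model interface
    return ",".join(str(m) for m in movie_ids)
```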
To ensure the data is clean before it is used to train the model, we "double check" data cleaning in both the Kafka streaming part and the feature extraction part.
For the Kafka streaming part, it is very important to ensure that the data queried from the Kafka topic follows the correct schema. There are two types of ConsumerRecord here. One is watch data, which represents a user watching a movie and follows a schema like: "[TimeStamp],[user_id],GET /data/m/[movie_id]/[block_num].mpg"
The other is rate data, which represents how a user rates a movie and follows a schema like: "[TimeStamp],[user_id],GET /rate/[movie_id]=[score]"
Both watch and rate data can be split into 3 pieces by comma. So we first check whether the data splits into exactly 3 pieces by comma [related_test], and we also check whether user_id is a numeric string [related_test]. Then we check the third piece, which is the most important part. Watch data splits into 5 pieces by "/", while rate data splits into 3 pieces by "/". If the third piece can be split into neither 3 nor 5 pieces, it is invalid [related_test]. If it splits into 5 pieces, we make sure the second block is exactly "data" [related_test]; if it splits into 3 pieces, we make sure the second block is exactly "rate" [related_test]. After that, we can extract movie_id and check that it is not None [related_test]. For watch data, we further extract block_num and check that it is a non-negative numeric string [related_test]. For rate data, movie_id and score are extracted by splitting the last piece by "="; if the split does not yield exactly two pieces, the record is invalid [related_test]. We then check that score is a numeric string in the range 0 to 5 [related_test].
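A condensed sketch of these checks is shown below, collapsed into a single hypothetical `parse_kafka_record` function; the real checks are spread across the Kafka processing module and its tests, and they raise `AttributeError` on invalid records as described further down.

```python
def parse_kafka_record(line: str) -> dict:
    pieces = line.split(",")
    if len(pieces) != 3:
        raise AttributeError(f"expected 3 comma-separated pieces: {line}")
    timestamp, user_id, request = pieces
    if not user_id.isdigit():
        raise AttributeError(f"user_id is not numeric: {user_id}")

    parts = request.split("/")
    if len(parts) == 5:  # watch data: GET /data/m/[movie_id]/[block_num].mpg
        if parts[1] != "data":
            raise AttributeError(f"unexpected path for watch data: {request}")
        movie_id = parts[3]
        block_num = parts[4].split(".")[0]
        if not movie_id or not block_num.isdigit():
            raise AttributeError(f"bad watch record: {request}")
        return {"type": "watch", "timestamp": timestamp, "user_id": user_id,
                "movie_id": movie_id, "block_num": int(block_num)}
    elif len(parts) == 3:  # rate data: GET /rate/[movie_id]=[score]
        if parts[1] != "rate":
            raise AttributeError(f"unexpected path for rate data: {request}")
        fields = parts[2].split("=")
        if len(fields) != 2:
            raise AttributeError(f"bad rate record: {request}")
        movie_id, score = fields
        if not movie_id or not score.isdigit() or not 0 <= int(score) <= 5:
            raise AttributeError(f"bad rate record: {request}")
        return {"type": "rate", "timestamp": timestamp, "user_id": user_id,
                "movie_id": movie_id, "score": int(score)}
    raise AttributeError(f"request path is neither watch nor rate data: {request}")
```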
After extracting useful information from the raw Kafka stream data, we also perform many checks before writing it into the database. There are four tables in the database: one for user information, one for movie information, one for watch data, and one for rate data. After extracting a user_id, we first check whether the user_data table already has a record for this user. If yes, we do nothing; if no, we query the API and insert a new record into the user_data table [related_test]. The same applies to movie information data [related_test]. For rate data, we consider the rare situation in which a user re-rates a movie with a different score. So we first check whether the "stream_rate_data" table already has a record of this user rating this movie. If yes, we update the old score with the new one to avoid duplicated data [related_test].
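A sketch of this check-before-insert logic with pymongo is shown below; the connection URI, database name, and the `fetch_user_info` wrapper are assumptions, while the collection names follow the description above.

```python
from pymongo import MongoClient

from user_movie_api import fetch_user_info  # hypothetical wrapper around the user info API

db = MongoClient("mongodb://localhost:27017")["movie_recommendation"]  # assumed connection


def save_rate_record(user_id: str, movie_id: str, score: int) -> None:
    # If this user has already rated this movie, overwrite the old score instead of
    # inserting a duplicate record; otherwise insert a new one.
    db["stream_rate_data"].update_one(
        {"user_id": user_id, "movie_id": movie_id},
        {"$set": {"score": score}},
        upsert=True,
    )


def save_user_if_missing(user_id: str) -> None:
    # Only query the external user API when the user is not already in user_data.
    if db["user_data"].find_one({"user_id": user_id}) is None:
        db["user_data"].insert_one(fetch_user_info(user_id))
```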
Besides, we also consider the situation where the user and movie information APIs return invalid results, such as malformed JSON. So we mock the API behavior to test whether the system can handle invalid data provided by the API [related_test].
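Below is a sketch of such a mocked-API test, assuming the user info API is called through requests inside a hypothetical `kafka_processing` module; the module, function, and patch-target names are illustrative, not the real test code.

```python
from unittest import mock

import pytest

from kafka_processing import save_user_if_missing  # hypothetical module and function


def test_rejects_malformed_user_api_response():
    fake_response = mock.Mock()
    fake_response.json.side_effect = ValueError("invalid JSON")  # simulate a malformed payload

    with mock.patch("kafka_processing.requests.get", return_value=fake_response):
        # The module is expected to raise AttributeError (see the error handling
        # described below) rather than crash or store bad data.
        with pytest.raises(AttributeError):
            save_user_if_missing("12345")
```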
The error handling here is to throw an "AttributeError" exception with an error message to notify the system that a data input is not valid. We do not attempt corrections here, since the error patterns are too unpredictable. We simply drop the invalid data to make sure the system keeps executing correctly without any bad influence. If time permitted, we would try data correction and pattern detection to keep more of the valuable data.
During the phase of feature extraction, we fetch the data from our MongoDB database, extract the features we want, and generate a CSV file. Although we could allow a more flexible feature extraction mechanism by letting users configure which features they want in a config.json, we stick to a simpler implementation that extracts user_id, movie_id, and score. Here is how we ensure the data quality.
- Monitoring the validity of user_id, movie_id, and score. The pipeline afterwards assumes them to be valid numbers (though user_id and movie_id do not strictly need to be numbers, score must be). This is easy to check by seeing whether exceptions are thrown when we try to convert them to numbers. We do this with try...except and a threshold: if the percentage of invalid data is bigger than the threshold, actions can be taken. For example, the script can generate emails and send them to developers to warn about a potential data schema change or too much missing data.
- Monitoring the percentage of duplicated data. Exactly identical records should not be present in a dataset for machine learning problems. While generating the features, we calculate the percentage of duplicated data [related_test]. Running this test gives us a sense of how much duplicated data we have in the database. Further actions include storing a clean, deduplicated version of the data back into the database to overwrite the old data. Both checks are sketched after this list.
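Here is a minimal sketch of both checks, assuming the extracted features sit in a pandas DataFrame with user_id, movie_id, and score columns; the threshold value and the alerting hook are illustrative.

```python
import pandas as pd

INVALID_THRESHOLD = 0.01  # assumed: alert when more than 1% of rows are invalid


def notify_developers(message: str) -> None:
    # Placeholder for the email alert sent to developers.
    print("ALERT:", message)


def check_feature_quality(df: pd.DataFrame) -> None:
    # Count rows whose user_id, movie_id, or score cannot be converted to numbers.
    invalid = 0
    for _, row in df.iterrows():
        try:
            float(row["user_id"]), float(row["movie_id"]), float(row["score"])
        except (TypeError, ValueError):
            invalid += 1
    invalid_ratio = invalid / max(len(df), 1)
    if invalid_ratio > INVALID_THRESHOLD:
        notify_developers(f"{invalid_ratio:.2%} of feature rows are invalid")

    # Percentage of exactly duplicated rows.
    duplicate_ratio = df.duplicated().sum() / max(len(df), 1)
    print(f"invalid: {invalid_ratio:.2%}, duplicated: {duplicate_ratio:.2%}")
```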
Under the current implementation, some issues might go undetected, including:
- The same user rates the same movie many times with different scores. Whether this is a problem depends on the cause. If it is due to a bug in the front end of the application, we should fix it. If the user simply changed his/her mind, we should keep only the last record.
We have four levels of tests. The first level is manual testing. After finishing a program, we manually run it to check its correctness. This is the basic step for achieving program functionality.
The second level is unit testing. It tests the correctness of each individual module. For example, in the Kafka Processing Module, we mock a Kafka stream producer to test whether our system is able to correctly fetch data from the remote server. In the Offline Recommend Training Module and Online Prediction Module, we unit test whether the configuration conforms to a particular schema, whether the paths we use to open files exist, whether some intermediate variables satisfy their constraints, and whether each train and evaluate function behaves as expected.
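For instance, a configuration-schema unit test for the Offline Recommend Training Module might look like the sketch below; the config file name, keys, and paths are assumptions rather than the actual test code.

```python
import json
import os


def test_offline_training_config_schema():
    # Hypothetical file name and keys for the offline training configuration.
    with open("offline_training_config.json") as f:
        config = json.load(f)

    # Required keys and basic type checks.
    for key in ("feature_path", "model_output_path", "hyperparameters"):
        assert key in config, f"missing config key: {key}"
    assert isinstance(config["hyperparameters"], dict)

    # The feature store folder referenced by the config must exist.
    assert os.path.isdir(config["feature_path"])
```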
The third level is integration testing. It tests whether the interaction between two modules is correct. The primary form of interaction in this project is through intermediate data (MongoDB, the watch data table, the movie data table, feature vectors, and models). We build scripts to pipeline some procedures and check whether a cascade of two modules behaves as expected.
The fourth level is system-level testing. It tests the correctness of the system as a whole. We conduct this level of testing by containerizing our whole service, deploying it elsewhere, and manually querying the API to check whether the system behaves as expected.
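A small automated version of that manual check could look like the following, assuming the containerized service is reachable at localhost:8082 and exposes a /recommend/<user_id> route returning a comma-separated list; the host, port, and route are assumptions.

```python
import requests


def check_deployed_service(user_id: int = 12345) -> None:
    # Query the deployed container the same way an external client would.
    resp = requests.get(f"http://localhost:8082/recommend/{user_id}", timeout=5)
    resp.raise_for_status()

    movie_ids = resp.text.split(",")
    # The API is expected to return exactly 20 recommended movie ids.
    assert len(movie_ids) == 20, f"expected 20 recommendations, got {len(movie_ids)}"
```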
Because we had limited time for testing, there is still room for improvement. We are confident about the basic functionality of the service: it is robust to wrong paths, wrong data formats, malformed query APIs and Kafka streams, and changes of host machine. However, we are not confident about some extreme test cases or about performance (memory usage, latency, scalability), because we did not test much of that. Below are specific explanations and improvements.
- Increase unit test coverage. The unit tests for the Offline Training Module are not adequate. Some bad cases are not tested; for the train and evaluate methods in the Model class, only happy cases are covered. Also, some tests involving connections to external services are not adequate, such as mocking MongoDB.
- Reorganize the test folder structure. Since we distributed different modules to different team members, we ended up adopting a "hybrid" test folder structure: some tests live in the same folder as the source code and some are in an independent folder. To improve this, we should organize all the tests in one folder, which looks cleaner and is easier to automate.
- Add more tests for performance. Even though we have some tests for running time, we do not have comprehensive performance tests in this project. Load testing, memory testing, and latency testing are all very important for service availability.
For test in production, we store each client API query result as {"user_id": <user_id>, "movies": [<recommended_movie_id_1>, <recommended_movie_id_2>, ...], "query_time": <query_time>} in a separate table called "query_table_<date>" to record what movies are recommended to a certain user that day. We also summarize the watch data as {"user_id": <user_id>, "movie_id": <movie_id>, "query_time": <query_time>} in a separate table called "watch_data_<date>" to record what movies are watched by a certain user that day. With these two kinds of data, we can see whether users really watched the movies recommended by our API.
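A sketch of how these per-day records could be written is shown below, assuming the same pymongo database handle as earlier and the "<table>_<date>" naming convention described above; function names are illustrative.

```python
from datetime import datetime


def log_query_result(db, user_id, recommended_movie_ids):
    # Record today's recommendations for this user in query_table_<date>.
    today = datetime.now().strftime("%Y-%m-%d")
    db[f"query_table_{today}"].insert_one({
        "user_id": user_id,
        "movies": list(recommended_movie_ids),
        "query_time": datetime.now().isoformat(),
    })


def log_watch_event(db, user_id, movie_id):
    # Record that this user watched this movie today in watch_data_<date>.
    today = datetime.now().strftime("%Y-%m-%d")
    db[f"watch_data_{today}"].insert_one({
        "user_id": user_id,
        "movie_id": movie_id,
        "query_time": datetime.now().isoformat(),
    })
```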
For example, suppose we have the API query records for "2019-10-10" and the watch data for "2019-10-11", "2019-10-12", and "2019-10-13". We first extract the users that queried our API on "2019-10-10" from the query data [related code], and then see what movies these users watched in the next three days ("2019-10-11", "2019-10-12", and "2019-10-13") [related code]. If they indeed watched one of the movies we recommended to them, a "counter" is incremented by 1 [related code]. We then divide the "counter" by the total number of movies we recommended on "2019-10-10" to get the "hit rate", which shows how well our model performs [related code].
To make our test in production more flexible, the method get_hit_rate(date, delta) in "daily_query_summary.py" takes two input parameters. The first indicates which query date to analyze; the second indicates how many days after the query date to collect watch data. For example, if we want to see whether users watched our recommended movies within three days after they queried the API on "2019-10-10", we simply call get_hit_rate("2019-10-10", 3) to get the hit rate. To present the result clearly, we save the hit rate into a CSV called "[date]_with_delta_[delta].csv" under the "test_in_production/daily_query_summary/" path. (In this case, the result CSV file's name is "2019-10-10_with_delta_3.csv".)
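A condensed sketch of get_hit_rate(date, delta) is shown below, assuming the per-day MongoDB collections described above; the database connection, field names, and counting details mirror the description but may differ from the real daily_query_summary.py.

```python
import csv
import sys
from datetime import datetime, timedelta

from pymongo import MongoClient

db = MongoClient("mongodb://localhost:27017")["movie_recommendation"]  # assumed connection


def get_hit_rate(date: str, delta: int) -> float:
    queries = list(db[f"query_table_{date}"].find())
    if not queries:
        return 0.0

    # Collect every (user_id, movie_id) watched within `delta` days after `date`.
    start = datetime.strptime(date, "%Y-%m-%d")
    watched = set()
    for i in range(1, delta + 1):
        day = (start + timedelta(days=i)).strftime("%Y-%m-%d")
        for record in db[f"watch_data_{day}"].find():
            watched.add((record["user_id"], record["movie_id"]))

    # Count how many recommended movies were actually watched afterwards.
    counter = 0
    total_recommended = 0
    for query in queries:
        total_recommended += len(query["movies"])
        counter += sum((query["user_id"], m) in watched for m in query["movies"])
    hit_rate = counter / total_recommended

    # Save the result as test_in_production/daily_query_summary/<date>_with_delta_<delta>.csv.
    out_path = f"test_in_production/daily_query_summary/{date}_with_delta_{delta}.csv"
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["date", "delta", "hit rate"])
        writer.writerow([date, delta, hit_rate])
    return hit_rate


if __name__ == "__main__":
    # Mirrors "python3 daily_query_summary.py [date] [delta]".
    print(get_hit_rate(sys.argv[1], int(sys.argv[2])))
```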
To run the test-in-production mechanism, simply run "python3 daily_query_summary.py [date] [delta]" under the "web_server" folder. The analysis result will then be in "test_in_production/daily_query_summary/[date]_with_delta_[delta].csv".
An example result is shown below:
date | delta | hit rate |
---|---|---|
2019-10-10 | 3 | 0.4 |
2019-10-11 | 3 | 0.5 |
2019-10-12 | 3 | 0.6 |
2019-10-13 | 3 | 0.7 |
If we see a growing trend in the hit rate, it indicates that our model performs well and earns clients' trust. If the hit rate keeps going down, we definitely need to reconsider what is going wrong and may need to retrain the model.