Coverage for src/aggregator/stats_utils.py: 98%
52 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-05-14 23:08 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-05-14 23:08 +0000
1"""Module for stats adding and updating utils."""
2import json
3import time
4from collections import OrderedDict
5from os import listdir
6from os.path import isfile, join
8import requests
11def change_artist_data(response, artist_id, file_path):
12 """Util function for artists stats data aggregation or updating."""
13 artist_stats = response.json().get('data').get('artistUnion').get('stats')
15 with open(
16 f'{file_path}/artist-{artist_id}.json',
17 mode='r+',
18 encoding='utf-8'
19 ) as artist_file:
20 data = json.load(artist_file, object_pairs_hook=OrderedDict)
21 data['followers'] = artist_stats.get('followers')
22 data['monthly_listeners'] = artist_stats.get('monthlyListeners')
23 data['world_rank'] = artist_stats.get('worldRank')
24 data['top_cities'] = artist_stats.get('topCities').get('items')
25 artist_file.seek(0)
26 json.dump(
27 data,
28 artist_file,
29 indent=4,
30 sort_keys=False,
31 ensure_ascii=False
32 )
33 artist_file.truncate()
36def change_track_data(response, artist_id, file_path):
37 """Util function for track stats data aggregation or updating."""
38 tracks_data = response.get('data').get('albumUnion').get('tracks').get('items')
39 path = f'{file_path}/artist-{artist_id}.json'
41 for track in tracks_data:
42 track_data = track.get('track')
43 track_id = track_data.get('uri').split(':')[2]
44 track_playcount = track_data.get('playcount')
46 with open(
47 file=path,
48 mode='r+',
49 encoding='utf-8'
50 ) as artist_file:
51 data = json.load(
52 artist_file,
53 object_pairs_hook=OrderedDict
54 )
55 for artist_track in data.get('tracks'):
56 if artist_track.get('track_id') == track_id:
57 artist_track['playcount'] = track_playcount
58 artist_file.seek(0)
59 json.dump(
60 data,
61 artist_file,
62 indent=4,
63 sort_keys=False,
64 ensure_ascii=False
65 )
66 artist_file.truncate()
69def get_artist_response_template(artist_id, timeout, request_count, headers, extensions):
70 """Util function for taking artist response template."""
71 get_artist_stats_params = {
72 'operationName': 'queryArtistOverview',
73 'variables': '{"uri":"spotify:artist:' +
74 artist_id +
75 '","locale":"","includePrerelease":true,"enableAssociatedVideos":false}',
76 'extensions': extensions,
77 }
79 response = requests.get(
80 'https://api-partner.spotify.com/pathfinder/v1/query',
81 params=get_artist_stats_params,
82 headers=headers,
83 timeout=10
84 )
85 time.sleep(timeout)
86 request_count += 1
88 return response, request_count
91def get_artist_albums_ids(artist_id, file_path):
92 """Util function for taking artist albums ids."""
93 with open(
94 f'{file_path}/artist-{artist_id}.json',
95 mode='r',
96 encoding='utf-8'
97 ) as artist_file:
98 data = json.load(artist_file)
100 artist_albums = data.get('albums')
101 albums_ids = []
102 for album in artist_albums:
103 albums_ids.append(album.get('album_id'))
105 return albums_ids
108def get_ids_from_file_names(files_path):
109 """Util function for taking ids from files names by files path."""
110 files_list = listdir(files_path)
111 if '.DS_Store' in files_list:
112 files_list.remove('.DS_Store')
113 files_ids = [
114 file.split('.')[0].split('-')[1] for file in files_list if isfile(
115 join(
116 files_path,
117 file
118 )
119 )
120 ]
122 return files_ids
125# if __name__ == '__main__':
126# response, request_count = get_artist_response_template(
127# artist_id='0M2HHtY3OOQzIZxrHkbJLT',
128# timeout=1,
129# request_count=0,
130# headers=client_headers,
131# extensions=get_artist_stats_extensions
132# )
133# print(response)