Coverage for src/aggregator/stats_utils.py: 98%

52 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-05-14 23:08 +0000

1"""Module for stats adding and updating utils.""" 

2import json 

3import time 

4from collections import OrderedDict 

5from os import listdir 

6from os.path import isfile, join 

7 

8import requests 

9 

10 

def change_artist_data(response, artist_id, file_path):
    """Util function for artists stats data aggregation or updating.

    Takes the ``data -> artistUnion -> stats`` object from ``response.json()``
    and rewrites ``{file_path}/artist-{artist_id}.json`` in place with the
    ``followers``, ``monthly_listeners``, ``world_rank`` and ``top_cities``
    keys refreshed. Every other key of the file keeps its value and position.

    Args:
        response: Object whose ``json()`` method returns the decoded payload.
        artist_id: Id used to build the ``artist-{artist_id}.json`` file name.
        file_path: Directory that contains the artist file.

    Raises:
        AttributeError: If ``data``, ``artistUnion`` or ``stats`` is missing
            from the payload. The file is not opened in that case.
    """
    artist_stats = response.json().get('data').get('artistUnion').get('stats')

    # A missing or null 'topCities' is stored as null, like any other missing
    # stat, instead of aborting the whole update with an AttributeError.
    top_cities = artist_stats.get('topCities') or {}
    fresh_stats = {
        'followers': artist_stats.get('followers'),
        'monthly_listeners': artist_stats.get('monthlyListeners'),
        'world_rank': artist_stats.get('worldRank'),
        'top_cities': top_cities.get('items'),
    }

    with open(
        f'{file_path}/artist-{artist_id}.json',
        mode='r+',
        encoding='utf-8'
    ) as artist_file:
        # OrderedDict keeps the key order of the stored document.
        data = json.load(artist_file, object_pairs_hook=OrderedDict)
        data.update(fresh_stats)

        # Rewind and overwrite, then cut off whatever is left of a longer
        # previous document.
        artist_file.seek(0)
        json.dump(
            data,
            artist_file,
            indent=4,
            sort_keys=False,
            ensure_ascii=False
        )
        artist_file.truncate()

34 

35 

def change_track_data(response, artist_id, file_path):
    """Util function for track stats data aggregation or updating.

    Reads the track items at ``data -> albumUnion -> tracks -> items`` of
    ``response`` and copies each track's ``playcount`` into the entry with
    the same ``track_id`` in the ``tracks`` list of
    ``{file_path}/artist-{artist_id}.json``.

    The artist file is read and written once for the whole batch rather than
    once per track. When the response holds no tracks the file is not opened.

    Args:
        response: Already decoded payload (a mapping, not a response object).
        artist_id: Id used to build the ``artist-{artist_id}.json`` file name.
        file_path: Directory that contains the artist file.
    """
    tracks_data = response.get('data').get('albumUnion').get('tracks').get('items')
    path = f'{file_path}/artist-{artist_id}.json'

    # Collect the new play counts first. A later duplicate of a track id
    # overrides an earlier one.
    playcounts = {}
    for track in tracks_data:
        track_data = track.get('track')
        # The id is the third ':'-separated field of the track uri.
        track_id = track_data.get('uri').split(':')[2]
        playcounts[track_id] = track_data.get('playcount')

    if not playcounts:
        return

    with open(
        file=path,
        mode='r+',
        encoding='utf-8'
    ) as artist_file:
        # OrderedDict keeps the key order of the stored document.
        data = json.load(
            artist_file,
            object_pairs_hook=OrderedDict
        )
        for artist_track in data.get('tracks'):
            track_id = artist_track.get('track_id')
            if track_id in playcounts:
                artist_track['playcount'] = playcounts[track_id]

        # Rewind and overwrite, then cut off whatever is left of a longer
        # previous document.
        artist_file.seek(0)
        json.dump(
            data,
            artist_file,
            indent=4,
            sort_keys=False,
            ensure_ascii=False
        )
        artist_file.truncate()

67 

68 

def get_artist_response_template(artist_id, timeout, request_count, headers, extensions):
    """Util function for taking artist response template.

    Sends the ``queryArtistOverview`` GET request for ``artist_id``, then
    sleeps for ``timeout`` seconds.

    Args:
        artist_id: Artist id placed into the ``spotify:artist:<id>`` uri.
        timeout: Number of seconds to sleep after the request. This is not
            the HTTP timeout, which is fixed at 10 seconds.
        request_count: Number of requests made so far.
        headers: Headers passed to the request.
        extensions: Value sent as the ``extensions`` query parameter.

    Returns:
        Tuple of the response object and ``request_count`` increased by one.
    """
    # Serialise with json.dumps so that any character in artist_id is escaped
    # correctly. The compact separators give the same text as the former
    # hand-built string.
    variables = json.dumps(
        {
            'uri': 'spotify:artist:' + artist_id,
            'locale': '',
            'includePrerelease': True,
            'enableAssociatedVideos': False,
        },
        separators=(',', ':')
    )
    get_artist_stats_params = {
        'operationName': 'queryArtistOverview',
        'variables': variables,
        'extensions': extensions,
    }

    response = requests.get(
        'https://api-partner.spotify.com/pathfinder/v1/query',
        params=get_artist_stats_params,
        headers=headers,
        timeout=10
    )
    time.sleep(timeout)
    request_count += 1

    return response, request_count

89 

90 

def get_artist_albums_ids(artist_id, file_path):
    """Util function for taking artist albums ids.

    Args:
        artist_id: Id used to build the ``artist-{artist_id}.json`` file name.
        file_path: Directory that contains the artist file.

    Returns:
        List with the ``album_id`` value of every entry in the ``albums``
        list of the artist file, in file order. The list is empty when the
        file has no ``albums`` key or the key is null.
    """
    with open(
        f'{file_path}/artist-{artist_id}.json',
        mode='r',
        encoding='utf-8'
    ) as artist_file:
        data = json.load(artist_file)

    # A missing or null 'albums' key means there is nothing to report.
    artist_albums = data.get('albums') or []
    return [album.get('album_id') for album in artist_albums]

106 

107 

def get_ids_from_file_names(files_path):
    """Util function for taking ids from files names by files path.

    For a file name shaped like ``<prefix>-<id>.<ext>`` the id is the second
    ``-``-separated field of the text before the first dot.

    Directories are ignored. So are files whose name has no ``-`` before the
    first dot, such as ``.DS_Store`` or ``.gitkeep``.

    Args:
        files_path: Directory to scan.

    Returns:
        List of ids in the order ``listdir`` yields the files.
    """
    files_ids = []
    for file_name in listdir(files_path):
        if not isfile(join(files_path, file_name)):
            continue
        name_parts = file_name.split('.')[0].split('-')
        # Names without a '-' carry no id. Skipping them avoids an IndexError.
        if len(name_parts) < 2:
            continue
        files_ids.append(name_parts[1])

    return files_ids

123 

124 

125# if __name__ == '__main__': 

126# response, request_count = get_artist_response_template( 

127# artist_id='0M2HHtY3OOQzIZxrHkbJLT', 

128# timeout=1, 

129# request_count=0, 

130# headers=client_headers, 

131# extensions=get_artist_stats_extensions 

132# ) 

133# print(response)