def find_entries_count(self, key_list = None, time_query = None, geo_query = None, extra_query_list = None):
    """
    Return the total number of entries matching the query across both timeseries DBs.

    The requested keys are split between the original timeseries and the
    analysis timeseries with ``self._split_key_list``. Each database is then
    queried separately through ``self._get_entries_for_timeseries`` and the
    two per-database counts are added together.

    Example: for key_list = [key1, key2, key3, key4], where (key1, key2) are
    orig_tsdb keys and (key3, key4) are analysis_tsdb keys, the result is
    the single number orig_tsdb_count + analysis_tsdb_count.

    :param key_list: list of metadata keys we are querying for. ``None`` or an
        empty sequence means no key restriction, i.e. the count covers all
        matching entries in both databases.
    :param time_query: the time range in which to search the stream
    :param geo_query: the query for a geographical area
    :param extra_query_list: any additional queries to filter out data
    :return: sum of the matching-entry counts of the two databases
    :raises KeyError: presumably propagated from the key-splitting helper when
        a key is not a known timeseries key -- confirm against
        ``_split_key_list``.
    """
    # Use the module's logger instead of writing to stdout from library code.
    logging.debug("builtin_timeseries.find_entries_count() called")

    orig_tsdb = self.timeseries_db
    analysis_tsdb = self.analysis_timeseries_db

    # Treat every empty sequence (not only a literal []) like "no key filter".
    if key_list is not None and len(key_list) == 0:
        key_list = None

    # Segregate orig_tsdb and analysis_tsdb keys so as to fetch counts on each dataset
    (orig_tsdb_keys, analysis_tsdb_keys) = self._split_key_list(key_list)

    # Element 0 of the helper's result is used as the count; the remainder of
    # the result is ignored here. The trailing None is passed through exactly
    # as before (presumably "no sort key" -- confirm against the helper).
    orig_tsdb_count = self._get_entries_for_timeseries(
        orig_tsdb, orig_tsdb_keys, time_query, geo_query, extra_query_list, None)[0]
    analysis_tsdb_count = self._get_entries_for_timeseries(
        analysis_tsdb, analysis_tsdb_keys, time_query, geo_query, extra_query_list, None)[0]

    return orig_tsdb_count + analysis_tsdb_count
def tearDown(self):
    # Remove whatever the test stored for the test user.
    self.clearRelatedDb()

def clearRelatedDb(self):
    """
    Delete all entries whose ``user_id`` is ``self.testUserId`` from the
    timeseries database first and then from the analysis timeseries database.
    """
    test_user = self.testUserId
    for open_db in (edb.get_timeseries_db, edb.get_analysis_timeseries_db):
        open_db().delete_many({"user_id": test_user})
def testFindEntriesCount(self):
    '''
    Check find_entries_count() on per-user and aggregate timeseries.

    Input: A list of keys from either of the timeseries databases.
        - For each dataset: ["background/location", "background/filtered_location", "analysis/confirmed_trip"]
        - Testing this with sample dataset: "shankari_2015-aug-21", "shankari_2015-aug-27"

    Outputs: Single number representing total count of matching entries.
        - For builtin_timeseries: Returns total count of all entries matching the userid.
        - For aggregate_timeseries: Returns total count of all entries matching all users.

    The expected values were validated with a grep count of the occurrences of each key:
        - Syntax: $ grep -c "<key>" <dataset file>
        - Sample: $ grep -c "background/location" emission/tests/data/real_examples/shankari_2015-aug-21

        - Aug-21 dataset:
            background/location = 738, background/filtered_location = 508, analysis/confirmed_trip = 0
            Hence total count = 738 + 508 + 0 = 1246

        - Aug-27 dataset:
            background/location = 555, background/filtered_location = 327, analysis/confirmed_trip = 0
            Hence total count = 555 + 327 + 0 = 882

    Aggregate timeseries:
        - Input: []
        - Output: 3607
            - timeseries []          -> 3607 = 2125 (UUID1) + 1482 (UUID2)
            - analysis_timeseries [] -> 0 = 0 (UUID1) + 0 (UUID2)
            - Hence total count = 3607 + 0 = 3607

        - Input: ["background/location", "background/filtered_location", "analysis/confirmed_trip"]
        - Output: 2128
            - "background/location"          -> 1293 = 738 (UUID1) + 555 (UUID2)
            - "background/filtered_location" -> 835 = 508 (UUID1) + 327 (UUID2)
            - "analysis/confirmed_trip"      -> 0 = 0 (UUID1) + 0 (UUID2)
            - Hence total count = 1293 + 835 + 0 = 2128
    '''
    ts1_aug_21 = esta.TimeSeries.get_time_series(self.testUUID1)
    ts2_aug_27 = esta.TimeSeries.get_time_series(self.testUUID)

    mixed_keys = ["background/location", "background/filtered_location", "analysis/confirmed_trip"]
    orig_only_keys = ["background/location", "background/filtered_location"]
    analysis_only_keys = ["analysis/confirmed_trip"]
    no_keys = []

    # Combination of original and analysis timeseries DB keys, Aug-21 dataset
    self.assertEqual(ts1_aug_21.find_entries_count(key_list=mixed_keys), 1246)

    # Combination of original and analysis timeseries DB keys, Aug-27 dataset
    self.assertEqual(ts2_aug_27.find_entries_count(key_list=mixed_keys), 882)

    # Only original timeseries DB keys, Aug-27 dataset
    self.assertEqual(ts2_aug_27.find_entries_count(key_list=orig_only_keys), 882)

    # Only analysis timeseries DB keys
    self.assertEqual(ts2_aug_27.find_entries_count(key_list=analysis_only_keys), 0)

    # Empty key_list returns the total count of all documents in the two DBs
    self.assertEqual(ts1_aug_21.find_entries_count(key_list=no_keys), 2125)

    # Invalid or unmatched key in metadata field. The message check has to
    # live outside the `with` block: statements after the raising call inside
    # the block are never executed.
    with self.assertRaises(KeyError) as ke:
        ts1_aug_21.find_entries_count(key_list=["randomxyz_123test"])
    self.assertEqual(str(ke.exception), "'randomxyz_123test'")

    # Aggregate timeseries with non-empty key_list
    ts_agg = esta.TimeSeries.get_aggregate_time_series()
    self.assertEqual(ts_agg.find_entries_count(key_list=mixed_keys), 2128)

    # Aggregate timeseries with empty key_list. Assert directly so a mismatch
    # fails the test instead of being swallowed; the count is a plain number,
    # so there are no entries to iterate over for diagnostics.
    ts_agg = esta.TimeSeries.get_aggregate_time_series()
    self.assertEqual(
        ts_agg.find_entries_count(key_list=no_keys), 3607,
        "aggregate count with an empty key_list should cover both sample users")

    # New user created with no data to check.
    # NOTE(review): createAndFillUUID presumably replaces self.testUUID with the
    # new user's UUID, which is why the previous value is kept in testUUID2 --
    # confirm, and make sure tearDown also cleans up testUUID2.
    self.testEmail = None
    self.testUUID2 = self.testUUID
    etc.createAndFillUUID(self)
    ts_new_user = esta.TimeSeries.get_time_series(self.testUUID)
    self.assertEqual(ts_new_user.find_entries_count(key_list=mixed_keys), 0)