Skip to content

Commit

Permalink
Merge pull request #935 from MukuFlash03/master
Browse files Browse the repository at this point in the history
Added count_documents() implementation to builtin_timeseries
  • Loading branch information
shankari authored Sep 13, 2023
2 parents 94e7478 + ac619ba commit f976ab2
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 0 deletions.
36 changes: 36 additions & 0 deletions emission/storage/timeseries/builtin_timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,39 @@ def update_data(user_id, key, obj_id, data):
logging.debug("updating entry %s into timeseries" % new_entry)
edb.save(ts.get_timeseries_db(key), new_entry)

def find_entries_count(self, key_list=None, time_query=None, geo_query=None, extra_query_list=None):
    """
    Return the total number of entries matching the query across both
    timeseries databases.

    The keys in key_list may belong to either database, e.g.
        key_list = [key1, key2, key3, key4, ...]
    where (key1, key2) are orig_tsdb keys and (key3, key4) are
    analysis_tsdb keys. The list is split per database, each database is
    queried separately, and the two numeric counts are added:
        total_count = orig_tsdb_count + analysis_tsdb_count

    :param key_list: list of metadata keys we are querying for. None or an
        empty sequence means "no key filter": the count then covers all
        matching entries from the entire dataset.
    :param time_query: the time range in which to search the stream
    :param geo_query: the query for a geographical area
    :param extra_query_list: any additional queries to filter out data
    :return: the combined count of matching entries in both databases
    """
    logging.debug("builtin_timeseries.find_entries_count() called")

    # Normalise every "empty" key list to None so that downstream helpers
    # only have to deal with a single "no key filter" representation.
    if not key_list:
        key_list = None

    # Segregate orig_tsdb and analysis_tsdb keys so as to fetch counts on each dataset
    (orig_tsdb_keys, analysis_tsdb_keys) = self._split_key_list(key_list)

    # The first element of the helper's result is used as the count.
    # NOTE(review): the trailing None is presumably the sort key, which is
    # irrelevant when only counting -- confirm against the helper signature.
    orig_tsdb_count = self._get_entries_for_timeseries(
        self.timeseries_db, orig_tsdb_keys,
        time_query, geo_query, extra_query_list, None)[0]
    analysis_tsdb_count = self._get_entries_for_timeseries(
        self.analysis_timeseries_db, analysis_tsdb_keys,
        time_query, geo_query, extra_query_list, None)[0]

    return orig_tsdb_count + analysis_tsdb_count


7 changes: 7 additions & 0 deletions emission/tests/storageTests/TestAnalysisTimeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ def setUp(self):
edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUserId})
self.test_trip_id = "test_trip_id"

def tearDown(self):
    """Clean up after a test by delegating to clearRelatedDb()."""
    self.clearRelatedDb()

def clearRelatedDb(self):
    """
    Delete every entry belonging to self.testUserId from both the
    timeseries and the analysis timeseries collections.
    """
    user_filter = {"user_id": self.testUserId}
    for get_collection in (edb.get_timeseries_db, edb.get_analysis_timeseries_db):
        get_collection().delete_many(user_filter)

def testCreateNew(self):
etsa.createNewTripLike(self, esda.RAW_TRIP_KEY, ecwrt.Rawtrip)
etsa.createNewPlaceLike(self, esda.RAW_PLACE_KEY, ecwrp.Rawplace)
Expand Down
7 changes: 7 additions & 0 deletions emission/tests/storageTests/TestPlaceQueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ def setUp(self):
self.testUserId = uuid.uuid3(uuid.NAMESPACE_URL, "mailto:[email protected]")
edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUserId})

def tearDown(self):
    """Clean up after a test by delegating to clearRelatedDb()."""
    self.clearRelatedDb()

def clearRelatedDb(self):
    """
    Delete every entry belonging to self.testUserId from both the
    timeseries and the analysis timeseries collections.
    """
    user_filter = {"user_id": self.testUserId}
    for get_collection in (edb.get_timeseries_db, edb.get_analysis_timeseries_db):
        get_collection().delete_many(user_filter)

def testGetLastPlace(self):
old_place = ecwrp.Rawplace()
old_place.enter_ts = 5
Expand Down
7 changes: 7 additions & 0 deletions emission/tests/storageTests/TestSectionQueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ def setUp(self):
edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUserId})
self.test_trip_id = "test_trip_id"

def tearDown(self):
    """Clean up after a test by delegating to clearRelatedDb()."""
    self.clearRelatedDb()

def clearRelatedDb(self):
    """
    Delete every entry belonging to self.testUserId from both the
    timeseries and the analysis timeseries collections.
    """
    user_filter = {"user_id": self.testUserId}
    for get_collection in (edb.get_timeseries_db, edb.get_analysis_timeseries_db):
        get_collection().delete_many(user_filter)

def testQuerySections(self):
new_section = ecws.Section()
new_section.start_ts = 5
Expand Down
7 changes: 7 additions & 0 deletions emission/tests/storageTests/TestStopQueries.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ def setUp(self):
edb.get_analysis_timeseries_db().delete_many({'user_id': self.testUserId})
self.test_trip_id = "test_trip_id"

def tearDown(self):
    """Clean up after a test by delegating to clearRelatedDb()."""
    self.clearRelatedDb()

def clearRelatedDb(self):
    """
    Delete every entry belonging to self.testUserId from both the
    timeseries and the analysis timeseries collections.
    """
    user_filter = {"user_id": self.testUserId}
    for get_collection in (edb.get_timeseries_db, edb.get_analysis_timeseries_db):
        get_collection().delete_many(user_filter)

def testQueryStops(self):
new_stop = etsa.savePlaceLike(self, esda.RAW_STOP_KEY, ecws.Stop)
new_stop["data"]["trip_id"] = self.test_trip_id
Expand Down
110 changes: 110 additions & 0 deletions emission/tests/storageTests/TestTimeSeries.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import emission.storage.timeseries.aggregate_timeseries as estag

import emission.core.wrapper.localdate as ecwl
import emission.core.wrapper.entry as ecwe

# Test imports
import emission.tests.common as etc
Expand All @@ -38,6 +39,7 @@ def tearDown(self):
edb.get_timeseries_db().delete_many({"user_id": self.testUUID})
edb.get_uuid_db().delete_one({"user_email": "user1"})
edb.get_uuid_db().delete_one({"user_email": "user2"})
edb.get_analysis_timeseries_db().delete_many({"user_id": self.testUUID})

def testGetUUIDList(self):
uuid_list = esta.TimeSeries.get_uuid_list()
Expand Down Expand Up @@ -81,6 +83,114 @@ def testExtraQueries(self):
with self.assertRaises(AttributeError):
list(ts.find_entries(time_query=tq, extra_query_list=[ignored_phones]))

def testFindEntriesCount(self):
    '''
    Verify find_entries_count() on per-user and aggregate timeseries.

    Input: a list of keys from either of the timeseries databases, with the
    other query parameters left at their defaults.
        - Keys used: ["background/location", "background/filtered_location", "analysis/confirmed_trip"]
        - Sample datasets: "shankari_2015-aug-21", "shankari_2015-aug-27"
    Output: a single number representing the total count of matching entries.
        - For builtin_timeseries: total count of all entries matching the user id.
        - For aggregate_timeseries: total count of all entries matching all users.

    Expected values were validated with a grep count of each key:
        - Syntax: $ grep -c <key> <dataset>.json
        - Sample: $ grep -c "background/location" emission/tests/data/real_examples/shankari_2015-aug-21
        - Aug-21 dataset:
            background/location = 738, background/filtered_location = 508, analysis/confirmed_trip = 0
            total = 738 + 508 + 0 = 1246
        - Aug-27 dataset:
            background/location = 555, background/filtered_location = 327, analysis/confirmed_trip = 0
            total = 555 + 327 + 0 = 882

    Aggregate timeseries expectations:
        - Input: []
            - timeseries          -> 3607 = 2125 (UUID1) + 1482 (UUID2)
            - analysis_timeseries ->    0 =    0 (UUID1) +    0 (UUID2)
            - total = 3607 + 0 = 3607
        - Input: ["background/location", "background/filtered_location", "analysis/confirmed_trip"]
            - "background/location"          -> 1293 = 738 (UUID1) + 555 (UUID2)
            - "background/filtered_location" ->  835 = 508 (UUID1) + 327 (UUID2)
            - "analysis/confirmed_trip"      ->    0 =   0 (UUID1) +   0 (UUID2)
            - total = 1293 + 835 + 0 = 2128
    '''

    ts1_aug_21 = esta.TimeSeries.get_time_series(self.testUUID1)
    ts2_aug_27 = esta.TimeSeries.get_time_series(self.testUUID)

    # Test case: Combination of original and analysis timeseries DB keys for Aug-21 dataset
    key_list1 = ["background/location", "background/filtered_location", "analysis/confirmed_trip"]
    count_ts1 = ts1_aug_21.find_entries_count(key_list=key_list1)
    self.assertEqual(count_ts1, 1246)

    # Test case: Combination of original and analysis timeseries DB keys for Aug-27 dataset
    count_ts2 = ts2_aug_27.find_entries_count(key_list=key_list1)
    self.assertEqual(count_ts2, 882)

    # Test case: Only original timeseries DB keys for Aug-27 dataset
    key_list2 = ["background/location", "background/filtered_location"]
    count_ts3 = ts2_aug_27.find_entries_count(key_list=key_list2)
    self.assertEqual(count_ts3, 882)

    # Test case: Only analysis timeseries DB keys
    key_list3 = ["analysis/confirmed_trip"]
    count_ts4 = ts2_aug_27.find_entries_count(key_list=key_list3)
    self.assertEqual(count_ts4, 0)

    # Test case: Empty key_list which should return total count of all documents in the two DBs
    key_list4 = []
    count_ts5 = ts1_aug_21.find_entries_count(key_list=key_list4)
    self.assertEqual(count_ts5, 2125)

    # Test case: Invalid or unmatched key in metadata field
    key_list5 = ["randomxyz_123test"]
    with self.assertRaises(KeyError) as ke:
        ts1_aug_21.find_entries_count(key_list=key_list5)
    # Checked after the with-block: statements following the raising call
    # inside the block would never execute.
    self.assertEqual(str(ke.exception), "'randomxyz_123test'")

    # Test case: Aggregate timeseries DB User data passed as input with non-empty key_list
    ts_agg = esta.TimeSeries.get_aggregate_time_series()
    count_ts7 = ts_agg.find_entries_count(key_list=key_list1)
    self.assertEqual(count_ts7, 2128)

    # Test case: Aggregate timeseries DB User data passed as input with empty key_list.
    # Asserted directly: catching AssertionError here would hide a wrong
    # count, and the count is a number, so it cannot be iterated for
    # debugging output anyway.
    # NOTE(review): aggregate counts span every user, so this presumably
    # requires that only the two test users have data loaded -- confirm.
    ts_agg = esta.TimeSeries.get_aggregate_time_series()
    count_ts8 = ts_agg.find_entries_count(key_list=key_list4)
    self.assertEqual(count_ts8, 3607)

    # Test case: New User created with no data to check
    # NOTE(review): createAndFillUUID presumably assigns a fresh
    # self.testUUID; the previous one is kept in self.testUUID2 -- confirm.
    self.testEmail = None
    self.testUUID2 = self.testUUID
    etc.createAndFillUUID(self)
    ts_new_user = esta.TimeSeries.get_time_series(self.testUUID)
    count_ts9 = ts_new_user.find_entries_count(key_list=key_list1)
    self.assertEqual(count_ts9, 0)

    print("Assert Test for Count Data successful!")


if __name__ == '__main__':
import emission.tests.common as etc
etc.configLogging()
Expand Down

0 comments on commit f976ab2

Please sign in to comment.