From 531cb2afb44b780432ea52ca9ff41220bd183c43 Mon Sep 17 00:00:00 2001
From: lindsay stevens <lstevens@getodk.org>
Date: Wed, 31 Jul 2024 17:14:24 +1000
Subject: [PATCH 1/4] add: entities methods create_many, delete, merge

---
 .../create_entities_from_submissions.py       |  34 +-
 pyodk/_endpoints/entities.py                  | 299 +++++++++++-
 pyodk/_endpoints/entity_lists.py              |   6 +-
 pyodk/_utils/validators.py                    |   7 +
 tests/endpoints/test_entities.py              | 461 ++++++++++++++++++
 tests/test_client.py                          | 106 ++++
 6 files changed, 897 insertions(+), 16 deletions(-)

diff --git a/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py b/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
index 6421576..4130c68 100644
--- a/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
+++ b/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
@@ -2,19 +2,20 @@
 A script that uses CSV data to create an entity list and populate it with entities.
 """
 
-import csv
+from csv import DictReader
 from pathlib import Path
 from uuid import uuid4
 
 from pyodk import Client
 
-if __name__ == "__main__":
-    project_id = 1
-    entity_list_name = f"previous_survey_{uuid4()}"
-    entity_label_field = "first_name"
-    entity_properties = ("age", "location")
-    csv_path = Path("./imported_answers.csv")
+project_id = 1
+entity_list_name = f"previous_survey_{uuid4()}"
+entity_label_field = "first_name"
+entity_properties = ("age", "location")
+csv_path = Path("./imported_answers.csv")
+
 
+def create_one_at_a_time():
     with Client(project_id=project_id) as client, open(csv_path) as csv_file:
         # Create the entity list.
         client.entity_lists.create(entity_list_name=entity_list_name)
@@ -22,9 +23,26 @@
             client.entity_lists.add_property(name=prop, entity_list_name=entity_list_name)
 
         # Create the entities from the CSV data.
-        for row in csv.DictReader(csv_file):
+        for row in DictReader(csv_file):
             client.entities.create(
                 label=row[entity_label_field],
                 data={k: str(v) for k, v in row.items() if k in entity_properties},
                 entity_list_name=entity_list_name,
             )
+
+
+def create_with_merge():
+    with Client(project_id=project_id) as client, open(csv_path) as csv_file:
+        client.entity_lists.default_entity_list_name = client.session.get_xform_uuid()
+        entity_list = client.entity_lists.create()
+        client.entities.merge(
+            source_data=list(DictReader(csv_file)),
+            entity_list_name=entity_list.name,
+            source_label_key=entity_label_field,
+            source_keys=entity_properties,
+        )
+
+
+if __name__ == "__main__":
+    # create_one_at_a_time()
+    create_with_merge()
diff --git a/pyodk/_endpoints/entities.py b/pyodk/_endpoints/entities.py
index 31074ed..3a10c31 100644
--- a/pyodk/_endpoints/entities.py
+++ b/pyodk/_endpoints/entities.py
@@ -1,13 +1,45 @@
 import logging
+from collections.abc import Iterable, Mapping
+from dataclasses import dataclass, field
 from datetime import datetime
+from typing import Any
 from uuid import uuid4
 
 from pyodk._endpoints import bases
+from pyodk._endpoints.entity_list_properties import EntityListPropertyService
 from pyodk._utils import validators as pv
 from pyodk._utils.session import Session
 from pyodk.errors import PyODKError
 
 log = logging.getLogger(__name__)
+SENTINEL = object()
+
+
+@dataclass
+class MergeActions:
+    """Return type for EntityService._prep_data_for_merge / merge"""
+
+    match_keys: list
+    to_insert: dict = field(default_factory=dict)
+    to_update: dict = field(default_factory=dict)
+    to_delete: dict = field(default_factory=dict)
+    source_keys: set = field(default_factory=set)
+    target_keys: set = field(default_factory=set)
+    reserved_keys: frozenset = frozenset({"__id", "__system", "label"})
+    # Set by "merge" function according to the "add_new_properties" parameter.
+    final_keys: set = field(default_factory=set)
+
+    @property
+    def keys_difference(self) -> set:
+        return (self.source_keys - self.target_keys) - self.reserved_keys
+
+    @property
+    def keys_intersect(self) -> set:
+        return (self.source_keys & self.target_keys) - self.reserved_keys
+
+    @property
+    def keys_union(self) -> set:
+        return (self.source_keys | self.target_keys) - self.reserved_keys
 
 
 class CurrentVersion(bases.Model):
@@ -41,6 +73,7 @@ class Config:
     list: str = _entities
     post: str = _entities
     patch: str = f"{_entities}/{{entity_id}}"
+    delete: str = patch
     get_table: str = f"{_entity_name}.svc/Entities"
 
 
@@ -116,7 +149,7 @@ def create(
         :param label: Label of the Entity.
         :param data: Data to store for the Entity.
         :param entity_list_name: The name of the Entity List (Dataset) being referenced.
-        :param project_id: The id of the project this form belongs to.
+        :param project_id: The id of the project this Entity belongs to.
         :param uuid: An optional unique identifier for the Entity. If not provided then
           a uuid will be generated and sent by the client.
         """
@@ -144,6 +177,49 @@ def create(
         data = response.json()
         return Entity(**data)
 
+    def create_many(
+        self,
+        data: dict,
+        entity_list_name: str | None = None,
+        project_id: int | None = None,
+    ) -> Entity:
+        """
+        Create one or more Entities in a single request.
+
+        Required keys in data: entities[].label, entities[].data, source.name.
+        Example of the required data format:
+
+        {
+            "entities": [
+                {"label": "Sydney", "data": {"state": "NSW", "postcode": "2000"}},
+                {"label": "Melbourne", "data": {"state": "VIC", "postcode": "3000"}},
+            ],
+            "source": {"name": "pyodk", "size": 1},
+        }
+
+        :param data: Data to store for the Entities.
+        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
+        :param project_id: The id of the project this Entity belongs to.
+        """
+        try:
+            pid = pv.validate_project_id(project_id, self.default_project_id)
+            eln = pv.validate_entity_list_name(
+                entity_list_name, self.default_entity_list_name
+            )
+            data = pv.validate_is_instance(data, typ=Iterable, key="data")
+        except PyODKError as err:
+            log.error(err, exc_info=True)
+            raise
+
+        response = self.session.response_or_error(
+            method="POST",
+            url=self.session.urlformat(self.urls.post, project_id=pid, el_name=eln),
+            logger=log,
+            json=data,
+        )
+        data = response.json()
+        return data["success"]
+
     def update(
         self,
         uuid: str,
@@ -165,16 +241,15 @@ def update(
         :param base_version: The expected current version of the Entity on the server. If
           `force` is not True, then `base_version` must be specified.
         :param entity_list_name: The name of the Entity List (Dataset) being referenced.
-        :param project_id: The id of the project this form belongs to.
+        :param project_id: The id of the project this Entity belongs to.
         """
         try:
             pid = pv.validate_project_id(project_id, self.default_project_id)
             eln = pv.validate_entity_list_name(
                 entity_list_name, self.default_entity_list_name
             )
-            params = {
-                "uuid": pv.validate_str(uuid, key="uuid"),
-            }
+            eid = pv.validate_str(uuid, key="uuid")
+            params = {}
             if force is not None:
                 params["force"] = pv.validate_bool(force, key="force")
             if base_version is not None:
@@ -193,7 +268,7 @@ def update(
         response = self.session.response_or_error(
             method="PATCH",
             url=self.session.urlformat(
-                self.urls.patch, project_id=pid, el_name=eln, entity_id=uuid
+                self.urls.patch, project_id=pid, el_name=eln, entity_id=eid
             ),
             logger=log,
             params=params,
@@ -202,6 +277,39 @@ def update(
         data = response.json()
         return Entity(**data)
 
+    def delete(
+        self,
+        uuid: str,
+        entity_list_name: str | None = None,
+        project_id: int | None = None,
+    ) -> bool:
+        """
+        Delete an Entity.
+
+        :param uuid: The unique identifier for the Entity.
+        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
+        :param project_id: The id of the project this Entity belongs to.
+        """
+        try:
+            pid = pv.validate_project_id(project_id, self.default_project_id)
+            eln = pv.validate_entity_list_name(
+                entity_list_name, self.default_entity_list_name
+            )
+            eid = pv.validate_str(uuid, key="uuid")
+        except PyODKError as err:
+            log.error(err, exc_info=True)
+            raise
+
+        response = self.session.response_or_error(
+            method="DELETE",
+            url=self.session.urlformat(
+                self.urls.delete, project_id=pid, el_name=eln, entity_id=eid
+            ),
+            logger=log,
+        )
+        data = response.json()
+        return data["success"]
+
     def get_table(
         self,
         entity_list_name: str | None = None,
@@ -216,7 +324,7 @@ def get_table(
         Read Entity List data.
 
         :param entity_list_name: The name of the Entity List (Dataset) being referenced.
-        :param project_id: The id of the project this form belongs to.
+        :param project_id: The id of the project this Entity belongs to.
         :param skip: The first n rows will be omitted from the results.
         :param top: Only up to n rows will be returned in the results.
         :param count: If True, an @odata.count property will be added to the result to
@@ -258,3 +366,180 @@ def get_table(
             params=params,
         )
         return response.json()
+
+    @staticmethod
+    def _prep_data_for_merge(
+        source_data: Iterable[Mapping[str, Any]],
+        target_data: Iterable[Mapping[str, Any]],
+        match_keys: Iterable[str] | None = None,
+        source_label_key: str = "label",
+        source_keys: Iterable[str] | None = None,
+    ) -> MergeActions:
+        """
+        Compare source and target data to identify rows to insert, update, or delete.
+
+        :param source_data: Incoming data to be sent to the target database.
+        :param target_data: Existing data from the target database.
+        :param match_keys: Dictionary keys common to source and target used to match rows.
+        :param source_label_key: The key in the source data to use as the label.
+        :param source_keys: If provided, process only these keys in the source data.
+        """
+        default_key = "label"
+        if source_label_key is None:
+            source_label_key = default_key
+        if match_keys is None:
+            match_keys = (default_key,)
+        match_keys_sorted = sorted(match_keys)
+
+        if source_keys is not None and source_label_key not in source_keys:
+            raise PyODKError(
+                "Parameter 'source_keys' must include \"label\" or the "
+                "'source_label_key' parameter value"
+            )
+
+        def get_key(entity: Mapping[str, Any], keys: list) -> tuple:
+            try:
+                return tuple(entity[i] for i in keys)
+            except KeyError as e:
+                raise PyODKError(
+                    f"Found Entity that did not have all expected match_keys: {e}"
+                ) from e
+
+        result = MergeActions(match_keys=match_keys_sorted)
+        # Converting to a dict uses extra memory, but matching lists of dicts directly is worst case O(n*m).
+        src = {}
+        source_data_len = 0  # Not using len() since it might not be a collection.
+        for s in source_data:
+            row = {default_key: s[source_label_key]}
+            if source_keys is None:
+                row.update({k: s[k] for k in s if k != source_label_key})
+            else:
+                row.update(
+                    {k: s[k] for k in s if k != source_label_key and k in source_keys}
+                )
+            src[get_key(row, match_keys_sorted)] = row
+            result.source_keys.update(row.keys())
+            source_data_len += 1
+
+        if len(src) != source_data_len:
+            raise PyODKError(
+                "Parameter 'match_keys' not unique across all 'source_data'."
+            )
+
+        for t in target_data:
+            key = get_key(t, match_keys_sorted)
+            result.target_keys.update(t.keys())
+            match = src.pop(key, None)
+            if match is None:
+                result.to_delete[key] = t
+            else:
+                for_update = False
+                new_data = {}
+                for k, v in t.items():
+                    # Add all the ID fields from the target data.
+                    if k in result.reserved_keys:
+                        new_data[k] = v
+                        continue
+                    # Ignore values where source has no key (nothing to update).
+                    # Uses a sentinel to differentiate None as a value, without a keys check.
+                    new_value = match.get(k, SENTINEL)
+                    if new_value is SENTINEL:
+                        continue
+                    # Add the source value if it is different.
+                    # Entity values are stored in Central as strings.
+                    if str(new_value) != v:
+                        new_data[k] = new_value
+                        for_update = True
+                for k, v in match.items():
+                    # Add values for any new keys not in the target.
+                    if k not in t:
+                        new_data[k] = v
+                        for_update = True
+                if for_update:
+                    result.to_update[key] = new_data
+
+        result.to_insert = src
+        return result
+
+    def merge(
+        self,
+        source_data: Iterable[Mapping[str, Any]],
+        entity_list_name: str | None = None,
+        project_id: int | None = None,
+        match_keys: Iterable[str] | None = None,
+        add_new_properties: bool = True,
+        delete_not_matched: bool = False,
+        source_label_key: str = "label",
+        source_keys: Iterable[str] | None = None,
+        create_source: str = "pyodk",
+    ) -> MergeActions:
+        """
+        Update Entities in Central based on the provided source data.
+
+        1. Create Entities in the source data that don't exist in Central.
+        2. Update Entities in Central that match the source data.
+        3. Optionally, delete any Entities in Central that aren't in the source data.
+
+        :param source_data: Data to use for updating Entities in Central.
+        :param entity_list_name: The name of the Entity List (Dataset) being referenced.
+        :param project_id: The id of the project this Entity belongs to.
+        :param match_keys: Dictionary keys common to source and target used to match rows.
+          Defaults to ("label",). If a custom source_label_key is provided, refer to it
+          as "label" here, because that source key is renamed to "label" for matching.
+        :param add_new_properties: If True, add any Entity List properties from the
+          source data that aren't in Central.
+        :param delete_not_matched: If True, delete any Entities in Central that aren't
+          in the source data.
+        :param source_label_key: The key in the source data to use as the label. The
+          target label key is always "label" because this key is required by Central.
+        :param source_keys: If provided, process only these keys in the source data.
+        :param create_source: When creating Entities in bulk, this value is used to
+          capture the source of the change in Central.
+        """
+        pid = pv.validate_project_id(project_id, self.default_project_id)
+        eln = pv.validate_entity_list_name(
+            entity_list_name, self.default_entity_list_name
+        )
+        target_data = self.get_table(entity_list_name=entity_list_name)["value"]
+        merge_actions = self._prep_data_for_merge(
+            source_data=source_data,
+            target_data=target_data,
+            match_keys=match_keys,
+            source_label_key=source_label_key,
+            source_keys=source_keys,
+        )
+        if add_new_properties:
+            elps = EntityListPropertyService(
+                session=self.session,
+                default_project_id=pid,
+                default_entity_list_name=eln,
+            )
+            for k in merge_actions.keys_difference:
+                elps.create(name=k)
+            merge_actions.final_keys = merge_actions.keys_union
+        else:
+            merge_actions.final_keys = merge_actions.keys_intersect
+        if len(merge_actions.to_insert) > 0:
+            insert_reshape = [
+                {
+                    "label": i["label"],
+                    "data": {k: i.get(k) for k in i if k in merge_actions.final_keys},
+                }
+                for i in merge_actions.to_insert.values()
+            ]
+            self.create_many(
+                data={"entities": insert_reshape, "source": {"name": create_source}},
+                entity_list_name=eln,
+            )
+        for u in merge_actions.to_update.values():
+            self.update(
+                uuid=u["__id"],
+                entity_list_name=eln,
+                label=u["label"],
+                data={k: u.get(k) for k in u if k in merge_actions.final_keys},
+                base_version=u["__system"]["version"],
+            )
+        if delete_not_matched:
+            for d in merge_actions.to_delete.values():
+                self.delete(uuid=d["__id"], entity_list_name=eln)
+        return merge_actions
diff --git a/pyodk/_endpoints/entity_lists.py b/pyodk/_endpoints/entity_lists.py
index cb94f36..ee478c9 100644
--- a/pyodk/_endpoints/entity_lists.py
+++ b/pyodk/_endpoints/entity_lists.py
@@ -67,7 +67,11 @@ def __init__(
     ):
         self.urls: URLs = urls if urls is not None else URLs()
         self.session: Session = session
-        self._property_service = EntityListPropertyService(session=self.session)
+        self._property_service = EntityListPropertyService(
+            session=self.session,
+            default_project_id=default_project_id,
+            default_entity_list_name=default_entity_list_name,
+        )
         self.add_property = self._property_service.create
 
         self._default_project_id: int | None = None
diff --git a/pyodk/_utils/validators.py b/pyodk/_utils/validators.py
index 77ad52c..666999b 100644
--- a/pyodk/_utils/validators.py
+++ b/pyodk/_utils/validators.py
@@ -104,3 +104,10 @@ def validate_fp(f):
         return v.path_exists_validator(p)
 
     return wrap_error(validator=validate_fp, key=key, value=coalesce(*args))
+
+
+def validate_is_instance(*args: Any, typ: Any, key: str):
+    val = coalesce(*args)
+    if not isinstance(val, typ):
+        raise PyODKError(f"{key}: Unexpected type. Expected '{typ}'.")
+    return val
diff --git a/tests/endpoints/test_entities.py b/tests/endpoints/test_entities.py
index f486ea9..119471e 100644
--- a/tests/endpoints/test_entities.py
+++ b/tests/endpoints/test_entities.py
@@ -1,7 +1,10 @@
+from csv import DictReader
+from io import StringIO
 from unittest import TestCase
 from unittest.mock import MagicMock, patch
 
 from pyodk._endpoints.entities import Entity
+from pyodk._endpoints.entities import EntityService as es
 from pyodk._utils.session import Session
 from pyodk.client import Client
 from pyodk.errors import PyODKError
@@ -116,3 +119,461 @@ def test_update__raise_if_invalid_force_or_base_version(self):
                         "Must specify one of 'force' or 'base_version'.",
                         err.exception.args[0],
                     )
+
+
+class TestPrepDataForMerge(TestCase):
+    def test_noop__source_same_as_target(self):
+        """Should identify no rows for insert/update/delete"""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = source
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_noop__source_has_no_value_for_key(self):
+        """Should identify no rows for insert/update/delete"""
+        source = [
+            {"label": "Sydney", "state": "NSW"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_insert__source_has_new_row__empty(self):
+        """Should identify row to_insert only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = []
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual(set(), observed.target_keys)
+
+    def test_to_insert__source_has_new_row__existing(self):
+        """Should identify row to_insert only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(
+            source[1]["label"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_delete__target_has_extra_row__empty(self):
+        """Should identify row to_delete only."""
+        source = []
+        target = [
+            {"label": "Sydney", "state": "VIC", "postcode": "2000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual(
+            target[0]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual(set(), observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_delete__target_has_extra_row__existing(self):
+        """Should identify row to_delete only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual({}, observed.to_update, observed.to_update)
+        self.assertEqual(
+            target[1]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_update__source_value_changed__from_existing(self):
+        """Should identify row to_update only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "3000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_update__source_value_changed__from_none(self):
+        """Should identify row to_update only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": None},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_update__source_value_changed__to_none(self):
+        """Should identify row to_update only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": None},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state", "postcode"}, observed.target_keys)
+
+    def test_to_update__new_source_field(self):
+        """Should identify row to_update only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state"}, observed.target_keys)
+
+    def test_to_update__new_source_field__with_other_change(self):
+        """Should identify row to_update only."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "QLD"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state"}, observed.target_keys)
+
+    def test_to_update__new_source_field__with_no_old_data(self):
+        """Should identify row to_update only."""
+        source = [
+            {"label": "Sydney", "postcode": "2000"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "NSW"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual({}, observed.to_insert, observed.to_insert)
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual({}, observed.to_delete, observed.to_delete)
+        self.assertEqual({"label", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state"}, observed.target_keys)
+
+    def test_merge__all_ops(self):
+        """Should identify a row for each op type at the same time."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},  # update
+            {"label": "Brisbane", "state": "QLD", "postcode": "4000"},  # insert
+            {"label": "Melbourne", "state": "VIC"},  # noop
+        ]
+        target = [
+            {"label": "Sydney", "state": "VIC"},
+            {"label": "Darwin", "state": "NT"},  # delete
+            {"label": "Melbourne", "state": "VIC"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            source[1]["label"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual(1, len(observed.to_delete))
+        self.assertEqual(
+            target[1]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state"}, observed.target_keys)
+
+    def test_merge__all_ops__alternative_source_label_key(self):
+        """Should identify a row for each op type at the same time."""
+        source = [
+            {"city": "Sydney", "state": "NSW", "postcode": "2000"},  # update
+            {"city": "Brisbane", "state": "QLD", "postcode": "4000"},  # insert
+            {"city": "Melbourne", "state": "VIC"},  # noop
+        ]
+        target = [
+            {"label": "Sydney", "state": "VIC"},
+            {"label": "Darwin", "state": "NT"},  # delete
+            {"label": "Melbourne", "state": "VIC"},
+        ]
+        observed = es._prep_data_for_merge(
+            source_data=source, target_data=target, source_label_key="city"
+        )
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            source[1]["city"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            source[0]["city"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual(1, len(observed.to_delete))
+        self.assertEqual(
+            target[1]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"label", "state", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "state"}, observed.target_keys)
+
+    def test_merge__all_ops__source_data_not_strings(self):
+        """Should identify a row for each op type at the same time."""
+        source = [
+            {"label": "Sydney", "postcode": 2000},  # update
+            {"label": "Brisbane", "postcode": 4000},  # insert
+            {"label": "Melbourne", "postcode": 3000},  # noop
+        ]
+        target = [
+            {"label": "Sydney", "postcode": "3000"},
+            {"label": "Darwin", "postcode": "4000"},  # delete
+            {"label": "Melbourne", "postcode": "3000"},
+        ]
+        observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            source[1]["label"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            source[0]["label"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual(1, len(observed.to_delete))
+        self.assertEqual(
+            target[1]["label"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"label", "postcode"}, observed.source_keys)
+        self.assertEqual({"label", "postcode"}, observed.target_keys)
+
+    def test_merge__all_ops__match_keys_not_including_label(self):
+        """Should identify a row for each op type at the same time."""
+        source = [
+            {"label": "Sydney", "id": "2", "state": "NSW", "postcode": "2000"},  # update
+            {
+                "label": "Brisbane",
+                "id": "4",
+                "state": "QLD",
+                "postcode": "4000",
+            },  # insert
+            {"label": "Melbourne", "id": "3", "state": "VIC"},  # noop
+        ]
+        target = [
+            {"label": "Sydney", "id": "2", "state": "VIC"},
+            {"label": "Darwin", "id": "1", "state": "NT"},  # delete
+            {"label": "Melbourne", "id": "3", "state": "VIC"},
+        ]
+        observed = es._prep_data_for_merge(
+            source_data=source, target_data=target, match_keys=("id",)
+        )
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            source[1]["id"],
+            next(iter(observed.to_insert.keys()))[0],
+            observed.to_insert,
+        )
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            source[0]["id"],
+            next(iter(observed.to_update.keys()))[0],
+            observed.to_update,
+        )
+        self.assertEqual(1, len(observed.to_delete))
+        self.assertEqual(
+            target[1]["id"],
+            next(iter(observed.to_delete.keys()))[0],
+            observed.to_delete,
+        )
+        self.assertEqual({"id", "label", "postcode", "state"}, observed.source_keys)
+        self.assertEqual({"id", "label", "state"}, observed.target_keys)
+
+    def test_source_has_duplicate_match_key(self):
+        """Should detect duplicate rows in source."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+        ]
+        target = []
+        with self.assertRaises(PyODKError) as err:
+            es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertEqual(
+            "Parameter 'match_keys' not unique across all 'source_data'.",
+            err.exception.args[0],
+        )
+
+    def test_source_has_row_missing_match_key(self):
+        """Should detect rows in source missing a match key."""
+        source = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Brisbane", "postcode": "4000"},
+        ]
+        target = []
+        with self.assertRaises(PyODKError) as err:
+            es._prep_data_for_merge(
+                source_data=source, target_data=target, match_keys={"label", "state"}
+            )
+        self.assertEqual(
+            "Found Entity that did not have all expected match_keys: 'state'",
+            err.exception.args[0],
+        )
+
+    def test_source_keys_limits_columns_of_interest(self):
+        """Should only process source_keys if specified."""
+        source = [
+            {"city": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"city": "Brisbane", "state": "QLD", "postcode": "4000"},
+            {"city": "Hobart"},
+        ]
+        target = [
+            {"label": "Sydney", "state": "VIC", "postcode": "3000"},
+            {"label": "Brisbane", "state": "QLD"},
+            {"label": "Hobart"},
+        ]
+        observed = es._prep_data_for_merge(
+            source_data=source,
+            target_data=target,
+            source_label_key="city",
+            source_keys={"city", "state"},
+        )
+        # "city" is translated to "label", "postcode" is ignored, "state" is updated.
+        self.assertEqual(1, len(observed.to_update))
+        self.assertEqual(
+            {"label": "Sydney", "state": "NSW"},
+            next(iter(observed.to_update.values())),
+            observed.to_update,
+        )
+        self.assertEqual(0, len(observed.to_insert))
+        self.assertEqual(0, len(observed.to_delete))
+        self.assertEqual(["label"], observed.match_keys)
+
+    def test_source_keys_does_not_include_label_or_source_label_key(self):
+        """Should raise an error if the source column specifications don't make sense."""
+        source = [
+            {"city": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"city": "Brisbane", "postcode": "4000"},
+        ]
+        target = []
+        with self.assertRaises(PyODKError) as err:
+            es._prep_data_for_merge(
+                source_data=source,
+                target_data=target,
+                source_label_key="city",
+                source_keys={"state", "postcode"},
+            )
+        self.assertEqual(
+            "Parameter 'source_keys' must include \"label\" or the "
+            "'source_label_key' parameter value",
+            err.exception.args[0],
+        )
+
+    def test_csv_as_source_data(self):
+        """Should be able to pass in a CSV DictReader as the source_data."""
+        csv = """label,state,postcode\nSydney,NSW,2000\nBrisbane,QLD,4000"""
+        target = [{"label": "Brisbane", "state": "QLD", "postcode": "4000"}]
+        observed = es._prep_data_for_merge(
+            source_data=list(DictReader(StringIO(csv))),
+            target_data=target,
+        )
+        self.assertEqual(1, len(observed.to_insert))
+        self.assertEqual(
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            next(iter(observed.to_insert.values())),
+        )
+        self.assertEqual(0, len(observed.to_update))
+        self.assertEqual(0, len(observed.to_delete))
diff --git a/tests/test_client.py b/tests/test_client.py
index a480e3a..f2c7a43 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -295,6 +295,112 @@ def test_entities__update(self):
         )
         self.assertEqual("test_value3", forced.currentVersion.data["test_label"])
 
+    def test_entity__merge__new(self):
+        """Should create a new Entity List, and merge in some new data."""
+        self.client.entity_lists.default_entity_list_name = (
+            self.client.session.get_xform_uuid()
+        )
+        entity_list = self.client.entity_lists.create()
+        self.client.entities.merge(
+            source_data=[
+                {"label": "Sydney", "state": "NSW"},
+                {"label": "Melbourne", "state": "VIC"},
+            ],
+            entity_list_name=entity_list.name,
+        )
+        entity_data = self.client.entities.get_table(entity_list_name=entity_list.name)
+        self.assertEqual(2, len(entity_data["value"]))
+
+    def test_entity__merge__existing__add_props__delete_unmatched(self):
+        """Should merge into an existing Entity List, adding new properties and deleting unmatched Entities."""
+        self.client.entity_lists.default_entity_list_name = (
+            self.client.session.get_xform_uuid()
+        )
+        entity_list = self.client.entity_lists.create()
+        self.client.entity_lists.add_property(
+            name="state", entity_list_name=entity_list.name
+        )
+        self.client.entities.create_many(
+            data={
+                "entities": [
+                    {"label": "Sydney", "data": {"state": "VIC"}},
+                    {"label": "Darwin", "data": {"state": "NT"}},
+                ],
+                "source": {"name": "pyodk"},
+            },
+            entity_list_name=entity_list.name,
+        )
+        # Add postcode property, Add Brisbane, update Sydney, delete Darwin.
+        self.client.entities.merge(
+            source_data=[
+                {"label": "Sydney", "state": "NSW", "postcode": "2001"},
+                {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
+            ],
+            entity_list_name=entity_list.name,
+            add_new_properties=True,
+            delete_not_matched=True,
+        )
+        entity_data = self.client.entities.get_table(entity_list_name=entity_list.name)
+        expected = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2001"},
+            {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
+        ]
+        observed = [
+            {k: o.get(k) for k in ("state", "label", "postcode")}
+            for o in entity_data["value"]
+        ]
+        self.assertTrue(
+            len(expected) == len(observed)
+            and all(e in observed for e in expected)
+            and expected[0].keys() == observed[0].keys(),
+            observed,
+        )
+
+    def test_entity__merge__existing__ignore_props__keep_unmatched(self):
+        """Should merge into an existing Entity List, ignoring new properties and keeping unmatched Entities."""
+        self.client.entity_lists.default_entity_list_name = (
+            self.client.session.get_xform_uuid()
+        )
+        entity_list = self.client.entity_lists.create()
+        self.client.entity_lists.add_property(
+            name="state", entity_list_name=entity_list.name
+        )
+        self.client.entities.create_many(
+            data={
+                "entities": [
+                    {"label": "Sydney", "data": {"state": "VIC"}},
+                    {"label": "Darwin", "data": {"state": "NT"}},
+                ],
+                "source": {"name": "pyodk"},
+            },
+            entity_list_name=entity_list.name,
+        )
+        # Skip postcode property, add Brisbane, update Sydney, keep Darwin.
+        self.client.entities.merge(
+            source_data=[
+                {"label": "Sydney", "state": "NSW", "postcode": "2000"},  # update
+                {"label": "Brisbane", "state": "QLD", "postcode": "4000"},  # insert
+            ],
+            entity_list_name=entity_list.name,
+            add_new_properties=False,
+            delete_not_matched=False,
+        )
+        entity_data = self.client.entities.get_table(entity_list_name=entity_list.name)
+        expected = [
+            {"label": "Sydney", "state": "NSW"},
+            {"label": "Brisbane", "state": "QLD"},
+            {"label": "Darwin", "state": "NT"},
+        ]
+        observed = [
+            {k: o.get(k) for k in ("state", "label")} for o in entity_data["value"]
+        ]
+        self.assertTrue(
+            len(expected) == len(observed)
+            and all(e in observed for e in expected)
+            and expected[0].keys() == observed[0].keys(),
+            observed,
+        )
+
     def test_entity_lists__list(self):
         """Should return a list of Entity Lists."""
         observed = self.client.entity_lists.list()

From d657b0fbc4bf526968c196bd49e71f7b44db1e59 Mon Sep 17 00:00:00 2001
From: lindsay stevens <lstevens@getodk.org>
Date: Fri, 16 Aug 2024 17:03:38 +1000
Subject: [PATCH 2/4] add: option to skip merge updates, docs for bulk
 update/delete

---
 pyodk/_endpoints/entities.py | 26 ++++++++++++++++++--------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/pyodk/_endpoints/entities.py b/pyodk/_endpoints/entities.py
index 3a10c31..8ff4da0 100644
--- a/pyodk/_endpoints/entities.py
+++ b/pyodk/_endpoints/entities.py
@@ -468,6 +468,7 @@ def merge(
         project_id: int | None = None,
         match_keys: Iterable[str] | None = None,
         add_new_properties: bool = True,
+        update_matched: bool = True,
         delete_not_matched: bool = False,
         source_label_key: str = "label",
         source_keys: Iterable[str] | None = None,
@@ -480,6 +481,12 @@ def merge(
         2. Update Entities in Central that match the source data.
         3. Optionally, delete any Entities in Central that aren't in the source data.
 
+        Creation is performed using the bulk creation endpoint. This method may be slow
+        for large quantities of updates or deletes, since for these operations each
+        change is a request in a loop. If this is a concern, set the parameters
+        `update_matched` and `delete_not_matched` to False and use the return value to
+        perform threaded or async requests for these data.
+
         :param source_data: Data to use for updating Entities in Central.
         :param entity_list_name: The name of the Entity List (Dataset) being referenced.
         :param project_id: The id of the project this Entity belongs to.
@@ -488,6 +495,8 @@ def merge(
           key as "label", because it is translated to "label" for matching.
         :param add_new_properties: If True, add any Entity List properties from the
           source data that aren't in Central.
+        :param update_matched: If True, update any Entities in Central that match the
+          source data but have different properties.
         :param delete_not_matched: If True, delete any Entities in Central that aren't
           in the source data.
         :param source_label_key: The key in the source data to use as the label. The
@@ -531,14 +540,15 @@ def merge(
                 data={"entities": insert_reshape, "source": {"name": create_source}},
                 entity_list_name=eln,
             )
-        for u in merge_actions.to_update.values():
-            self.update(
-                uuid=u["__id"],
-                entity_list_name=eln,
-                label=u["label"],
-                data={k: u.get(k) for k in u if k in merge_actions.final_keys},
-                base_version=u["__system"]["version"],
-            )
+        if update_matched:
+            for u in merge_actions.to_update.values():
+                self.update(
+                    uuid=u["__id"],
+                    entity_list_name=eln,
+                    label=u["label"],
+                    data={k: u.get(k) for k in u if k in merge_actions.final_keys},
+                    base_version=u["__system"]["version"],
+                )
         if delete_not_matched:
             for d in merge_actions.to_delete.values():
                 self.delete(uuid=d["__id"], entity_list_name=eln)

From 72bb68e6ced57057e247532c71d4ca7228aa7208 Mon Sep 17 00:00:00 2001
From: lindsay stevens <lstevens@getodk.org>
Date: Thu, 29 Aug 2024 21:53:36 +1000
Subject: [PATCH 3/4] chg: accept same input data shape for merge and
 create_many

- chg: move insert data reshaping to create_many so that it's
  easier to use and accepts same kind of data as merge.
- add: create_source/source_size parameter for both methods to
  allow specifying this info separately to the data.
- chg: docstring improvements
---
 .../create_entities_from_submissions.py       |  15 ++-
 pyodk/_endpoints/entities.py                  | 124 ++++++++++++------
 tests/endpoints/test_entities.py              |   3 +-
 tests/test_client.py                          |  28 ++--
 4 files changed, 104 insertions(+), 66 deletions(-)

diff --git a/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py b/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
index 4130c68..c649d38 100644
--- a/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
+++ b/docs/examples/create_entities_from_submissions/create_entities_from_submissions.py
@@ -9,7 +9,6 @@
 from pyodk import Client
 
 project_id = 1
-entity_list_name = f"previous_survey_{uuid4()}"
 entity_label_field = "first_name"
 entity_properties = ("age", "location")
 csv_path = Path("./imported_answers.csv")
@@ -18,28 +17,30 @@
 def create_one_at_a_time():
     with Client(project_id=project_id) as client, open(csv_path) as csv_file:
         # Create the entity list.
-        client.entity_lists.create(entity_list_name=entity_list_name)
+        entity_list = client.entity_lists.create(
+            entity_list_name=f"previous_survey_{uuid4()}"
+        )
         for prop in entity_properties:
-            client.entity_lists.add_property(name=prop, entity_list_name=entity_list_name)
+            client.entity_lists.add_property(name=prop, entity_list_name=entity_list.name)
 
         # Create the entities from the CSV data.
         for row in DictReader(csv_file):
             client.entities.create(
                 label=row[entity_label_field],
                 data={k: str(v) for k, v in row.items() if k in entity_properties},
-                entity_list_name=entity_list_name,
+                entity_list_name=entity_list.name,
             )
 
 
 def create_with_merge():
     with Client(project_id=project_id) as client, open(csv_path) as csv_file:
-        client.entity_lists.default_entity_list_name = client.session.get_xform_uuid()
+        client.entity_lists.default_entity_list_name = f"previous_survey_{uuid4()}"
         entity_list = client.entity_lists.create()
         client.entities.merge(
-            source_data=list(DictReader(csv_file)),
+            data=DictReader(csv_file),
             entity_list_name=entity_list.name,
             source_label_key=entity_label_field,
-            source_keys=entity_properties,
+            source_keys=(entity_label_field, *entity_properties),
         )
 
 
diff --git a/pyodk/_endpoints/entities.py b/pyodk/_endpoints/entities.py
index 8ff4da0..35b7640 100644
--- a/pyodk/_endpoints/entities.py
+++ b/pyodk/_endpoints/entities.py
@@ -5,6 +5,7 @@
 from typing import Any
 from uuid import uuid4
 
+from pyodk.__version__ import __version__
 from pyodk._endpoints import bases
 from pyodk._endpoints.entity_list_properties import EntityListPropertyService
 from pyodk._utils import validators as pv
@@ -179,34 +180,65 @@ def create(
 
     def create_many(
         self,
-        data: dict,
+        data: Iterable[Mapping[str, Any]],
         entity_list_name: str | None = None,
         project_id: int | None = None,
-    ) -> Entity:
+        create_source: str | None = None,
+        source_size: str | None = None,
+    ) -> bool:
         """
         Create one or more Entities in a single request.
 
-        Required keys in data: entities[].label, entities[].data, source.name.
-        Example of the required data format:
+        Example input for `data` would be a list of dictionaries from a CSV file:
+
+        data = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Melbourne", "state": "VIC", "postcode": "3000"},
+        ]
 
-        {
-            "entities": [
-                {"label": "Sydney", "data": {"state": "NSW", "postcode": "2000"}},
-                {"label": "Melbourne", "data": {"state": "VIC", "postcode": "3000"}},
-            ],
-            "source": {"name": "pyodk", "size": 1},
-        }
+        Each Entity in `data` must include a "label" key. An Entity List property must be
+        created in advance for each key in `data` that is not "label". The `merge` method
+        can be used to automatically add properties (or a subset) and create Entities.
 
         :param data: Data to store for the Entities.
         :param entity_list_name: The name of the Entity List (Dataset) being referenced.
         :param project_id: The id of the project this Entity belongs to.
+        :param create_source: Used to capture the source of the change in Central, for
+          example a file name. Defaults to the PyODK version.
+        :param source_size: Used to capture the size of the source data in Central, for
+          example a file size or row count. Excluded if None.
         """
+        if create_source is None:
+            create_source = f"pyodk v{__version__}"
+        if source_size is None:
+            size = {}
+        else:
+            size = {"size": source_size}
+
+        def reshape(d):
+            try:
+                new = [
+                    {
+                        "label": i["label"],
+                        "data": {k: i.get(k) for k in i if k != "label"},
+                    }
+                    for i in d
+                ]
+            except KeyError as kerr:
+                raise PyODKError("All data must include a 'label' key.") from kerr
+            else:
+                return new
+
         try:
             pid = pv.validate_project_id(project_id, self.default_project_id)
             eln = pv.validate_entity_list_name(
                 entity_list_name, self.default_entity_list_name
             )
             data = pv.validate_is_instance(data, typ=Iterable, key="data")
+            final_data = {
+                "entities": reshape(data),
+                "source": {"name": create_source, **size},
+            }
         except PyODKError as err:
             log.error(err, exc_info=True)
             raise
@@ -215,7 +247,7 @@ def create_many(
             method="POST",
             url=self.session.urlformat(self.urls.post, project_id=pid, el_name=eln),
             logger=log,
-            json=data,
+            json=final_data,
         )
         data = response.json()
         return data["success"]
@@ -463,7 +495,7 @@ def get_key(entity: Mapping[str, Any], keys: list) -> tuple:
 
     def merge(
         self,
-        source_data: Iterable[Mapping[str, Any]],
+        data: Iterable[Mapping[str, Any]],
         entity_list_name: str | None = None,
         project_id: int | None = None,
         match_keys: Iterable[str] | None = None,
@@ -472,38 +504,48 @@ def merge(
         delete_not_matched: bool = False,
         source_label_key: str = "label",
         source_keys: Iterable[str] | None = None,
-        create_source: str = "pyodk",
+        create_source: str | None = None,
+        source_size: str | None = None,
     ) -> MergeActions:
         """
-        Update Entities in Central based on the provided source data.
+        Update Entities in Central based on the provided data:
 
-        1. Create Entities in the source data that don't exist in Central.
-        2. Update Entities in Central that match the source data.
-        3. Optionally, delete any Entities in Central that aren't in the source data.
+        1. Create Entities from `data` that don't exist in Central.
+        2. Update Entities from `data` that exist in Central.
+        3. Optionally, delete any Entities in Central that don't exist in `data`.
 
-        Creation is performed using the bulk creation endpoint. This method may be slow
-        for large quantities of updates or deletes, since for these operations each
-        change is a request in a loop. If this is a concern, set the parameters
-        `update_matched` and `delete_not_matched` to False and use the return value to
-        perform threaded or async requests for these data.
+        Example input for `data` would be a list of dictionaries from a CSV file:
 
-        :param source_data: Data to use for updating Entities in Central.
+        data = [
+            {"label": "Sydney", "state": "NSW", "postcode": "2000"},
+            {"label": "Melbourne", "state": "VIC", "postcode": "3000"},
+        ]
+
+        Entity creation is performed in one request using `create_many`. The merge
+        operation may be slow if large quantities of updates or deletes are required,
+        since for these operations each change is a request in a loop. If this is a
+        concern, set the parameters `update_matched` and `delete_not_matched` to False and
+        use the return value to perform threaded or async requests for these data.
+
+        :param data: Data to use for updating Entities in Central.
         :param entity_list_name: The name of the Entity List (Dataset) being referenced.
         :param project_id: The id of the project this Entity belongs to.
         :param match_keys: Dictionary keys common to source and target used to match rows.
           Defaults to ("label",). If a custom source_label_key is provided, specify that
           key as "label", because it is translated to "label" for matching.
-        :param add_new_properties: If True, add any Entity List properties from the
-          source data that aren't in Central.
-        :param update_matched: If True, update any Entities in Central that match the
-          source data but have different properties.
+        :param add_new_properties: If True, add any Entity List properties from `data`
+          that aren't in Central.
+        :param update_matched: If True, update any Entities in Central that match `data`
+          but have different properties.
         :param delete_not_matched: If True, delete any Entities in Central that aren't
-          in the source data.
-        :param source_label_key: The key in the source data to use as the label. The
-          target label key is always "label" because this key is required by Central.
-        :param source_keys: If provided, process only these keys in the source data.
-        :param create_source: When creating Entities in bulk, this value is used to
-          capture the source of the change in Central.
+          in `data`.
+        :param source_label_key: The key in `data` to use as the label. The target label
+          key is always "label" because this key is required by Central.
+        :param source_keys: If provided, process only these keys in `data`.
+        :param create_source: If Entities are created, this is used to capture the source
+          of the change in Central, for example a file name. Defaults to the PyODK version.
+        :param source_size: If Entities are created, this is used to capture the size of
+          `data` in Central, for example a file size. Excluded if None.
         """
         pid = pv.validate_project_id(project_id, self.default_project_id)
         eln = pv.validate_entity_list_name(
@@ -511,7 +553,7 @@ def merge(
         )
         target_data = self.get_table(entity_list_name=entity_list_name)["value"]
         merge_actions = self._prep_data_for_merge(
-            source_data=source_data,
+            source_data=data,
             target_data=target_data,
             match_keys=match_keys,
             source_label_key=source_label_key,
@@ -529,16 +571,16 @@ def merge(
         else:
             merge_actions.final_keys = merge_actions.keys_intersect
         if len(merge_actions.to_insert) > 0:
-            insert_reshape = [
-                {
-                    "label": i["label"],
-                    "data": {k: i.get(k) for k in i if k in merge_actions.final_keys},
-                }
+            relevant_keys = {"label", *merge_actions.final_keys}
+            insert_filter = [
+                {k: i.get(k) for k in i if k in relevant_keys}
                 for i in merge_actions.to_insert.values()
             ]
             self.create_many(
-                data={"entities": insert_reshape, "source": {"name": create_source}},
+                data=insert_filter,
                 entity_list_name=eln,
+                create_source=create_source,
+                source_size=source_size,
             )
         if update_matched:
             for u in merge_actions.to_update.values():
diff --git a/tests/endpoints/test_entities.py b/tests/endpoints/test_entities.py
index 119471e..7b0514f 100644
--- a/tests/endpoints/test_entities.py
+++ b/tests/endpoints/test_entities.py
@@ -3,7 +3,7 @@
 from unittest import TestCase
 from unittest.mock import MagicMock, patch
 
-from pyodk._endpoints.entities import Entity
+from pyodk._endpoints.entities import Entity, MergeActions
 from pyodk._endpoints.entities import EntityService as es
 from pyodk._utils.session import Session
 from pyodk.client import Client
@@ -351,6 +351,7 @@ def test_merge__all_ops(self):
             {"label": "Melbourne", "state": "VIC"},
         ]
         observed = es._prep_data_for_merge(source_data=source, target_data=target)
+        self.assertIsInstance(observed, MergeActions)
         self.assertEqual(1, len(observed.to_insert))
         self.assertEqual(
             source[1]["label"],
diff --git a/tests/test_client.py b/tests/test_client.py
index f2c7a43..ff669b8 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -302,7 +302,7 @@ def test_entity__merge__new(self):
         )
         entity_list = self.client.entity_lists.create()
         self.client.entities.merge(
-            source_data=[
+            data=[
                 {"label": "Sydney", "state": "NSW"},
                 {"label": "Melbourne", "state": "VIC"},
             ],
@@ -321,18 +321,15 @@ def test_entity__merge__existing__add_props__delete_unmatched(self):
             name="state", entity_list_name=entity_list.name
         )
         self.client.entities.create_many(
-            data={
-                "entities": [
-                    {"label": "Sydney", "data": {"state": "VIC"}},
-                    {"label": "Darwin", "data": {"state": "NT"}},
-                ],
-                "source": {"name": "pyodk"},
-            },
+            data=[
+                {"label": "Sydney", "state": "VIC"},
+                {"label": "Darwin", "state": "NT"},
+            ],
             entity_list_name=entity_list.name,
         )
         # Add postcode property, Add Brisbane, update Sydney, delete Darwin.
         self.client.entities.merge(
-            source_data=[
+            data=[
                 {"label": "Sydney", "state": "NSW", "postcode": "2001"},
                 {"label": "Brisbane", "state": "QLD", "postcode": "4000"},
             ],
@@ -366,18 +363,15 @@ def test_entity__merge__existing__ignore_props__keep_unmatched(self):
             name="state", entity_list_name=entity_list.name
         )
         self.client.entities.create_many(
-            data={
-                "entities": [
-                    {"label": "Sydney", "data": {"state": "VIC"}},
-                    {"label": "Darwin", "data": {"state": "NT"}},
-                ],
-                "source": {"name": "pyodk"},
-            },
+            data=[
+                {"label": "Sydney", "state": "VIC"},
+                {"label": "Darwin", "state": "NT"},
+            ],
             entity_list_name=entity_list.name,
         )
         # Skip postcode property, add Brisbane, update Sydney, keep Darwin.
         self.client.entities.merge(
-            source_data=[
+            data=[
                 {"label": "Sydney", "state": "NSW", "postcode": "2000"},  # update
                 {"label": "Brisbane", "state": "QLD", "postcode": "4000"},  # insert
             ],

From b95ee5622e55ab2190500deb6b12f4618e1c5499 Mon Sep 17 00:00:00 2001
From: lindsay stevens <lstevens@getodk.org>
Date: Thu, 29 Aug 2024 22:18:04 +1000
Subject: [PATCH 4/4] fix: pin docs dependency with breaking update that breaks
 CI

- caused by the mkdocstrings-python dependency on griffe>=0.37
- griffe public API changed in v1.0.0 released 2 weeks ago.
---
 pyproject.toml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pyproject.toml b/pyproject.toml
index dc60785..58a88c2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,6 +23,7 @@ dev = [
 docs = [
     "mkdocs==1.5.3",
     "mkdocstrings==0.24.1",
+    "griffe==0.37",
     "mkdocstrings-python==1.9.0",
     "mkdocs-jupyter==0.24.6",
 ]