Bulk entity creation via API (#1070)
* Start of bulk entity upload

* Move bulk append to existing POST .../entities resource

* Test to check entity sources

* Make sure bulk entity create is in a transaction

* Return bulk create event and details in audits for a single entity

* Add test for auto-generating UUID

* Updating the docs

* Remove extra transaction by fixing test

* fixing tests, handling empty entities array or non-array

* mend

* exclude entity.bulk.create from non-verbose audit logs

* validate bulk entity source object

* updated docs to include bulk source
ktuite authored Feb 24, 2024
1 parent 9a141ec commit f98187d
Showing 8 changed files with 936 additions and 40 deletions.
104 changes: 82 additions & 22 deletions docs/api.yaml
@@ -40,12 +40,15 @@ info:

**Added**:

- Bulk Entity Creation!
* The existing [Entity Create](/central-api-entity-management/#creating-entities) endpoint now also accepts a list of Entities to append to a Dataset.
* The `uuid` property is no longer required and Central will generate a UUID for each new Entity if needed.
- OData Data Document requests for Submissions and Entities now allow use of `$orderby`.
- ETag headers are now returned on all Blobs.

**Changed**:

- The [Entity Create](/central-api-entity-management/#creating-an-entity) endpoint will now generate a UUID if the `uuid` parameter is not provided.
- The [Entity Create](/central-api-entity-management/#creating-entities) endpoint will now generate a UUID if the `uuid` parameter is not provided.


## ODK Central v2023.5
@@ -96,7 +99,7 @@ info:
* New endpoint [GET /projects/:id/datasets/:name/entities/:uuid/versions](/central-api-entity-management/#listing-versions) for listing the versions of an Entity.
* New endpoint [GET /projects/:id/datasets/:name/entities/:uuid/diffs](/central-api-entity-management/#getting-changes-between-versions) for getting the changes between versions of an Entity.
* New endpoint [GET /projects/:id/datasets/:name/entities/:uuid/audits](/central-api-entity-management/#entity-audit-log) for getting the server audit logs about a specific Entity.
* New endpoint [POST /projects/:id/datasets/:name/entities](/central-api-entity-management/#creating-an-entity) for creating an Entity from JSON.
* New endpoint [POST /projects/:id/datasets/:name/entities](/central-api-entity-management/#creating-entities) for creating an Entity from JSON.
* New endpoint [PATCH /projects/:id/datasets/:name/entities/:uuid](/central-api-entity-management/#updating-an-entity) for updating the data or label of an Entity.
* New endpoint [DELETE /projects/:id/datasets/:name/entities/:uuid](/central-api-entity-management/#deleting-an-entity) for soft-deleting an Entity.

@@ -10070,15 +10073,35 @@ paths:
post:
tags:
- Entity Management
summary: Creating an Entity
summary: Creating Entities
description: |-
Creates an Entity in the Dataset. The request body takes a JSON representation of the Entity, which has the following properties:
Creates one or more Entities in the Dataset.

For creating **a single Entity**, the request body takes a JSON representation of the Entity, which has the following properties:

1. A `data` object containing values for the user-defined Dataset properties. (Not all properties have to have values.)
2. A `label` property, which cannot be blank or an empty string. (This is used as a human-readable label in Forms that consume Entities.)
3. An optional `uuid` property. If the `uuid` is not specified, Central will generate a UUID for an Entity with the provided data and label.

Value type of all properties is `string`.

```
{
  "label": "John Doe",
  "data": {
    "firstName": "John",
    "age": "22"
  }
}
```

The value type of all properties is `string`.

For creating **multiple Entities** in bulk, the request body takes an array `entities` containing a list of Entity objects as described above. The bulk request also takes a `source` property with a required `name` field and an optional `size`, for example to capture the filename and size of a bulk upload source.

```
{
  "entities": [...],
  "source": {"name": "file.csv", "size": 100}
}
```

You can provide the header `X-Action-Notes` to store metadata about the request. The metadata can be retrieved using the [Entity Audit Log](/central-api-entity-management/#entity-audit-log).
operationId: Creating an Entity
@@ -10100,23 +10123,27 @@ paths:
requestBody:
content:
'*/*':
schema:
type: object
properties:
uuid:
type: string
description: The `uuid` of the Entity that uniquely identifies the Entity.
label:
type: string
description: Label of the Entity
data:
$ref: '#/components/schemas/DataExample'
schema:
oneOf:
- $ref: '#/components/schemas/EntityCreateSingle'
- $ref: '#/components/schemas/EntityCreateBulk'
example:
uuid: 54a405a0-53ce-4748-9788-d23a30cc3afa
label: John Doe (88)
data:
firstName: John
age: '88'
entities:
-
uuid: 54a405a0-53ce-4748-9788-d23a30cc3afa
label: John Doe (22)
data:
firstName: John
age: '22'
-
uuid: 0c3a7922-b611-42ca-a961-944e09fa9aa2
label: Amy Jane (38)
data:
firstName: Amy
age: '38'
source:
name: my_dataset.csv
size: 100
responses:
200:
description: OK
@@ -13547,6 +13574,18 @@ components:
type: string
description: The name of the property that is changed.
example: name
EntityBulkSource:
type: object
properties:
name:
type: string
description: A name to identify the bulk source, such as a filename or API run
example: myfile.csv
size:
type: number
description: Optional; meant to indicate the file size (in MB)
example: 100
description: An object describing the source of this bulk create action.
EntityOdata:
type: object
properties:
@@ -13580,6 +13619,27 @@ components:
type: string
circumference_cm:
type: string
EntityCreateSingle:
type: object
properties:
uuid:
type: string
description: (Optional) The `uuid` of the Entity that uniquely identifies the Entity.
label:
type: string
description: Label of the Entity
data:
$ref: '#/components/schemas/DataExample'
EntityCreateBulk:
type: object
properties:
entities:
type: array
items:
$ref: '#/components/schemas/EntityCreateSingle'
description: A list of Entities
source:
$ref: '#/components/schemas/EntityBulkSource'
DataExample:
type: object
properties:
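To make the documented request shape concrete, here is a minimal sketch of a bulk-create call against the updated endpoint. The server URL, project id, dataset name, and token handling are invented for illustration; the entity values mirror the example in the spec above.

```js
// Minimal sketch, not taken from this diff. Assumes Node 18+ (global fetch, run as an ES
// module for top-level await), a Central server at central.example.org, project 1, a
// dataset named "people", and a session token in the CENTRAL_TOKEN environment variable.
const token = process.env.CENTRAL_TOKEN;

const body = {
  entities: [
    // uuid omitted: the server now generates one
    { label: 'John Doe (22)', data: { firstName: 'John', age: '22' } },
    // uuid supplied explicitly
    { uuid: '0c3a7922-b611-42ca-a961-944e09fa9aa2', label: 'Amy Jane (38)', data: { firstName: 'Amy', age: '38' } },
  ],
  // `name` is required for a bulk source; `size` is optional
  source: { name: 'my_dataset.csv', size: 100 },
};

const res = await fetch('https://central.example.org/v1/projects/1/datasets/people/entities', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${token}`,
    'X-Action-Notes': 'bulk upload of my_dataset.csv', // optional metadata, visible in the audit log
  },
  body: JSON.stringify(body),
});
console.log(res.status); // 200 on success
```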
25 changes: 24 additions & 1 deletion lib/data/entity.js
@@ -19,7 +19,7 @@ const { PartialPipe } = require('../util/stream');
const Problem = require('../util/problem');
const { submissionXmlToFieldStream } = require('./submission');
const { nextUrlFor, getServiceRoot, jsonDataFooter, extractPaging } = require('../util/odata');
const { sanitizeOdataIdentifier } = require('../util/util');
const { sanitizeOdataIdentifier, blankStringToNull } = require('../util/util');

const odataToColumnMap = new Map([
['__system/createdAt', 'entities.createdAt'],
@@ -142,6 +142,28 @@ const extractEntity = (body, propertyNames, existingEntity) => {
return entity;
};

// Input: object representing source (file name and size), sent via API
// Also handles userAgent string
// Returns validated and sanitized source object
const extractBulkSource = (source, count, userAgent) => {
if (!source)
throw Problem.user.missingParameter({ field: 'source' });

const { name, size } = source;

if (!name)
throw Problem.user.missingParameter({ field: 'source.name' });

if (typeof name !== 'string')
throw Problem.user.invalidDataTypeOfParameter({ field: 'name', value: typeof name, expected: 'string' });

if (size != null && typeof size !== 'number')
throw Problem.user.invalidDataTypeOfParameter({ field: 'size', value: typeof size, expected: 'number' });


return { name, ...(size) && { size }, count, userAgent: blankStringToNull(userAgent) };
};

////////////////////////////////////////////////////////////////////////////
// ENTITY STREAMING

@@ -429,6 +451,7 @@ module.exports = {
normalizeUuid,
extractLabelFromSubmission,
extractBaseVersionFromSubmission,
extractBulkSource,
streamEntityCsv, streamEntityCsvAttachment,
streamEntityOdata, odataToColumnMap,
extractSelectedProperties, selectFields,
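To make the validation rules in `extractBulkSource` concrete, a rough usage sketch follows. The inputs are invented, and the thrown errors are whichever 400-series Problems `Problem.user.missingParameter` and `Problem.user.invalidDataTypeOfParameter` produce.

```js
// Illustrative only — invented inputs exercising extractBulkSource as defined above.
const { extractBulkSource } = require('./lib/data/entity'); // path relative to the repo root

// A valid source returns the sanitized object plus the entity count and user agent.
extractBulkSource({ name: 'my_dataset.csv', size: 100 }, 2, 'curl/8.4.0');
// => { name: 'my_dataset.csv', size: 100, count: 2, userAgent: 'curl/8.4.0' }

// A blank user agent is normalized to null, and a missing size is simply omitted.
extractBulkSource({ name: 'my_dataset.csv' }, 1, '');
// => { name: 'my_dataset.csv', count: 1, userAgent: null }

// Missing source or source.name throws Problem.user.missingParameter:
//   extractBulkSource(undefined, 2, null)        -> missing parameter `source`
//   extractBulkSource({ size: 100 }, 2, null)    -> missing parameter `source.name`

// Wrong types throw Problem.user.invalidDataTypeOfParameter:
//   extractBulkSource({ name: 42 }, 2, null)                    -> `name` must be a string
//   extractBulkSource({ name: 'x.csv', size: 'big' }, 2, null)  -> `size` must be a number
```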
23 changes: 14 additions & 9 deletions lib/model/query/audits.js
@@ -36,7 +36,7 @@ const actionCondition = (action) => {
// The backup action was logged by a backup script that has been removed.
// Even though the script has been removed, the audit log entries it logged
// have not, so we should continue to exclude those.
return sql`action not in ('entity.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'backup', 'analytics')`;
return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'backup', 'analytics')`;
else if (action === 'user')
return sql`action in ('user.create', 'user.update', 'user.delete', 'user.assignment.create', 'user.assignment.delete', 'user.session.create')`;
else if (action === 'field_key')
@@ -52,7 +52,7 @@
else if (action === 'dataset')
return sql`action in ('dataset.create', 'dataset.update')`;
else if (action === 'entity')
return sql`action in ('entity.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete')`;
return sql`action in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete')`;

return sql`action=${action}`;
};
@@ -133,11 +133,12 @@ const getBySubmissionId = (submissionId, options) => ({ all }) => _getBySubmissi


const _getByEntityId = (fields, options, entityId) => sql`
SELECT ${fields} FROM audits
LEFT JOIN actors ON actors.id=audits."actorId"
SELECT ${fields} FROM entity_defs
LEFT JOIN entity_defs ON (audits.details->'entityDefId')::INTEGER = entity_defs.id
LEFT JOIN entity_def_sources on entity_def_sources.id = entity_defs."sourceId"
INNER JOIN audits ON ((audits.details->'entityDefId')::INTEGER = entity_defs.id OR (audits.details->'sourceId')::INTEGER = entity_def_sources.id)
LEFT JOIN actors ON actors.id=audits."actorId"
LEFT JOIN audits triggering_event ON entity_def_sources."auditId" = triggering_event.id
LEFT JOIN actors triggering_event_actor ON triggering_event_actor.id = triggering_event."actorId"
@@ -167,15 +168,14 @@ SELECT ${fields} FROM audits
-- if some other kind of target object defined, add subquery here
-- ...
WHERE (audits.details->>'entityId')::INTEGER = ${entityId}
WHERE entity_defs."entityId" = ${entityId}
ORDER BY audits."loggedAt" DESC, audits.id DESC
${page(options)}`;

const getByEntityId = (entityId, options) => ({ all }) => {

const _unjoiner = unjoiner(
Audit, Actor,
Audit, Actor, Entity.Def.Source,
Option.of(Audit.alias('triggering_event', 'triggeringEvent')), Option.of(Actor.alias('triggering_event_actor', 'triggeringEventActor')),
Option.of(Audit.alias('submission_create_event', 'submissionCreateEvent')), Option.of(Actor.alias('submission_create_event_actor', 'submissionCreateEventActor')),
Option.of(Submission), Option.of(Submission.Def.alias('current_submission_def', 'currentVersion')),
@@ -188,6 +188,8 @@ const getByEntityId = (entityId, options) => ({ all }) => {
.then(map(_unjoiner))
.then(map(audit => {

const entitySourceDetails = audit.aux.source.forApi();

const sourceEvent = audit.aux.triggeringEvent
.map(a => a.withAux('actor', audit.aux.triggeringEventActor.orNull()))
.map(a => a.forApi());
@@ -212,9 +214,12 @@
})
.orElse(undefined);

// Note: The source added to each audit event represents the source of the
// corresponding entity _version_, rather than the source of the event.
const details = mergeLeft(audit.details, sourceEvent
.map(event => ({ source: { event, submission } }))
.orElse({ source: {} })); // Add default empty source to all other entity audit events
.orElse({ source: entitySourceDetails }));


return new Audit({ ...audit, details }, { actor: audit.aux.actor });
}));
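As a rough illustration of the note above, the `details.source` attached to an entity audit entry now has two possible shapes: the triggering event for submission-backed versions, or the version's own source record (for example a bulk upload) where no triggering event exists. The field names below are assumptions based on the query and frames above, not values captured from a live response.

```js
// Illustrative only — assumed shapes for audit `details`, not taken from a real server.

// Entity version created from a submission: source describes the triggering event.
const submissionBackedDetails = {
  entityId: 42,
  source: {
    event: { action: 'submission.create' /* ...other audit fields... */ },
    submission: { instanceId: 'uuid:example' /* ...other submission fields... */ },
  },
};

// Entity version created through the new bulk API: source falls back to the
// entity_def_sources row (the bulk upload details) instead of an empty object.
const bulkCreatedDetails = {
  entityId: 42,
  source: { name: 'my_dataset.csv', size: 100, count: 2, userAgent: null },
};
```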
43 changes: 43 additions & 0 deletions lib/model/query/entities.js
@@ -86,6 +86,48 @@ createNew.audit = (newEntity, dataset, partial, subDef) => (log) => {
};
createNew.audit.withResult = true;

// createMany() inserts many entities and entity defs in bulk in two main queries.
// it could be used in place of createNew() but createNew uses a single query so it may be faster
// in single entity situations (e.g. processing submissions to make entities)
// Note: if the entity schema changes, createMany and createNew would both need to change.
const createMany = (dataset, entities, sourceId, userAgentIn) => async ({ all, context }) => {
const creatorId = context.auth.actor.map((actor) => actor.id).orNull();
const userAgent = blankStringToNull(userAgentIn);

const entityInsert = sql.join(entities.map(e => sql`(${sql.join([dataset.id, e.uuid, creatorId, sql`clock_timestamp()`], sql`,`)} )`), sql`,`);
const newEntities = await all(sql`
INSERT INTO entities ("datasetId", "uuid", "creatorId", "createdAt")
VALUES ${entityInsert}
RETURNING id`);

const defInsert = sql.join(entities.map((e, i) => sql`(${sql.join([
newEntities[i].id,
e.def.label,
JSON.stringify(e.def.data),
JSON.stringify(e.def.dataReceived),
sourceId,
creatorId,
userAgent,
'true',
'true',
sql`clock_timestamp()`,
'1'
], sql`,`)} )`), sql`,`);

const defs = await all(sql`
INSERT INTO entity_defs ("entityId", "label", "data", "dataReceived",
"sourceId", "creatorId", "userAgent", "root", "current", "createdAt", "version")
VALUES ${defInsert}
RETURNING *
`);

return defs;
};

createMany.audit = (dataset, entities, sourceId) => (log) =>
log('entity.bulk.create', dataset, { sourceId });
createMany.audit.withResult = false;


////////////////////////////////////////////////////////////////////////////////
// ENTITY UPDATE
@@ -491,6 +533,7 @@ del.audit = (entity, dataset) => (log) => log('entity.delete', entity.with({ act
module.exports = {
createNew, _processSubmissionEvent,
createSource,
createMany,
_createEntity, _updateEntity,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
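For a sense of what the two bulk queries in `createMany` expand to, here is an approximation of the generated SQL, written out as plain strings with invented ids. The real code builds these statements with the project's `sql` tagged template rather than string concatenation.

```js
// Illustrative only — approximate SQL for a two-entity bulk create (ids invented).
const entitiesInsert = `
  INSERT INTO entities ("datasetId", "uuid", "creatorId", "createdAt")
  VALUES (1, '54a405a0-53ce-4748-9788-d23a30cc3afa', 5, clock_timestamp()),
         (1, '0c3a7922-b611-42ca-a961-944e09fa9aa2', 5, clock_timestamp())
  RETURNING id`;

// Every def row shares the same sourceId (17 here), so the whole upload is traceable
// to a single entity_def_sources record.
const defsInsert = `
  INSERT INTO entity_defs ("entityId", "label", "data", "dataReceived",
    "sourceId", "creatorId", "userAgent", "root", "current", "createdAt", "version")
  VALUES (101, 'John Doe (22)', '{"firstName":"John","age":"22"}', '{"firstName":"John","age":"22"}', 17, 5, 'curl/8.4.0', 'true', 'true', clock_timestamp(), '1'),
         (102, 'Amy Jane (38)', '{"firstName":"Amy","age":"38"}', '{"firstName":"Amy","age":"38"}', 17, 5, 'curl/8.4.0', 'true', 'true', clock_timestamp(), '1')
  RETURNING *`;
```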
34 changes: 27 additions & 7 deletions lib/resources/entities.js
@@ -11,7 +11,7 @@ const { getOrNotFound, reject } = require('../util/promise');
const { isTrue, success } = require('../util/http');
const { Entity } = require('../model/frames');
const Problem = require('../util/problem');
const { diffEntityData, getWithConflictDetails } = require('../data/entity');
const { diffEntityData, extractBulkSource, getWithConflictDetails } = require('../data/entity');
const { QueryOptions } = require('../util/db');

module.exports = (service, endpoint) => {
@@ -78,21 +78,41 @@ module.exports = (service, endpoint) => {

}));

// Create a single entity or bulk create multiple entities.
// In either case, this appends new entities to a dataset.
service.post('/projects/:id/datasets/:name/entities', endpoint(async ({ Datasets, Entities }, { auth, body, params, userAgent }) => {

const dataset = await Datasets.get(params.id, params.name, true).then(getOrNotFound);

await auth.canOrReject('entity.create', dataset);

const properties = await Datasets.getProperties(dataset.id);

const partial = await Entity.fromJson(body, properties, dataset);
// Destructure list of new entities and source if bulk operation
const { entities, source } = body;

const sourceId = await Entities.createSource();
const entity = await Entities.createNew(dataset, partial, null, sourceId, userAgent);
if (!entities) {
// not a bulk operation
const partial = await Entity.fromJson(body, properties, dataset);
const sourceId = await Entities.createSource();
const entity = await Entities.createNew(dataset, partial, null, sourceId, userAgent);
// Entities.createNew doesn't return enough information for a full response so re-fetch.
return Entities.getById(dataset.id, entity.uuid).then(getOrNotFound);
} else {
// bulk operation
if (!Array.isArray(body.entities))
return reject(Problem.user.unexpectedAttributes({ expected: ['entities: [...]'], actual: ['not an array'] }));

if (!body.entities.length)
return reject(Problem.user.unexpectedAttributes({ expected: ['entities: [...]'], actual: ['empty array'] }));

const partials = body.entities.map(e => Entity.fromJson(e, properties, dataset));

const sourceId = await Entities.createSource(extractBulkSource(source, partials.length, userAgent));

await Entities.createMany(dataset, partials, sourceId, userAgent);
return success();
}

// Entities.createNew doesn't return enough information for a full response so re-fetch.
return Entities.getById(dataset.id, entity.uuid).then(getOrNotFound);
}));

service.patch('/projects/:id/datasets/:name/entities/:uuid', endpoint(async ({ Datasets, Entities }, { auth, body, params, query, userAgent }) => {
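Finally, a short sketch of how the branch above treats different request bodies. The rejection details are indicative of `Problem.user.unexpectedAttributes`, not exact response payloads.

```js
// Illustrative request bodies and their expected handling (assumed, not captured from a server).

// No `entities` key: handled as a single-Entity create, exactly as before.
const single = { label: 'John Doe', data: { firstName: 'John', age: '22' } };

// `entities` present but not an array: rejected
// (expected: 'entities: [...]', actual: 'not an array').
const notAnArray = { entities: 'oops', source: { name: 'file.csv' } };

// `entities` is an empty array: rejected
// (expected: 'entities: [...]', actual: 'empty array').
const emptyArray = { entities: [], source: { name: 'file.csv' } };

// Valid bulk body: entities are inserted via Entities.createMany and a plain success
// response is returned — unlike the single-entity branch, there is no re-fetch of each entity.
const bulk = { entities: [single], source: { name: 'file.csv', size: 100 } };
```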