Skip to content

Commit

Permalink
add ModifySubscription handling (FreeOpcUa#244)
Browse files Browse the repository at this point in the history
* add ModifySubscription handling

* minimize parameters to ModifySubscription
  • Loading branch information
ralf1070 authored and oroulet committed May 30, 2017
1 parent 710a16c commit 66acd64
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 56 deletions.
3 changes: 3 additions & 0 deletions include/opc/ua/protocol/message_identifiers.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ namespace OpcUa
DELETE_SUBSCRIPTION_REQUEST = 0x34f, //847
DELETE_SUBSCRIPTION_RESPONSE = 0x352, //850

MODIFY_SUBSCRIPTION_REQUEST = 0x319, //793
MODIFY_SUBSCRIPTION_RESPONSE = 0x31c, //796

PUBLISH_REQUEST = 0x33A, // 826
PUBLISH_RESPONSE = 0x33D, // 829

Expand Down
12 changes: 0 additions & 12 deletions include/opc/ua/protocol/protocol_auto.h
Original file line number Diff line number Diff line change
Expand Up @@ -2622,8 +2622,6 @@ namespace OpcUa
CreateSubscriptionResponse();
};

/* DISABLED
struct ModifySubscriptionParameters
{
uint32_t SubscriptionId;
Expand All @@ -2633,9 +2631,6 @@ namespace OpcUa
uint32_t MaxNotificationsPerPublish;
uint8_t Priority;
};
*/

/* DISABLED

struct ModifySubscriptionRequest
{
Expand All @@ -2645,19 +2640,13 @@ namespace OpcUa

ModifySubscriptionRequest();
};
*/

/* DISABLED

struct ModifySubscriptionResult
{
double RevisedPublishingInterval;
uint32_t RevisedLifetimeCount;
uint32_t RevisedMaxKeepAliveCount;
};
*/

/* DISABLED

struct ModifySubscriptionResponse
{
Expand All @@ -2667,7 +2656,6 @@ namespace OpcUa

ModifySubscriptionResponse();
};
*/

struct PublishingModeParameters
{
Expand Down
1 change: 1 addition & 0 deletions include/opc/ua/services/subscriptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace OpcUa

public:
virtual SubscriptionData CreateSubscription(const CreateSubscriptionRequest&, std::function<void (PublishResult)> callbackPublish) = 0;
virtual ModifySubscriptionResponse ModifySubscription(const ModifySubscriptionParameters& parameters) = 0;
virtual std::vector<StatusCode> DeleteSubscriptions(const std::vector<uint32_t>& subscriptions) = 0;
virtual void Publish(const PublishRequest& request) = 0;
virtual RepublishResponse Republish(const RepublishParameters& params) = 0;
Expand Down
6 changes: 4 additions & 2 deletions schemas/generate_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,10 @@
'CreateSubscriptionParameters',
'SubscriptionData',
'CreateSubscriptionResponse',
#'ModifySubscriptionRequest',
#'ModifySubscriptionResponse',
'ModifySubscriptionRequest',
'ModifySubscriptionParameters',
'ModifySubscriptionResult',
'ModifySubscriptionResponse',
'SetPublishingModeRequest',
'PublishingModeParameters',
'SetPublishingModeResponse',
Expand Down
10 changes: 10 additions & 0 deletions src/client/binary_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,16 @@ namespace
return response.Data;
}

virtual ModifySubscriptionResponse ModifySubscription(const ModifySubscriptionParameters& parameters) override
{
if (Debug) { std::cout << "binary_client| ModifySubscription -->" << std::endl; }
ModifySubscriptionRequest request;
request.Parameters = parameters;
const ModifySubscriptionResponse response = Send<ModifySubscriptionResponse>(request);
if (Debug) { std::cout << "binary_client| ModifySubscription <--" << std::endl; }
return response;
}

virtual std::vector<StatusCode> DeleteSubscriptions(const std::vector<uint32_t>& subscriptions) override
{
if (Debug) { std::cout << "binary_client| DeleteSubscriptions -->" << std::endl; }
Expand Down
6 changes: 0 additions & 6 deletions src/protocol/constructors_auto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,21 +483,15 @@ namespace OpcUa
{
}

/* DISABLED
ModifySubscriptionRequest::ModifySubscriptionRequest()
: TypeId(FourByteNodeId((uint16_t)ObjectId::ModifySubscriptionRequest_Encoding_DefaultBinary))
{
}
*/

/* DISABLED

ModifySubscriptionResponse::ModifySubscriptionResponse()
: TypeId(FourByteNodeId((uint16_t)ObjectId::ModifySubscriptionResponse_Encoding_DefaultBinary))
{
}
*/

SetPublishingModeRequest::SetPublishingModeRequest()
: TypeId(FourByteNodeId((uint16_t)ObjectId::SetPublishingModeRequest_Encoding_DefaultBinary))
Expand Down
12 changes: 0 additions & 12 deletions src/protocol/deserialize_auto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3042,8 +3042,6 @@ namespace OpcUa
}


/* DISABLED
template<>
void DataDeserializer::Deserialize<ModifySubscriptionParameters>(ModifySubscriptionParameters& data)
{
Expand All @@ -3055,9 +3053,6 @@ namespace OpcUa
*this >> data.Priority;
}

*/

/* DISABLED

template<>
void DataDeserializer::Deserialize<ModifySubscriptionRequest>(ModifySubscriptionRequest& data)
Expand All @@ -3067,9 +3062,6 @@ namespace OpcUa
*this >> data.Parameters;
}

*/

/* DISABLED

template<>
void DataDeserializer::Deserialize<ModifySubscriptionResult>(ModifySubscriptionResult& data)
Expand All @@ -3079,9 +3071,6 @@ namespace OpcUa
*this >> data.RevisedMaxKeepAliveCount;
}

*/

/* DISABLED

template<>
void DataDeserializer::Deserialize<ModifySubscriptionResponse>(ModifySubscriptionResponse& data)
Expand All @@ -3091,7 +3080,6 @@ namespace OpcUa
*this >> data.Parameters;
}

*/

template<>
void DataDeserializer::Deserialize<PublishingModeParameters>(PublishingModeParameters& data)
Expand Down
12 changes: 0 additions & 12 deletions src/protocol/rawsize_auto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3438,8 +3438,6 @@ namespace OpcUa
}


/* DISABLED
template<>
std::size_t RawSize<ModifySubscriptionParameters>(const ModifySubscriptionParameters& data)
{
Expand All @@ -3453,9 +3451,6 @@ namespace OpcUa
return size;
}

*/

/* DISABLED

template<>
std::size_t RawSize<ModifySubscriptionRequest>(const ModifySubscriptionRequest& data)
Expand All @@ -3467,9 +3462,6 @@ namespace OpcUa
return size;
}

*/

/* DISABLED

template<>
std::size_t RawSize<ModifySubscriptionResult>(const ModifySubscriptionResult& data)
Expand All @@ -3481,9 +3473,6 @@ namespace OpcUa
return size;
}

*/

/* DISABLED

template<>
std::size_t RawSize<ModifySubscriptionResponse>(const ModifySubscriptionResponse& data)
Expand All @@ -3495,7 +3484,6 @@ namespace OpcUa
return size;
}

*/

template<>
std::size_t RawSize<PublishingModeParameters>(const PublishingModeParameters& data)
Expand Down
12 changes: 0 additions & 12 deletions src/protocol/serialize_auto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2994,8 +2994,6 @@ namespace OpcUa
}


/* DISABLED
template<>
void DataSerializer::Serialize<ModifySubscriptionParameters>(const ModifySubscriptionParameters& data)
{
Expand All @@ -3007,9 +3005,6 @@ namespace OpcUa
*this << data.Priority;
}

*/

/* DISABLED

template<>
void DataSerializer::Serialize<ModifySubscriptionRequest>(const ModifySubscriptionRequest& data)
Expand All @@ -3019,9 +3014,6 @@ namespace OpcUa
*this << data.Parameters;
}

*/

/* DISABLED

template<>
void DataSerializer::Serialize<ModifySubscriptionResult>(const ModifySubscriptionResult& data)
Expand All @@ -3031,9 +3023,6 @@ namespace OpcUa
*this << data.RevisedMaxKeepAliveCount;
}

*/

/* DISABLED

template<>
void DataSerializer::Serialize<ModifySubscriptionResponse>(const ModifySubscriptionResponse& data)
Expand All @@ -3043,7 +3032,6 @@ namespace OpcUa
*this << data.Parameters;
}

*/

template<>
void DataSerializer::Serialize<PublishingModeParameters>(const PublishingModeParameters& data)
Expand Down
22 changes: 22 additions & 0 deletions src/server/internal_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,28 @@ namespace OpcUa
return response;
}

ModifySubscriptionResult InternalSubscription::ModifySubscription(const ModifySubscriptionParameters& data)
{
ModifySubscriptionResult result;

if (data.RequestedLifetimeCount) {
Data.RevisedLifetimeCount = data.RequestedLifetimeCount;
}
LifeTimeCount = result.RevisedLifetimeCount = Data.RevisedLifetimeCount;

if (data.RequestedPublishingInterval) {
Data.RevisedPublishingInterval = data.RequestedPublishingInterval;
}
result.RevisedPublishingInterval = Data.RevisedPublishingInterval;

if (data.RequestedMaxKeepAliveCount) {
Data.RevisedMaxKeepAliveCount = data.RequestedMaxKeepAliveCount;
}
result.RevisedMaxKeepAliveCount = Data.RevisedMaxKeepAliveCount;

return result;
}

NotificationData InternalSubscription::GetNotificationData()
{
DataChangeNotification notification;
Expand Down
1 change: 1 addition & 0 deletions src/server/internal_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ namespace OpcUa
bool HasExpired();
void TriggerEvent(NodeId node, Event event);
RepublishResponse Republish(const RepublishParameters& params);
ModifySubscriptionResult ModifySubscription(const ModifySubscriptionParameters& data);

private:
void DeleteAllMonitoredItems();
Expand Down
19 changes: 19 additions & 0 deletions src/server/opc_tcp_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,25 @@ namespace OpcUa
return;
}

case MODIFY_SUBSCRIPTION_REQUEST:
{
if (Debug) std::clog << "opc_tcp_processor| Processing modify subscription request." << std::endl;
ModifySubscriptionRequest request;
istream >> request.Parameters;
request.Header = requestHeader;

ModifySubscriptionResponse response = Server->Subscriptions()->ModifySubscription(request.Parameters);
FillResponseHeader(requestHeader, response.Header);

SecureHeader secureHeader(MT_SECURE_MESSAGE, CHT_SINGLE, ChannelId);
secureHeader.AddSize(RawSize(algorithmHeader));
secureHeader.AddSize(RawSize(sequence));
secureHeader.AddSize(RawSize(response));
if (Debug) std::clog << "opc_tcp_processor| Sending response to Modify Subscription Request." << std::endl;
ostream << secureHeader << algorithmHeader << sequence << response << flush;
return;
}

case DELETE_SUBSCRIPTION_REQUEST:
{
if (Debug) std::clog << "opc_tcp_processor| Processing delete subscription request." << std::endl;
Expand Down
5 changes: 5 additions & 0 deletions src/server/services_registry_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ namespace
return SubscriptionData();
}

virtual ModifySubscriptionResponse ModifySubscription(const ModifySubscriptionParameters& parameters)
{
return ModifySubscriptionResponse();
}

virtual std::vector<StatusCode> DeleteSubscriptions(const std::vector<uint32_t>& subscriptions)
{
return std::vector<StatusCode>();
Expand Down
5 changes: 5 additions & 0 deletions src/server/subscription_service_addon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ namespace
return Subscriptions->CreateSubscription(request, callback);
}

OpcUa::ModifySubscriptionResponse ModifySubscription(const OpcUa::ModifySubscriptionParameters& parameters)
{
return Subscriptions->ModifySubscription(parameters);
}

std::vector<OpcUa::StatusCode> DeleteSubscriptions(const std::vector<uint32_t>& subscriptions)
{
return Subscriptions->DeleteSubscriptions(subscriptions);
Expand Down
21 changes: 21 additions & 0 deletions src/server/subscription_service_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ namespace OpcUa
return result;
}

ModifySubscriptionResponse SubscriptionServiceInternal::ModifySubscription(const ModifySubscriptionParameters& parameters)
{
boost::unique_lock<boost::shared_mutex> lock(DbMutex);

ModifySubscriptionResponse response;

uint32_t subid = parameters.SubscriptionId;
SubscriptionsIdMap::iterator itsub = SubscriptionsMap.find(subid);
if (itsub == SubscriptionsMap.end())
{
std::cout << "SubscriptionService | Error, got request to modify non existing Subscription: " << subid << std::endl;
response.Header.ServiceResult = StatusCode::BadSubscriptionIdInvalid;
return response;
}
if (Debug) std::cout << "SubscriptionService | Modify Subscription with Id: " << subid << std::endl;

std::shared_ptr<InternalSubscription> sub = itsub->second;
response.Parameters = sub->ModifySubscription(parameters);
return response;
}

SubscriptionData SubscriptionServiceInternal::CreateSubscription(const CreateSubscriptionRequest& request, std::function<void (PublishResult)> callback)
{
boost::unique_lock<boost::shared_mutex> lock(DbMutex);
Expand Down
1 change: 1 addition & 0 deletions src/server/subscription_service_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace OpcUa
~SubscriptionServiceInternal();

virtual std::vector<StatusCode> DeleteSubscriptions(const std::vector<uint32_t>& subscriptions);
virtual ModifySubscriptionResponse ModifySubscription(const ModifySubscriptionParameters& parameters);
virtual SubscriptionData CreateSubscription(const CreateSubscriptionRequest& request, std::function<void (PublishResult)> callback);
virtual std::vector<MonitoredItemCreateResult> CreateMonitoredItems(const MonitoredItemsParameters& params);
virtual std::vector<StatusCode> DeleteMonitoredItems(const DeleteMonitoredItemsParameters& params);
Expand Down

0 comments on commit 66acd64

Please sign in to comment.