In recent years, the widespread adoption of Cloud computing has driven a massive uptake of the "microservices" application development paradigm. Refactoring monolithic applications into microservices makes it possible to get the most out of some intrinsic features of the Cloud itself, including the many "managed" services, which let us delegate to the Cloud provider the management and reliability of critical tasks that aren't part of the application core.
beSharp is no exception: this refactoring process also involved Noovolari Smart Backup, our product for managing Backup and Disaster Recovery of AWS infrastructures.
To implement the notification microservice we will use Ruby and JavaScript on the application side and, on AWS, API Gateway, Lambda (Node.js), DynamoDB, SQS and IAM.
To begin with, let's see how to open a WebSocket channel with API Gateway using the URL that is generated when the back-end API is created.
In the main application template we added the following JavaScript function:

```html
<script>
  $(document).ready(function () {
    // Set up the notification system
    noovolari.smartbackup.notifications.websocket.connect(
      '<%= Rails.env %>',
      '<%= Notification::NotificationAuthorizer.check_token(user_id: current_user.id, company_code: current_user.companies.first.code) %>'
    );
  });
</script>
```

This is how the function is implemented in detail:

```javascript
noovolari.smartbackup.notifications.websocket.connect = function (stage, token) {
  var socketUrl = 'wss://<WEBSOCKET_URL>/' + stage + '?token=' + token;

  // Connect to the WebSocket URL and register the open, close and message handlers.
  // Note: this assignment replaces the object that holds connect() with the socket itself.
  noovolari.smartbackup.notifications.websocket = new WebSocket(socketUrl);

  noovolari.smartbackup.notifications.websocket.onopen = function () {
    console.info('socket connection opened properly with...');
  };

  noovolari.smartbackup.notifications.websocket.onclose = function () {
    // The WebSocket is closed
    console.info("Connection closed...");
  };

  noovolari.smartbackup.notifications.websocket.onmessage = function (evt) {
    var notification = JSON.parse(evt.data);
    var message = notification.data;
    // Invoke the callback of every subscriber registered for this message type
    noovolariUiToolkit.notifications.subscribers.forEach(function (subscriber) {
      if (subscriber.type === message.type) {
        subscriber.callback(message.data);
      }
    });
  };
};
```

Let's analyse the code step by step.
This function defines the three handlers (onopen, onclose and onmessage) of the JavaScript WebSocket object. It takes two parameters: "stage", which identifies the environment in which we want to open the communication channel (development or production), and "token", which is generated server-side by Ruby. Since browser inspectors can always show the JavaScript code, generating the token in the back-end means the page only ever contains the finished token, never the logic or the secrets used to build it.
On the AWS side, the opening and closing of the WebSocket connection are managed directly by AWS; we'll see later how this is accomplished.
When a message arrives, we loop over the "subscribers" registered before the channel was opened and invoke the callback of each subscriber whose type matches the type of the message (for example an information message, an alert message, a danger message, etc.).
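The subscriber mechanism can be sketched in isolation as follows. This is only an illustration: the `subscribers` array, `subscribe` and `dispatch` are hypothetical names standing in for the registry kept in `noovolariUiToolkit.notifications.subscribers`, and the frame shape (`{ data: { type, data } }`) is taken from the onmessage handler of the article.

```javascript
// Hypothetical stand-in for noovolariUiToolkit.notifications.subscribers.
const subscribers = [];

// Register a callback for one message type (e.g. "info", "alert", "danger").
function subscribe(type, callback) {
  subscribers.push({ type, callback });
}

// Parse a raw WebSocket frame and invoke every subscriber whose type matches.
// Returns the number of callbacks invoked.
function dispatch(rawFrame) {
  const notification = JSON.parse(rawFrame);
  const message = notification.data; // { type, data }, as in the onmessage handler
  let invoked = 0;
  subscribers.forEach((subscriber) => {
    if (subscriber.type === message.type) {
      subscriber.callback(message.data);
      invoked += 1;
    }
  });
  return invoked;
}
```

A page component would call `subscribe('alert', showAlert)` before the channel is opened; every incoming frame whose inner type is "alert" then reaches `showAlert`.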
The token is a JWT signed with HMAC-SHA512 (the HS512 algorithm); the shared secret is provided through an environment variable in both our application and AWS.
Let's see how the token is built (the reference language is Ruby):

```ruby
require 'jwt'
require 'bcrypt'
require 'digest'

# Class methods of the application's notification authorizer; Configuration,
# get_dynamo_client and EXPIRATION_TIME_IN_SECONDS are defined elsewhere in the app.
def self.generate_token(user_id:, company_code:)
  token = compose_token(user_id: user_id, company_code: company_code)
  payload = { user_id: user_id, room_id: company_code, token: token, exp: Time.now.to_i + EXPIRATION_TIME_IN_SECONDS }
  JWT.encode payload, Configuration.load_secrets['bernie']['notification_key'], 'HS512'
end

def self.compose_token(user_id:, company_code:)
  dynamo = get_dynamo_client()
  token = "#{user_id}#{Configuration.load_secrets['bernie']['notification_passcode']}#{company_code}"
  token = Digest::SHA256.hexdigest(token)

  # Only the BCrypt hash of the digest is stored on DynamoDB
  dynamo.put_item({
    table_name: Configuration.load_secrets['bernie']['notification_table'],
    item: {
      'user_id' => user_id.to_s,
      'room_id' => company_code,
      'token' => BCrypt::Password.create(token)
    }
  })
  return token
end
```

The "generate_token" function not only creates our token through the JWT library but, through "compose_token", also saves a BCrypt hash of the inner token on DynamoDB. This gives us two security levels: the first is the JWT signature (HS512) computed with the shared secret; the second is the BCrypt hash we store, against which the inner token is checked at connection time.
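To make the signing step concrete, here is a minimal sketch of what an HS512 JWT is, written with Node's built-in `crypto` module only (Node 16 or later, for the `base64url` encoding). The function names `signHs512` and `verifyHs512` are hypothetical, and this is not how the `jwt` gem or `jsonwebtoken` are implemented internally; it only shows that the payload is signed rather than encrypted.

```javascript
const crypto = require('crypto');

// Base64url-encode a string, as required by the JWT format.
const b64url = (value) => Buffer.from(value).toString('base64url');

// Build "header.payload.signature", where the signature is HMAC-SHA512 over the first two parts.
function signHs512(payload, secret) {
  const header = b64url(JSON.stringify({ alg: 'HS512', typ: 'JWT' }));
  const body = b64url(JSON.stringify(payload));
  const signature = crypto.createHmac('sha512', secret).update(`${header}.${body}`).digest('base64url');
  return `${header}.${body}.${signature}`;
}

// Return the payload if the signature matches and the token has not expired, otherwise null.
function verifyHs512(token, secret) {
  const [header, body, signature] = token.split('.');
  const expected = crypto.createHmac('sha512', secret).update(`${header}.${body}`).digest('base64url');
  const given = Buffer.from(signature || '');
  const wanted = Buffer.from(expected);
  if (given.length !== wanted.length || !crypto.timingSafeEqual(given, wanted)) return null;
  const payload = JSON.parse(Buffer.from(body, 'base64url').toString('utf8'));
  if (typeof payload.exp === 'number' && payload.exp < Math.floor(Date.now() / 1000)) return null;
  return payload;
}
```

Anyone holding the token can read its payload by decoding the middle part; only the holder of the shared secret can produce a valid signature. This is why the inner token is additionally checked against a BCrypt hash on the server side.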
AWS side implementation
API GATEWAY
Let's access our AWS account and look for the API Gateway service. We open the service page and create a new API.
Since we need a WebSocket channel, we create it as a WebSocket API and fill in the remaining parameters.
As soon as the API is created, under the "Routes" heading we will find $connect, $disconnect and $default. The first two are especially important because they handle the connection and disconnection phases of the WebSocket channel, both on the AWS side (with Lambda functions) and on the client side (as we saw in the previous scripts).
Click on $connect and configure its integration.
We want to link the $connect phase to a Lambda function, so we select "Lambda Function" under "Integration type"; we also check "Lambda Proxy Integration" because we want the function to receive the complete request context of the API Gateway call.
The Region must be set according to your needs, in our case eu-west-1; "Lambda Function" is the name of the Lambda to be called, which we can pick from the drop-down menu. We leave the remaining options at their default values and press save.
We do the same for $disconnect and for the custom sendMessage route, pointing to the respective Lambda functions.
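How an incoming frame ends up on the sendMessage route can be sketched as follows. This is a simplified local model, not API Gateway code, and it assumes that the API's route selection expression is `$request.body.action` (the client example later in the article sends an `action` field, which is consistent with that setting). `selectRoute` and `ROUTES` are hypothetical names.

```javascript
// Routes configured on the API; sendMessage is the custom one.
const ROUTES = ['$connect', '$disconnect', '$default', 'sendMessage'];

// Pick the route for a message frame, falling back to $default when the body
// is not JSON or its "action" does not match a custom route.
function selectRoute(rawBody, routes = ROUTES) {
  let action;
  try {
    action = JSON.parse(rawBody).action;
  } catch (e) {
    return '$default'; // the expression cannot be evaluated on a non-JSON frame
  }
  const isCustomRoute = typeof action === 'string' && !action.startsWith('$') && routes.includes(action);
  return isCustomRoute ? action : '$default';
}
```

With this model, `{"action": "sendMessage", "data": "hello"}` reaches the sendMessage Lambda, while anything else falls through to $default.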
If you have completed these steps correctly, each of these routes now points to its Lambda function.
With the routes configured, we have to create the IAM roles that let the Lambdas communicate with the necessary services. In addition to this, we also create DynamoDB tables.
IAM
First of all we create the IAM roles for our functions: OnConnectFunctionRole, OnDisconnectFunctionRole and SendMessageFunctionRole.
The names are up to us. The OnConnect function needs the AWSLambdaBasicExecutionRole managed policy and a custom policy with this JSON:

```json
{
  "Statement": [
    {
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:DeleteItem",
        "dynamodb:PutItem",
        "dynamodb:Scan",
        "dynamodb:Query",
        "dynamodb:UpdateItem",
        "dynamodb:BatchWriteItem",
        "dynamodb:BatchGetItem",
        "dynamodb:DescribeTable"
      ],
      "Resource": [
        "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections",
        "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections/index/*"
      ],
      "Effect": "Allow"
    }
  ]
}
```

Remember to replace <ACCOUNT_ID> with your account ID; "hermesConnections" is the name of our DynamoDB table.
For "OnDisconnect":

```json
{
  "Statement": [
    {
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:DeleteItem",
        "dynamodb:PutItem",
        "dynamodb:Scan",
        "dynamodb:Query",
        "dynamodb:UpdateItem",
        "dynamodb:BatchWriteItem",
        "dynamodb:BatchGetItem",
        "dynamodb:DescribeTable"
      ],
      "Resource": [
        "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections",
        "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections/index/*"
      ],
      "Effect": "Allow"
    }
  ]
}
```

For "SendMessage" we again use the AWSLambdaBasicExecutionRole managed policy, plus AWSLambdaSQSQueueExecutionRole and three custom policies. The first one:

```json
{
  "Statement": [
    {
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:DeleteItem",
        "dynamodb:PutItem",
        "dynamodb:Scan",
        "dynamodb:Query",
        "dynamodb:UpdateItem",
        "dynamodb:BatchWriteItem",
        "dynamodb:BatchGetItem",
        "dynamodb:DescribeTable"
      ],
      "Resource": [
        "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections",
        "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections/index/*"
      ],
      "Effect": "Allow"
    }
  ]
}
```

This one:

```json
{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:GetQueueAttributes",
        "sqs:ReceiveMessage"
      ],
      "Resource": "arn:aws:sqs:eu-west-1:<ACCOUNT_ID>:hermes-messageQueue",
      "Effect": "Allow"
    }
  ]
}
```

And finally:

```json
{
  "Statement": [
    {
      "Action": [
        "execute-api:ManageConnections"
      ],
      "Resource": [
        "arn:aws:execute-api:eu-west-1:<ACCOUNT_ID>:<API_ID>/*"
      ],
      "Effect": "Allow"
    }
  ]
}
```

The <API_ID> value can be retrieved from the API Gateway dashboard, in the top left corner.
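Since the policies differ only in account ID, region, table name and API ID, they can also be generated instead of copied. The sketch below is one possible way to do it; `connectionTablePolicy` and `manageConnectionsPolicy` are hypothetical helpers, not part of any AWS SDK, and the account ID used in the usage line is a placeholder.

```javascript
// DynamoDB actions granted to the functions in the article.
const DYNAMO_ACTIONS = [
  'dynamodb:GetItem', 'dynamodb:DeleteItem', 'dynamodb:PutItem',
  'dynamodb:Scan', 'dynamodb:Query', 'dynamodb:UpdateItem',
  'dynamodb:BatchWriteItem', 'dynamodb:BatchGetItem', 'dynamodb:DescribeTable',
];

// Policy document giving access to one table and its indexes.
function connectionTablePolicy(region, accountId, tableName) {
  const tableArn = `arn:aws:dynamodb:${region}:${accountId}:table/${tableName}`;
  return {
    Statement: [{ Action: DYNAMO_ACTIONS, Resource: [tableArn, `${tableArn}/index/*`], Effect: 'Allow' }],
  };
}

// Policy document allowing a function to post to the connections of one WebSocket API.
function manageConnectionsPolicy(region, accountId, apiId) {
  return {
    Statement: [{
      Action: ['execute-api:ManageConnections'],
      Resource: [`arn:aws:execute-api:${region}:${accountId}:${apiId}/*`],
      Effect: 'Allow',
    }],
  };
}
```

For example, `JSON.stringify(connectionTablePolicy('eu-west-1', '123456789012', 'hermesConnections'), null, 2)` produces a document with the same statement as the DynamoDB policy shown above.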
Now we can create the DynamoDB table to save the connection IDs provided by API Gateway during the $connect phase. This parameter is very important because it identifies the connection opened by a user. We will then see how to authenticate the request to open the connection, and also how to filter the messages on a per-room basis.
DYNAMODB
Let's open the DynamoDB service and click on "Create table".
We use the same table name we referenced in the IAM policies, in this case "hermesConnections"; as partition key we set connection_id, of type String; we leave the rest of the table settings at their default values.
The table will be created in a few seconds; as users connect to API Gateway, the function associated with $connect will create items in this table.
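The life cycle of the items in this table can be modelled in memory to see what each route does to it. This is only a sketch: a `Map` stands in for the DynamoDB table, and `onConnect`, `onDisconnect` and `connectionsInRoom` are hypothetical names. The attribute names (connection_id, user_id, room_id) are the ones used by the Lambda functions of the article.

```javascript
// In-memory stand-in for the hermesConnections table, keyed by connection_id.
const connections = new Map();

// $connect: store one item per open connection.
function onConnect(connectionId, userId, roomId) {
  connections.set(connectionId, { connection_id: connectionId, user_id: userId, room_id: roomId });
}

// $disconnect: remove the item; returns true if it existed.
function onDisconnect(connectionId) {
  return connections.delete(connectionId);
}

// sendMessage: find every connection that belongs to a room.
function connectionsInRoom(roomId) {
  return [...connections.values()].filter((item) => item.room_id === roomId);
}
```

Because connection_id is the only key, looking up a room requires filtering the whole table, which is what the scan with a filter expression in the sendMessage function does.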
Now let's define our Lambda functions, starting from the routes previously created on API Gateway.
Lambda for $connect and authorizer
Let's go to the Lambda function management page and create a new function.
We give it the name we used when we configured the $connect route, choose a Node.js runtime and add the following files:

```javascript
// app.js
const AWS = require("aws-sdk");
AWS.config.update({ region: process.env.AWS_REGION });
const DDB = new AWS.DynamoDB({ apiVersion: "2012-08-10" });

exports.handler = function (event, context, callback) {
  // Store the new connection in the DynamoDB table together with the room_id
  // and user_id returned by the authorizer.
  console.log(event.requestContext.stage);
  // The original listing read the variable name from event.requestContext.table_name,
  // which API Gateway does not set; TABLE_NAME is assumed here, as in sendMessage.
  const putParams = {
    TableName: process.env.TABLE_NAME,
    Item: {
      room_id: { S: event.requestContext.authorizer.room_id },
      connection_id: { S: event.requestContext.connectionId },
      user_id: { S: event.requestContext.authorizer.user_id }
    }
  };
  DDB.putItem(putParams, function (err) {
    callback(null, {
      statusCode: err ? 500 : 200,
      body: err ? "Failed to connect: " + JSON.stringify(err) : "Connected."
    });
  });
};
```

```json
{
  "name": "onConnect",
  "version": "1.0.0",
  "description": "OnConnect function for WebSockets on API Gateway",
  "main": "src/app.js",
  "author": "SAM CLI",
  "license": "MIT",
  "dependencies": {
    "aws-sdk": "^2.434.0"
  }
}
```

The second listing is the function's package.json. Under environment variables we insert the name of the connection table.
We also add the Authorizer Lambda:

```javascript
// index.js
const AWS = require('aws-sdk');
const jwt = require('jsonwebtoken');
const bcrypt = require('bcryptjs');

const ddb = new AWS.DynamoDB({ apiVersion: '2012-08-10' });

exports.handler = async (event) => {
  try {
    console.log(event.requestContext.stage);
    // Verify the JSON Web Token passed as a query string parameter
    const decoded = jwt.verify(event.queryStringParameters.token, process.env.JWT_SECRET, { algorithms: ['HS512'] });
    const userId = decoded.user_id;
    const roomId = decoded.room_id;
    const token = decoded.token;
    // The principal id can be anything; in this case we use the user id
    const principalId = String(userId);

    const params = {
      TableName: process.env.TABLE_NAME,
      Key: {
        user_id: { S: userId.toString() },
        room_id: { S: roomId.toString() }
      }
    };
    // Use only the promise form: passing a callback as well would send the request twice
    const data = await ddb.getItem(params).promise();
    if (!data.Item) {
      return generatePolicy(principalId, 'Deny', event.methodArn, userId, roomId);
    }
    const compareHash = bcrypt.compareSync(token, data.Item.token.S);
    const compareUserAndRoom = data.Item.room_id.S === roomId.toString() &&
      data.Item.user_id.S === userId.toString();
    // 'Allow' or 'Deny' decides whether the client can connect or not
    const effect = (compareHash && compareUserAndRoom) ? 'Allow' : 'Deny';
    console.log(effect);
    return generatePolicy(principalId, effect, event.methodArn, userId, roomId);
  } catch (e) {
    // Invalid or expired JWT, DynamoDB error, etc.: refuse the connection
    console.log(e.stack);
    return generatePolicy('unauthorized', 'Deny', event.methodArn);
  }
};

const generatePolicy = function (principalId, effect, resource, user_id, room_id) {
  // Required output
  const authResponse = { principalId: principalId };
  if (effect && resource) {
    authResponse.policyDocument = {
      Version: '2012-10-17', // default version
      Statement: [{
        Action: 'execute-api:Invoke', // default action
        Effect: effect,
        Resource: resource
      }]
    };
  }
  // Optional output with custom properties of the String, Number or Boolean type
  authResponse.context = {
    room_id: room_id,
    user_id: user_id
  };
  return authResponse;
};
```

```json
{
  "name": "authorizer",
  "version": "1.0.0",
  "description": "Authorizer function for WebSockets on API Gateway",
  "main": "index.js",
  "author": "beSharp",
  "license": "MIT",
  "dependencies": {
    "bcryptjs": "^2.4.3",
    "jsonwebtoken": "^8.5.1"
  }
}
```

The second listing is the authorizer's package.json. The function also needs its environment variables: JWT_SECRET (the shared secret) and TABLE_NAME (the table where the tokens are saved).
The authorization Lambda reads the JWT sent through API Gateway, verifies it and extracts the inner token generated by the application; it then checks this token against the BCrypt hash saved on DynamoDB. If they match, the authorizer grants permission to open the connection by returning an ad hoc IAM policy.
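The decision itself can be isolated from AWS and from the JWT and BCrypt libraries, which makes it easy to reason about. The following sketch assumes the JWT has already been verified (`decoded`) and the DynamoDB item already loaded (`stored`); `authorize` is a hypothetical name, and `compareHash` is injected so that a real implementation can pass `bcrypt.compareSync`.

```javascript
// Build the authorizer response from an already-verified JWT payload and the stored item.
// `stored` is null when no item was found for the user and room.
function authorize({ decoded, stored, compareHash, methodArn }) {
  const matches = Boolean(stored) &&
    compareHash(decoded.token, stored.token) &&
    String(stored.user_id) === String(decoded.user_id) &&
    String(stored.room_id) === String(decoded.room_id);
  return {
    principalId: String(decoded.user_id),
    policyDocument: {
      Version: '2012-10-17',
      Statement: [{ Action: 'execute-api:Invoke', Effect: matches ? 'Allow' : 'Deny', Resource: methodArn }],
    },
    // Passed on to the $connect Lambda as event.requestContext.authorizer
    context: { user_id: String(decoded.user_id), room_id: String(decoded.room_id) },
  };
}
```

The `context` object is what the $connect function later reads as `event.requestContext.authorizer.user_id` and `event.requestContext.authorizer.room_id`.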
We return to the API Gateway screen and click on $connect, so we can edit the authorizer field, adding the Lambda we have just created (in our case "websocketAuth", the name we gave to the authorization Lambda).
Now we need to add the $disconnect and sendMessage Lambdas; let's start with $disconnect, which is very simple:

```javascript
// app.js
const AWS = require("aws-sdk");
AWS.config.update({ region: process.env.AWS_REGION });
const DDB = new AWS.DynamoDB({ apiVersion: "2012-08-10" });

exports.handler = function (event, context, callback) {
  console.log(event.requestContext.stage);
  // The table name is read from a per-stage variable, e.g. TABLE_NAME_PRODUCTION
  const table_environment = 'TABLE_NAME_' + event.requestContext.stage.toUpperCase();
  const deleteParams = {
    TableName: process.env[table_environment],
    Key: {
      connection_id: { S: event.requestContext.connectionId }
    }
  };
  DDB.deleteItem(deleteParams, function (err) {
    callback(null, {
      statusCode: err ? 500 : 200,
      body: err ? "Failed to disconnect: " + JSON.stringify(err) : "Disconnected."
    });
  });
};
```

In practice, whenever a user is disconnected from the channel, API Gateway invokes the Lambda associated with the $disconnect route which, in turn, deletes the item corresponding to the specified connection_id from the DynamoDB connection table.
We can now move on to the message sending part, creating the sendMessage Lambda.
First of all we need to add SQS as a trigger for this function, because it will be invoked not only by API Gateway but also by SQS.
Then we can see the function code:

```javascript
// index.js
const AWS = require('aws-sdk');
// Local patch from the original project (not shown in this article)
require('./patch.js');
const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' });
// STAGE is an assumed variable name: the original listing never assigned the
// stage it appended to ENDPOINT.
const { ENDPOINT, STAGE, TABLE_NAME: table } = process.env;

exports.handler = async (event) => {
  let user_id;
  let room_id;
  let postData;

  if (event.Records !== undefined) {
    // Invoked by the SQS trigger: everything is in the body of the queue message
    const body = JSON.parse(event.Records[0].body);
    user_id = body.user_id;
    room_id = body.room_id;
    postData = body.data;
  } else {
    // Invoked by API Gateway: look up the sender in the connection table
    postData = JSON.parse(event.body).data;
    const sender = await ddb.get({
      TableName: table,
      Key: { connection_id: event.requestContext.connectionId }
    }).promise();
    user_id = sender.Item.user_id;
    room_id = sender.Item.room_id;
  }
  // The outgoing message carries the user_id of the sender
  const messageBody = { user_id: user_id, data: postData };

  // Retrieve all the connections for the selected room_id
  let connectionData;
  try {
    connectionData = await ddb.scan({
      TableName: table,
      ProjectionExpression: 'connection_id, user_id',
      FilterExpression: '#roomId = :roomId',
      ExpressionAttributeNames: { '#roomId': 'room_id' },
      ExpressionAttributeValues: { ':roomId': room_id }
    }).promise();
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }

  const url = !ENDPOINT
    ? event.requestContext.domainName + '/' + event.requestContext.stage
    : ENDPOINT + '/' + STAGE.toLowerCase();
  console.log(url);
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: url
  });

  // For each connection, send the message through apigwManagementApi
  const postCalls = connectionData.Items.map(async ({ connection_id }) => {
    try {
      await apigwManagementApi.postToConnection({ ConnectionId: connection_id, Data: JSON.stringify(messageBody) }).promise();
      console.log('posted');
    } catch (e) {
      console.error(e);
      if (e.statusCode === 410) {
        // Stale connection: remove its connection_id from the DynamoDB table
        console.log(`Found stale connection, deleting ${connection_id}`);
        await ddb.delete({ TableName: table, Key: { connection_id: connection_id } }).promise();
      } else {
        throw e;
      }
    }
  });

  try {
    await Promise.all(postCalls);
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }
  return { statusCode: 200, body: 'Data sent.' };
};
```

Now let's add the environment variables read by the code: TABLE_NAME, ENDPOINT and the stage name (STAGE in the listing above, an assumed name).
In this Lambda function, the code distinguishes the calls coming from SQS from those coming from API Gateway; based on this, it reconstructs the body of the message and the information about the user. Using the room_id we obtain the user_id and connection_id of every connection in that room from the DynamoDB connection table, then we send the message to all the users found this way.
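The first step, telling the two event sources apart, can be sketched as a small pure function. `normalizeEvent` is a hypothetical name; the event shapes are the ones used by the function above (an SQS event carries a `Records` array whose entries have a JSON `body`, an API Gateway WebSocket event carries `requestContext.connectionId` and a JSON `body`).

```javascript
// Reduce an SQS event or an API Gateway WebSocket event to a common shape.
function normalizeEvent(event) {
  if (Array.isArray(event.Records)) {
    // SQS trigger: sender and room travel inside the queue message
    const body = JSON.parse(event.Records[0].body);
    return { source: 'sqs', userId: body.user_id, roomId: body.room_id, data: body.data };
  }
  // API Gateway: only the connection ID is known; the sender and room must be
  // looked up in the connection table afterwards
  return {
    source: 'apigateway',
    connectionId: event.requestContext.connectionId,
    data: JSON.parse(event.body).data,
  };
}
```

Note that, like the original function, this sketch only reads the first record of an SQS batch; a queue trigger configured with a batch size greater than one would need a loop over `event.Records`.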
If a connection turns out to be stale (API Gateway answers with status 410), the Lambda deletes the corresponding item from the connection table.
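The fan-out with stale-connection cleanup can be sketched independently of the AWS SDK by injecting the two side effects. `fanOut`, `post` and `remove` are hypothetical names: in the real function `post` would wrap `postToConnection` and `remove` would delete the item from the connection table.

```javascript
// Send `payload` to every connection; connections that answer 410 (gone) are
// removed, any other error aborts the whole operation. Returns the stale IDs.
async function fanOut(connectionIds, payload, post, remove) {
  const stale = [];
  await Promise.all(connectionIds.map(async (id) => {
    try {
      await post(id, JSON.stringify(payload));
    } catch (e) {
      if (e.statusCode !== 410) throw e; // unexpected failure: let the caller report it
      await remove(id);
      stale.push(id);
    }
  }));
  return stale;
}
```

Rethrowing inside the mapped function matters: a `return` there would only end that single callback, and the caller would never learn that a delivery failed.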
Once all these functions have been added, we verify that their names are consistent with those set on the $connect, $disconnect and sendMessage routes respectively. Once done, the architecture of our notification system is complete and we can test it directly by sending a message to the queue, following this example:

```ruby
require 'aws-sdk-sqs'
require 'json'

# region, user_id, room_id, message and config come from the calling application
sqs = Aws::SQS::Client.new(region: region)
payload = JSON.generate(user_id: user_id, room_id: room_id, data: message)
params = {
  message_body: payload,
  queue_url: config.queue_url
}
sqs.send_message(params)
```

On the client side, instead, the following is a good example:

```javascript
// ws is the WebSocket object opened earlier
function sendMessage() {
  ws.send(JSON.stringify({ "action": "sendMessage", "data": "message" }));
}
```

This concludes our tutorial on how to develop a notification microservice based on AWS services, complete with authentication, using API Gateway, DynamoDB, SQS and Lambda.
One of the most interesting aspects of the solution is certainly its compactness (all the logic is implemented in just four rather simple Lambda functions), while maintaining a certain versatility.
If you want to manage the infrastructure lifecycle according to the Infrastructure-as-Code paradigm, for example through a SAM template, a viable expedient is to create a Route53 ALIAS record to decouple the URL called by the client from that of API Gateway which, being generated programmatically, can change from one deployment to the next.