Democratizzare l’accesso ai dati tramite una Data Platform self-service – Parte 1
11 Febbraio 2025 - 5 min. read

Matteo Goretti
DevOps Engineer
<script> $(document).ready(function () { //Setup notification system noovolari.smartbackup.notifications.websocket.connect( '<%= Rails.env %>', '<%= Notification::NotificationAuthorizer.check_token(user_id: current_user.id, company_code: current_user.companies.first.code) %>' ); </script>Questa funzione è così implementata nel dettaglio:
noovolari.smartbackup.notifications.websocket.connect = function(stage, token) { var socketUrl = 'wss://<WEBSOCKET_URL>/'+stage+'?token='+token; // connecting to the websocket url and retrieve function for connect and disconnect noovolari.smartbackup.notifications.websocket = new WebSocket(socketUrl); noovolari.smartbackup.notifications.websocket.onopen = function () { console.info('socket connection opened properly with...'); }; noovolari.smartbackup.notifications.websocket.onclose = function () { // websocket is closed. console.info("Connection closed..."); }; noovolari.smartbackup.notifications.websocket.onmessage = function (evt) { var notification = JSON.parse(evt.data); var message = notification.data; noovolariUiToolkit.notifications.subscribers.forEach(function(subscriber) { if(subscriber.type === message.type) { subscriber.callback(message.data); } }); }; };Andiamo ad analizzare il codice passo-passo.Con questa funzione andiamo a definire tutti e tre i metodi necessari ad implementare l’interfaccia dell’oggetto WebSocket di Javascript; si nota che dall’esterno passiamo sia la variabile “stage” che identifica per quale ambiente di lavoro vogliamo aprire la comunicazione, sia esso development che production, sia la variabile token che, per sicurezza, viene generata da codice Ruby in modo da non dare la possibilità di fare inspect della funzione (gli inspector dei browser possono sempre mostrare il codice Javascript, invece in questo modo il token ci arriva già generato dal back-end).Le callback di apertura e chiusura della connessione WebSocket vengono gestite direttamente da AWS, vedremo poi come.Sulla ricezione del messaggio invece cicliamo sul numero di “subscriber”, che abbiamo registrato prima dell’apertura del canale, per rappresentare il messaggio con una callback dello stesso tipo del messaggio stesso (ad esempio: un messaggio di informazione, uno di allerta, uno di pericolo, etc.) Il token è un JWT Token crittografato con SHA512, il secret condiviso è gestito mediante variabile d’ambiente nella nostra applicazione e su AWS.Vediamo come viene costruito (il linguaggio di riferimento è Ruby):
def self.generate_token(user_id:, company_code:) token = compose_token(user_id: user_id, company_code: company_code) payload = { user_id: user_id, room_id:company_code, token:token,exp: Time.now.to_i + EXPIRATION_TIME_IN_SECONDS } JWT.encode payload, Configuration.load_secrets['bernie']['notification_key'], 'HS512' End
def self.compose_token(user_id:, company_code:) dynamo = get_dynamo_client() token = "#{user_id}#{Configuration.load_secrets['bernie']['notification_passcode']}#{company_code}" token = Digest::SHA256.hexdigest(token)
dynamo.put_item({ table_name: Configuration.load_secrets['bernie']['notification_table'], item: { 'user_id' => user_id.to_s, 'room_id' => company_code, 'token' => BCrypt::Password.create(token) } }) return token endLa funzione “generate_token” non solo crea il nostro token mediante la libreria di JWT ma salva anche il token crittografato con il segreto condiviso su DynamoDB, questo ci permette di avere due livelli di sicurezza, il primo dato da JWT che crittografa con SHA512 e il secondo dato da noi con BCrypt.
{ "Statement": [ { "Action": [ "dynamodb:GetItem", "dynamodb:DeleteItem", "dynamodb:PutItem", "dynamodb:Scan", "dynamodb:Query", "dynamodb:UpdateItem", "dynamodb:BatchWriteItem", "dynamodb:BatchGetItem", "dynamodb:DescribeTable" ], "Resource": [ "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections", "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections/index/*" ], "Effect": "Allow" } ] }Ricordiamoci di cambiare <ACCOUNT_ID> con il nostro account id e di prendere nota del fatto che “hermesConnections” rappresenta la nostra tabella di dynamodb. Per “OnDisconnect”:
{ "Statement": [ { "Action": [ "dynamodb:GetItem", "dynamodb:DeleteItem", "dynamodb:PutItem", "dynamodb:Scan", "dynamodb:Query", "dynamodb:UpdateItem", "dynamodb:BatchWriteItem", "dynamodb:BatchGetItem", "dynamodb:DescribeTable" ], "Resource": [ "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections", "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections/index/*" ], "Effect": "Allow" } ] }Per “SendMessage” utilizziamo ancora un “LambdaBasicExecutionRole”, AWSLambdaSQSQueueExecutionRole e tre custom policies:
{ "Statement": [ { "Action": [ "dynamodb:GetItem", "dynamodb:DeleteItem", "dynamodb:PutItem", "dynamodb:Scan", "dynamodb:Query", "dynamodb:UpdateItem", "dynamodb:BatchWriteItem", "dynamodb:BatchGetItem", "dynamodb:DescribeTable" ], "Resource": [ "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections", "arn:aws:dynamodb:eu-west-1:<ACCOUNT_ID>:table/hermesConnections/index/*" ], "Effect": "Allow" } ] }Quest’altra:
{ "Statement": [ { "Action": [ "sqs:ChangeMessageVisibility", "sqs:ChangeMessageVisibilityBatch", "sqs:DeleteMessage", "sqs:DeleteMessageBatch", "sqs:GetQueueAttributes", "sqs:ReceiveMessage" ], "Resource": "arn:aws:sqs:eu-west-1:<ACCOUNT_ID>:hermes-messageQueue", "Effect": "Allow" } ] }Infine:
{ "Statement": [ { "Action": [ "execute-api:ManageConnections" ], "Resource": [ "arn:aws:execute-api:eu-west-1:<ACCOUNT_ID>:<API_CODE>/*" ], "Effect": "Allow" } ] }La voce <API_CODE> può essere recuperata dalla schermata di API Gateway in alto a sinistra:
var AWS = require("aws-sdk"); AWS.config.update({ region: process.env.AWS_REGION }); var DDB = new AWS.DynamoDB({ apiVersion: "2012-10-08" });
exports.handler = function (event, context, callback) { // based on query string parameter on the connection websocket add the connection in the dynamo table with the selected room_id and user_id console.log(event.requestContext.stage); let table = event.requestContext.table_name; var putParams = { TableName: process.env[table], Item: { room_id: {S: event.requestContext.authorizer.room_id}, connection_id: { S: event.requestContext.connectionId }, user_id: {S: event.requestContext.authorizer.user_id} } };
DDB.putItem(putParams, function (err) { callback(null, { statusCode: err ? 500 : 200, body: err ? "Failed to connect: " + JSON.stringify(err) : "Connected." }); }); };package.json
{ "name": "onConnect", "version": "1.0.0", "description": "OnConnect function for WebSockets on API Gateway", "main": "src/app.js", "author": "SAM CLI", "license": "MIT", "dependencies": { "aws-sdk": "^2.434.0" } }Come environment variables inseriamo il nome della tabella delle connessioni:
exports.handler = async (event) => { try { console.log(event.requestContext.stage); // Query string parameters you can use to validate the connection // We verify the json Web Token let jwt = require('jsonwebtoken'); let secret_environment = 'JWT_SECRET'; let table_environment = 'TABLE_NAME'; let decoded = jwt.verify(event.queryStringParameters.token, process.env[secret_environment]); let userId = decoded.user_id; let roomId = decoded.room_id; let token = decoded.token; // The principal id can be anything, in this case we used let principalId = userId;
const AWS = require('aws-sdk'); const ddb = new AWS.DynamoDB({ apiVersion: '2012-08-10' });
let db_token, db_user_id, db_room_id, db_expire_date; let params = { TableName: process.env[table_environment], Key: { user_id: { S: userId.toString() }, room_id: { S: roomId.toString() } } }; await ddb.getItem(params, function(err, data) { if (err) { /* an error occurred */ console.log(err, err.stack); } else { // successful response db_user_id = data.Item.user_id.S; db_room_id = data.Item.room_id.S; db_token = data.Item.token.S; } }).promise();
let bcrypt = require("bcryptjs"); let compare_hash = bcrypt.compareSync(token, db_token); let compare_user_and_room = db_room_id.toString() === roomId.toString() && db_user_id.toString() === userId.toString(); let effect = (compare_hash && compare_user_and_room) ? 'Allow' : 'Deny'; // Set 'Allow' or 'Deny' to decide if one can connect or not console.log(effect); return generatePolicy(principalId, effect, event.methodArn, userId, roomId); } catch(e) { console.log(e.stack); return null; } };
let generatePolicy = function(principalId, effect, resource, user_id, room_id) { // Required output: var authResponse = {}; authResponse.principalId = principalId; if (effect && resource) { var policyDocument = {}; policyDocument.Version = '2012-10-17'; // default version policyDocument.Statement = []; var statementOne = {}; statementOne.Action = 'execute-api:Invoke'; // default action statementOne.Effect = effect; statementOne.Resource = resource; policyDocument.Statement[0] = statementOne; authResponse.policyDocument = policyDocument; } // Optional output with custom properties of the String, Number or Boolean type. authResponse.context = { room_id: room_id, user_id: user_id }; return authResponse; }package.json
{ "name": "authorizer", "version": "1.0.0", "description": "Authorizer function for WebSockets on API Gateway", "main": "index.js", "author": "beSharp", "license": "MIT", "dependencies": { "bcryptjs": "^2.4.3", "jsonwebtoken": "^8.5.1" } }E le variabili di ambiente:
var AWS = require("aws-sdk"); AWS.config.update({ region: process.env.AWS_REGION }); var DDB = new AWS.DynamoDB({ apiVersion: "2012-10-08" });
exports.handler = function (event, context, callback) { console.log(event.requestContext.stage); let table_environment = 'TABLE_NAME_' + event.requestContext.stage.toUpperCase(); var deleteParams = { TableName: process.env[table_environment], Key: { connection_id: { S: event.requestContext.connectionId } } };
DDB.deleteItem(deleteParams, function (err) { callback(null, { statusCode: err ? 500 : 200, body: err ? "Failed to disconnect: " + JSON.stringify(err) : " Disconnected." }); }); };In pratica ogni qualvolta API Gateway deve scollegare un utente dal canale, invoca la funzione associata alla $disconnect che provvede ad eliminare la riga corrispondente alla connection_id dalla tabella delle connessioni di DynamoDB.Passiamo ora all’invio dei messaggi con sendMessage:
const AWS = require('aws-sdk'); require('./patch.js'); const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' }); const dynamodb = new AWS.DynamoDB({apiVersion: '2011-12-05'});
const { ENDPOINT } = process.env;
exports.handler = async (event, context,callback) => { let environment =undefined; let table = process.env.TABLE_NAME;
let user_id= undefined; let room_id = undefined; let postData = undefined; let messageBody = {};
// if there are records we re in the sqs trigger put the event from the message of the SQS message received. if (event.Records != undefined) { let body = JSON.parse(event.Records[0].body); user_id = body.user_id; room_id = body.room_id; messageBody = {user_id: user_id }; postData = body.data; } else { postData = JSON.parse(event.body).data; const connection_sender = event.requestContext.connectionId; // get sender info on dynamo db table of connections
let params = { Key: { HashKeyElement: { S: connection_sender } }, TableName: table }; let sender_item = undefined;
//get the row of dynamo referred to the connection_id sender. await dynamodb.getItem(params, function(err, data) { if (err) console.log(err, err.stack); // an error occurred else { sender_item = data; user_id = sender_item.Item.user_id.S; room_id = sender_item.Item.room_id.S; // adding in the messageBody the user_id of the sender messageBody = {user_id: user_id }; } // successful response }).promise(); }
//retrieve all connection for the selected room_id. let connectionData; try { let scanParam = { TableName: table, ProjectionExpression: 'connection_id, user_id', FilterExpression: '#roomId = :roomId', ExpressionAttributeNames: { '#roomId': 'room_id', }, ExpressionAttributeValues: { ':roomId': room_id, } }; connectionData = await ddb.scan(scanParam).promise(); } catch (e) { return { statusCode: 500, body: e.stack }; } let url = (ENDPOINT === "") ? event.requestContext.domainName + '/' + event.requestContext.stage : ENDPOINT + "/"+ environment.toLowerCase();
console.log(url);
const apigwManagementApi = new AWS.ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint: url });
// for each connection send a message through apigwManagementApi const postCalls = connectionData.Items.map(async ({ connection_id }) => {
try { messageBody.data = postData; await apigwManagementApi.postToConnection({ ConnectionId: connection_id, Data: JSON.stringify(messageBody)}).promise(); console.log("posted"); } catch (e) { console.error(e); //if the connection in a stale status, the connection will be closed, and the connection_id will be removed from the dynamoDB table if (e.statusCode === 410) { console.log(`Found stale connection, deleting ${connection_id}`); var deleteParams = { TableName: table, Key: { connection_id: { S: connection_id } } }; await ddb.deleteItem(deleteParams, function (err) { callback(null, { statusCode: err ? 500 : 200, body: err ? "Failed to disconnect: " + JSON.stringify(err) : " Disconnected." }).promise(); }); } else { return { statusCode: 500, body: e.stack }; } } });
try { await Promise.all(postCalls); } catch (e) { return { statusCode: 500, body: e.stack }; }
eturn { statusCode: 200, body: 'Data sent.' }; };Passiamo ora ad aggiungere le variabili d’ambiente:
sqs = Aws::SQS::Client.new(region: region) payload = JSON.generate(user_id: user_id, room_id: room_id, data: message) params = { message_body: payload, queue_url: config.queue_url }
sqs.send_message(params)Altrimenti lato client si può utilizzare questo esempio:
function sendMessage(){ ws.send(JSON.stringify( {“action”: “sendMessage”, “data” : “message” } ); }Con questo si conclude il nostro tutorial su come sviluppare un microservizio di gestione notifiche basato sui servizi AWS, completo di autenticazione tramite API Gateway, DynamoDB, SQS e Lambda.Uno degli aspetti più interessanti della soluzione è sicuramente la sua compattezza - tutta la logica è implementata in sole quattro funzioni Lambda piuttosto semplici - mantenendo al contempo una certa versatilità.Nel caso si voglia gestire il lifecycle dell’infrastruttura secondo il paradigma dell’Infrastructure-as-Code, ad esempio tramite un SAM template, un ultimo accorgimento potrebbe essere la creazione di un record ALIAS di Route53 per disaccoppiare l’URL chiamato dal client da quello di API Gateway che, essendo generato in maniera programmatica, cambia ad ogni deploy.Vuoi saperne di più su questa soluzione e su come beSharp può aiutarti nello sviluppo di applicazioni Serverless e Cloud-native su AWS? Contattaci!