You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
import express from "express";
import bp from "body-parser";
import { createLibp2p } from 'libp2p';
import { TCP } from '@libp2p/tcp';
import { WebSockets } from '@libp2p/websockets';
import { Mplex } from '@libp2p/mplex';
import { Noise } from '@chainsafe/libp2p-noise';
import { CID } from 'multiformats/cid';
import { KadDHT } from '@libp2p/kad-dht';
import all from 'it-all';
import delay from 'delay';
import { Bootstrap } from '@libp2p/bootstrap';
import * as json from 'multiformats/codecs/json';
import { sha256 } from 'multiformats/hashes/sha2';
import { FloodSub } from '@libp2p/floodsub';
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';
import { MulticastDNS } from '@libp2p/mdns';
import axios from 'axios';
import fs from 'fs';
import { Multiaddr } from "@multiformats/multiaddr";
import dns from "dns/promises";
import cors from 'cors';

// Configuration for the port used by this node
const server_port = process.env.SERVER_PORT || 8099;
// Uri of the Anubis API connected to this middleware
const anubis_api_uri = process.env.ANUBIS_API_URI || "127.0.0.1:8085";
// The multiaddress format address this middleware listens on
const listen_address = process.env.LISTEN_ADDRESS || '/dnsaddr/localhost/tcp/49662';
// Is this a private organisation? (Private org won't share policies that aren't of a specific user only)
const is_private_org = process.env.IS_PRIVATE_ORG || "true";
// Port used to reach the /metadata endpoint of other providers' middleware
const provider_metadata_port = 8098;
// File used to persist subscribed topics and provided resources across restarts
const config_file = 'data.json';

// Convert DNS address to IP address (solves Docker issues)
let listen_ma = new Multiaddr(listen_address);
const options = listen_ma.toOptions();
if (listen_address.includes("dnsaddr") && options.host !== 'localhost') {
  const lookup = await dns.lookup(options.host);
  listen_ma = new Multiaddr(
    listen_ma.toString().replace(options.host, lookup.address).replace("dnsaddr", "ip4")
  );
}

// Setting up Node app
const app = express();
app.use(cors());
app.use(bp.json());
app.use(bp.urlencoded({ extended: true }));

// Keeping track of resources being provided
const providedResources = [];

// Setting up the Libp2p node
const node = await createLibp2p({
  addresses: {
    listen: [listen_ma]
  },
  transports: [new TCP(), new WebSockets()],
  streamMuxers: [new Mplex()],
  connectionEncryption: [new Noise()],
  dht: new KadDHT(),
  peerDiscovery: [
    new MulticastDNS({
      interval: 20e3
    })
  ],
  connectionManager: {
    autoDial: true
  },
  pubsub: new FloodSub(),
  relay: {
    enabled: true,
    hop: {
      enabled: true
    },
    advertise: {
      enabled: true,
    }
  }
});

/**
 * Wraps an async Express handler so that a rejected promise is forwarded to
 * Express' error handling instead of becoming an unhandled rejection.
 *
 * @param {Function} handler - async (req, res, next) route handler
 * @returns {Function} Express-compatible handler
 */
const asyncRoute = (handler) => (req, res, next) => {
  handler(req, res, next).catch(next);
};

/**
 * Validates that the request body is non-empty and contains every required
 * field. On failure a 400 response is sent and `false` is returned, so the
 * caller must stop processing.
 *
 * @param {object} req - Express request
 * @param {object} res - Express response
 * @param {string[]} requiredFields - field names checked in order
 * @returns {boolean} true when the body is valid
 */
function validateBody(req, res, requiredFields) {
  const body = req.body ?? {};
  if (Object.keys(body).length === 0) {
    res.status(400).json({
      message: "Request body cannot be empty",
    });
    return false;
  }
  for (const field of requiredFields) {
    if (!body[field]) {
      res.status(400).json({
        message: `Ensure you sent a ${field} field`,
      });
      return false;
    }
  }
  return true;
}

/**
 * Computes the CID under which a resource is provided on / looked up from
 * the content routing.
 *
 * @param {string} resource - resource identifier
 * @returns {Promise<CID>} CIDv1 of the JSON-encoded `{resource}` object
 */
async function resourceCid(resource) {
  const bytes = json.encode({ resource: resource });
  const hash = await sha256.digest(bytes);
  return CID.create(1, json.code, hash);
}

/**
 * Extracts a loggable description from an error thrown by axios. `response`
 * is absent when the request never got an answer (e.g. connection refused).
 *
 * @param {Error} err - error raised by an axios call
 * @returns {*} the response payload if any, otherwise the error message
 */
function describeAxiosError(err) {
  return err?.response?.data ?? err?.message ?? err;
}

/**
 * Asks the middleware reachable at the given multiaddr for the URI of its
 * policy API (its /metadata endpoint).
 *
 * @param {Multiaddr|undefined} multiaddr - address of the remote middleware
 * @returns {Promise<string|null>} the policy API URI, or null when it cannot be retrieved
 */
async function fetchPolicyApiUri(multiaddr) {
  if (!multiaddr) {
    console.log("Can't retrieve policy API URL: provider has no known address");
    return null;
  }
  const address = multiaddr.nodeAddress().address;
  try {
    const response = await axios({
      method: 'get',
      url: `http://${address}:${provider_metadata_port}/metadata`
    });
    return response.data["policy_api_uri"] ?? null;
  } catch (error) {
    console.log(`Can't retrieve policy API URL for provider ${address}`);
    return null;
  }
}

/**
 * Retrieves the policies a provider's policy API holds for a resource.
 *
 * @param {string} providerPolicyApi - host[:port] of the provider's policy API
 * @param {string} resource - resource identifier
 * @param {string} service - value of the fiware-Service header
 * @param {string} servicepath - value of the fiware-Servicepath header
 * @returns {Promise<object>} axios response
 */
function fetchProviderPolicies(providerPolicyApi, resource, service, servicepath) {
  return axios({
    method: 'get',
    url: `http://${providerPolicyApi}/v1/policies`,
    headers: {
      'fiware-Service': service,
      'fiware-Servicepath': servicepath
    },
    params: {
      'resource': resource
    }
  });
}

/**
 * Tells whether a policy retrieved from another provider must be ignored.
 *
 * NOTE(review): the filter is only applied when IS_PRIVATE_ORG is not "true",
 * which looks inverted with respect to the configuration comment - behaviour
 * is kept as it was, confirm the intent.
 *
 * @param {object} policy_entry - policy as returned by a policy API
 * @returns {boolean} true when the policy has an agent that is not an "acl:agent:" one
 */
function shouldSkipPolicy(policy_entry) {
  if (is_private_org === "true") {
    return false;
  }
  return policy_entry.agent.some((a) => !a.includes("acl:agent:"));
}

/**
 * Creates the tenant on the local Anubis API. Failure is only logged
 * (presumably the tenant already exists - TODO confirm).
 *
 * @param {string} service - tenant name
 */
async function ensureTenant(service) {
  try {
    await axios({
      method: 'post',
      url: `http://${anubis_api_uri}/v1/tenants/`,
      data: {
        "name": service
      }
    });
    console.log(`Created Tenant ${service}`);
  } catch (error) {
    console.log(`No new Tenant created`);
  }
}

/**
 * Posts a policy to the local Anubis API; failures are logged, not thrown.
 *
 * @param {string} service - value of the fiware-Service header
 * @param {string} servicepath - value of the fiware-Servicepath header
 * @param {object} policy - policy payload
 */
async function postLocalPolicy(service, servicepath, policy) {
  try {
    const r = await axios({
      method: 'post',
      url: `http://${anubis_api_uri}/v1/policies`,
      headers: {
        'fiware-Service': service,
        'fiware-Servicepath': servicepath
      },
      data: policy
    });
    console.log(r.data);
  } catch (err) {
    console.log(describeAxiosError(err));
  }
}

// Saving config to file
async function saveConfiguration() {
  const topics = await node.pubsub.getTopics();
  const data = JSON.stringify({
    "topics": topics,
    "resources": providedResources
  });
  await fs.promises.writeFile(config_file, data);
}

/**
 * Best-effort persistence: a failure to write the configuration file is
 * logged and must not fail the operation that triggered it.
 */
async function persistConfiguration() {
  try {
    await saveConfiguration();
  } catch (err) {
    console.log(`Couldn't save configuration: ${err.message}`);
  }
}

// Endpoint to retrieve node metadata
app.get('/metadata', (req, res) => {
  res.json({ "policy_api_uri": anubis_api_uri });
});

// Endpoint for receiving resources from the mobile app
app.post('/resource/mobile/retrieve', asyncRoute(async (req, res) => {
  if (!validateBody(req, res, ["resource", "service", "servicepath"])) {
    return;
  }
  const { resource, service, servicepath } = req.body;

  // TODO: Email of the user
  const responseData = [{
    "usrMail": "[email protected]",
    "usrData": [{
      "id": resource,
      "description": "test",
      "children": []
    }]
  }];

  const cid = await resourceCid(resource);
  let providers = [];
  try {
    providers = await all(node.contentRouting.findProviders(cid, { timeout: 3000 }));
  } catch (error) {
    res.end(`No providers for ${resource}`);
    return;
  }

  for (const provider of providers) {
    const providerPolicyApi = await fetchPolicyApiUri(provider.multiaddrs?.[0]);
    if (!providerPolicyApi) {
      continue;
    }
    try {
      const response = await fetchProviderPolicies(providerPolicyApi, resource, service, servicepath);
      for (const policy_entry of response.data) {
        if (shouldSkipPolicy(policy_entry)) {
          continue;
        }
        responseData[0].usrData[0].children.push({
          "id": policy_entry["id"],
          "actorType": policy_entry["agent"],
          "mode": policy_entry["mode"]
        });
      }
    } catch (error) {
      console.log(describeAxiosError(error));
    }
  }
  res.json({ responseData });
}));

// Endpoint for providing a resource from the mobile app
app.post('/resource/mobile/send', asyncRoute(async (req, res) => {
  if (!validateBody(req, res, ["policies", "service", "servicepath"])) {
    return;
  }
  const { policies, service, servicepath } = req.body;

  for (const entry of policies) {
    for (const resource of entry.usrData) {
      const resId = resource.id;
      for (const policy of resource.children) {
        // "mode" arrives as a comma separated string; empty items are dropped
        const modes = policy.mode.split(",").filter((m) => m !== '');
        const new_policy = {
          "id": policy.id,
          "access_to": resId,
          "resource_type": "mobile",
          "mode": modes,
          "agent": [policy.actorType]
        };
        const message = JSON.stringify({
          "action": "send_mobile",
          "policy": new_policy,
          "service": service,
          "servicepath": servicepath,
        });
        try {
          await node.pubsub.publish(resId, uint8ArrayFromString(message));
        } catch (err) {
          // Stop at the first failure: the response can only be sent once
          console.error(err);
          res.status(500).end(`Error: ${err}`);
          return;
        }
      }
    }
  }
  res.json({});
}));

// Endpoint for providing a resource
app.post('/resource/provide', asyncRoute(async (req, res) => {
  if (!validateBody(req, res, ["resource"])) {
    return;
  }
  const { resource } = req.body;

  const cid = await resourceCid(resource);
  await node.contentRouting.provide(cid);
  if (!providedResources.includes(resource)) {
    providedResources.push(resource);
  }
  // Persist so that the resource is provided again after a restart
  await persistConfiguration();
  console.log(`Provided policy for resource ${resource}`);
  res.end(`Provided policy for resource ${resource}`);
}));

// Endpoint for subscribing to a resource topic
app.post('/resource/subscribe', asyncRoute(async (req, res) => {
  if (!validateBody(req, res, ["resource", "service", "servicepath"])) {
    return;
  }
  const { resource, service, servicepath } = req.body;

  const topics = await node.pubsub.getTopics();
  if (!topics.includes(resource)) {
    await node.pubsub.subscribe(resource);
    console.log(`Subscribed to ${resource}`);
    // Persist so that the subscription is restored after a restart
    await persistConfiguration();
  }

  const cid = await resourceCid(resource);
  let providers = [];
  try {
    providers = await all(node.contentRouting.findProviders(cid, { timeout: 3000 }));
  } catch (error) {
    res.end(`Subscribed to ${resource}, no other providers found`);
    return;
  }

  console.log(`Syncing with other providers for ${resource}...`);
  let tenantEnsured = false;
  for (const provider of providers) {
    const providerPolicyApi = await fetchPolicyApiUri(provider.multiaddrs?.[0]);
    if (!providerPolicyApi) {
      continue;
    }
    // The tenant is the same for every provider: create it only once
    if (!tenantEnsured) {
      await ensureTenant(service);
      tenantEnsured = true;
    }
    try {
      const response = await fetchProviderPolicies(providerPolicyApi, resource, service, servicepath);
      for (const policy_entry of response.data) {
        // TODO: Concurrency
        if (shouldSkipPolicy(policy_entry)) {
          continue;
        }
        await postLocalPolicy(service, servicepath, policy_entry);
      }
    } catch (error) {
      console.log(describeAxiosError(error));
    }
  }
  res.end(`Subscribed to ${resource}`);
}));

/**
 * Registers an endpoint that announces a policy change on the pubsub topic
 * of the resource.
 *
 * @param {string} path - route of the endpoint
 * @param {string} action - action put in the published message ("post", "put" or "delete")
 */
function registerPolicyEventRoute(path, action) {
  app.post(path, asyncRoute(async (req, res) => {
    if (!validateBody(req, res, ["resource", "policy", "service", "servicepath"])) {
      return;
    }
    const { resource, policy, service, servicepath } = req.body;

    const message = JSON.stringify({
      "action": action,
      "policy_id": policy,
      "service": service,
      "servicepath": servicepath,
    });
    try {
      await node.pubsub.publish(resource, uint8ArrayFromString(message));
    } catch (err) {
      console.error(err);
      res.status(500).end(`Error: ${err}`);
      return;
    }
    res.end("Policy message sent: " + message);
    console.log("Policy message sent: " + message);
  }));
}

// Endpoint when a new policy is created
registerPolicyEventRoute('/resource/policy/new', "post");
// Endpoint when a policy is updated
registerPolicyEventRoute('/resource/policy/update', "put");
// Endpoint when a policy is deleted
registerPolicyEventRoute('/resource/policy/delete', "delete");

// Function to process a message arriving on a topic (resource)
async function processTopicMessage(evt) {
  const rawMessage = uint8ArrayToString(evt.detail.data);
  console.log(`Node received: ${rawMessage} on topic ${evt.detail.topic}`);

  // Messages come from other peers: never trust them to be valid JSON
  let message;
  try {
    message = JSON.parse(rawMessage);
  } catch (err) {
    console.log(`Ignoring malformed message on topic ${evt.detail.topic}: ${err.message}`);
    return;
  }

  // The policy is embedded in the message: nothing to fetch from the sender
  if (message.action === "send_mobile") {
    await ensureTenant(message.service);
    await postLocalPolicy(message.service, message.servicepath, message.policy);
    return;
  }

  // A deletion only needs the policy id: nothing to fetch from the sender
  if (message.action === "delete") {
    try {
      const r = await axios({
        method: 'delete',
        url: `http://${anubis_api_uri}/v1/policies/${message.policy_id}`,
        headers: {
          'fiware-Service': message.service,
          'fiware-Servicepath': message.servicepath
        }
      });
      console.log(r.data);
    } catch (err) {
      console.log(describeAxiosError(err));
    }
    return;
  }

  if (message.action !== "post" && message.action !== "put") {
    console.log(`Ignoring message with unknown action ${message.action}`);
    return;
  }

  // Created/updated policies are fetched from the policy API of the sender
  const sender = await node.peerStore.addressBook.get(evt.detail.from);
  const providerPolicyApi = await fetchPolicyApiUri(sender?.[0]?.multiaddr);
  if (!providerPolicyApi) {
    return;
  }

  let response;
  try {
    response = await axios({
      method: 'get',
      url: `http://${providerPolicyApi}/v1/policies/${message.policy_id}`,
      headers: {
        'fiware-Service': message.service,
        'fiware-Servicepath': message.servicepath
      }
    });
  } catch (error) {
    console.log(describeAxiosError(error));
    return;
  }

  if (message.action === "post") {
    await ensureTenant(message.service);
    await postLocalPolicy(message.service, message.servicepath, response.data);
    return;
  }

  // message.action === "put"
  try {
    const r = await axios({
      method: 'put',
      url: `http://${anubis_api_uri}/v1/policies/${message.policy_id}`,
      headers: {
        'fiware-Service': message.service,
        'fiware-Servicepath': message.servicepath
      },
      data: response.data
    });
    console.log(r.data);
  } catch (err) {
    console.log(describeAxiosError(err));
  }
}

/**
 * Restores the resources and topics saved in the configuration file:
 * resources are provided again and topics are subscribed again.
 */
async function restoreConfiguration() {
  let data;
  try {
    const rawdata = fs.readFileSync(config_file);
    data = JSON.parse(rawdata);
  } catch (err) {
    console.log("Couldn't read any initial config");
    return;
  }

  for (const resource of data.resources ?? []) {
    providedResources.push(resource);
    try {
      const cid = await resourceCid(resource);
      await node.contentRouting.provide(cid);
    } catch (err) {
      console.log(`Failed to initially provide ${resource}`);
    }
  }
  for (const topic of data.topics ?? []) {
    await node.pubsub.subscribe(topic);
  }
  await persistConfiguration();
}

// Starting server
const server = app.listen(server_port, async () => {
  await node.start();
  await restoreConfiguration();

  console.log("Node started with:");
  node.getMultiaddrs().forEach((ma) => console.log(`${ma.toString()}`));

  node.connectionManager.addEventListener('peer:connect', (evt) => {
    const connection = evt.detail;
    console.log('Connection established to:', connection.remotePeer.toString());
  });

  node.addEventListener('peer:discovery', async (evt) => {
    const peer = evt.detail;
    if (node.peerId.toString() === peer.id.toString()) {
      return;
    }
    try {
      // The address book lookup is asynchronous: it has to be awaited
      // before deciding whether the peer is already known
      const knownAddresses = await node.peerStore.addressBook.get(peer.id);
      if (!knownAddresses || knownAddresses.length === 0) {
        console.log('Discovered:', peer.id.toString());
        await node.peerStore.addressBook.set(peer.id, peer.multiaddrs);
        await node.dial(peer.id);
      }
    } catch (err) {
      console.log(`Failed to dial discovered peer ${peer.id.toString()}: ${err.message}`);
    }
  });

  node.pubsub.addEventListener("message", (evt) => {
    // A failure while processing one message must not crash the node
    processTopicMessage(evt).catch((err) => console.log(err));
  });

  await delay(1000);
  console.log(node.peerId.toString());

  const host = server.address().address;
  const port = server.address().port;
  console.log("App listening at http://%s:%s", host, port);
});
The text was updated successfully, but these errors were encountered:
anubis/policy-governance-middleware/src/main.js
Line 344 in a4127fe
The text was updated successfully, but these errors were encountered: