Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check potential concurrency issues when retrieving policies on subscription #153

Open
github-actions bot opened this issue Oct 24, 2022 · 0 comments
Labels
enhancement New feature or request todo
Milestone

Comments

@github-actions
Copy link
Contributor

// TODO: Concurrency

import express from "express"
import bp from "body-parser"
import { createLibp2p } from 'libp2p'
import { TCP } from '@libp2p/tcp'
import { WebSockets } from '@libp2p/websockets'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { CID } from 'multiformats/cid'
import { KadDHT } from '@libp2p/kad-dht'
import all from 'it-all'
import delay from 'delay'
import { Bootstrap } from '@libp2p/bootstrap'
import * as json from 'multiformats/codecs/json'
import { sha256 } from 'multiformats/hashes/sha2'
import { FloodSub } from '@libp2p/floodsub'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { MulticastDNS } from '@libp2p/mdns'
import axios from 'axios'
import fs from 'fs'
import { Multiaddr } from "@multiformats/multiaddr";
import dns from "dns/promises";
import cors from 'cors';

// Configuration for the port used by this node
const server_port = process.env.SERVER_PORT || 8099
// Uri of the Anubis API connected to this middleware
const anubis_api_uri = process.env.ANUBIS_API_URI || "127.0.0.1:8085"
// The multiaddress format address this middleware listens on
const listen_address = process.env.LISTEN_ADDRESS || '/dnsaddr/localhost/tcp/49662'
// Is this a private organisation? (Private org won't share policies that aren't of a specific user only)
const is_private_org = process.env.IS_PRIVATE_ORG || "true"

// Convert DNS address to IP address (solves Docker issues)
var listen_ma = new Multiaddr(listen_address)
var options = listen_ma.toOptions()
if(listen_address.includes("dnsaddr") && options.host != 'localhost') {
  const lookup = await dns.lookup(options.host)
  listen_ma = new Multiaddr(listen_ma.toString().replace(options.host, lookup.address).replace("dnsaddr", "ip4"))
}

// Setting up Node app
var app = express()
app.use(cors())
app.use(bp.json())
app.use(bp.urlencoded({ extended: true }))

// Keeping track of resources being provided
var providedResources = []

// Setting up the Libp2p node
const node = await createLibp2p({
  addresses: {
    listen: [listen_ma]
  },
  transports: [new TCP(), new WebSockets()],
  streamMuxers: [new Mplex()],
  connectionEncryption: [new Noise()],
  dht: new KadDHT(),
  peerDiscovery: [
    new MulticastDNS({
      interval: 20e3
    })
  ],
  connectionManager: {
    autoDial: true
  },
  pubsub: new FloodSub(),
  relay: {
    enabled: true,
    hop: {
      enabled: true
    },
    advertise: {
      enabled: true,
    }
  }
})

// Endpoint to retrieve node metadata
app.get('/metadata', async(req, res) => {
  res.json({"policy_api_uri": anubis_api_uri})
})

// Endpoint for receiving resources from the mobile app
app.post('/resource/mobile/retrieve', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  // TODO: Email of the user
  var responseData = [
      {
          "usrMail": "[email protected]",
          "usrData": [
              {
                  "id": resource,
                  "description": "test",
                  "children": []
              }
          ]
      }
  ]

  const bytes = json.encode({ resource: resource })
  const hash = await sha256.digest(bytes)
  const cid = CID.create(1, json.code, hash)

  var providers = []
  try {
    providers = await all(node.contentRouting.findProviders(cid, { timeout: 3000 }))
  }
  catch(error) {
    res.end(`No providers for ${resource}`)
    return
  }
  for (const provider of providers) {
    var providerPolicyApi = null
    await axios({
      method: 'get',
      url: `http://${provider.multiaddrs[0].nodeAddress().address}:8098/metadata`
    })
    .then(async function (response) {
      providerPolicyApi = response.data["policy_api_uri"]
    })
    .catch(function (error) {
      console.log(`Can't retrieve policy API URL for provider ${provider.multiaddrs[0].nodeAddress().address}`)
    })
    if(!providerPolicyApi) {
      continue
    }
    await axios({
      method: 'get',
      url: `http://${providerPolicyApi}/v1/policies`,
      headers: {
        'fiware-Service': service,
        'fiware-Servicepath': servicepath
      },
      params: {
        'resource': resource
      }
    })
    .then(async function (response) {
      for (const policy_entry of response.data) {
        if(is_private_org != "true") {
          var filtered_agents = policy_entry.agent.filter(a => !a.includes("acl:agent:"))
          if(filtered_agents.length > 0) {
            continue
          }
        }
        responseData[0].usrData[0].children.push({"id": policy_entry["id"], "actorType": policy_entry["agent"], "mode": policy_entry["mode"]})
      }
    })
    .catch(function (error) {
      console.log(error.response.data)
    })
  }
  res.json({responseData})
})

// Endpoint for providing a resource from the mobile app
app.post('/resource/mobile/send', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { policies, service, servicepath } = req.body
  if (!policies) {
   res.status(400).json({
     message: "Ensure you sent a policies field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }
  for(const entry of policies) {
    const usrMail = entry.usrMail
    for(const resource of entry.usrData) {
      const resId = resource.id
      for(const policy of resource.children) {
        var modes = []
        for(const m of policy.mode.split(",")) {
          modes.push(m)
        }
        modes = modes.filter(e => e != '')
        var new_policy = {
            "id": policy.id,
            "access_to": resId,
            "resource_type": "mobile",
            "mode": modes,
            "agent": [
                policy.actorType
            ]
        }
        var message = {
          "action": "send_mobile",
          "policy": new_policy,
          "service": service,
          "servicepath": servicepath,
        }
        message = JSON.stringify(message)
        await node.pubsub.publish(resId, uint8ArrayFromString(message)).catch(err => {
          console.error(err)
          res.end(`Error: ${err}`)
        })
      }
    }
  }
  res.json({})
})

// Endpoint for providing a resource
app.post('/resource/provide', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }

  const bytes = json.encode({ resource: resource })
  const hash = await sha256.digest(bytes)
  const cid = CID.create(1, json.code, hash)
  await node.contentRouting.provide(cid)
  providedResources.push(resource)

  console.log(`Provided policy for resource ${resource}`)
  res.end(`Provided policy for resource ${resource}`)
})

// Endpoint for subscribing to a resource topic
app.post('/resource/subscribe', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, policy, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  const topics = await node.pubsub.getTopics()
  if (!topics.includes(resource)) {
    await node.pubsub.subscribe(resource)
    console.log(`Subscribed to ${resource}`)
  }

  const bytes = json.encode({ resource: resource })
  const hash = await sha256.digest(bytes)
  const cid = CID.create(1, json.code, hash)

  var providers = []
  try {
    providers = await all(node.contentRouting.findProviders(cid, { timeout: 3000 }))
  }
  catch(error) {
    res.end(`Subscribed to ${resource}, no other providers found`)
    return
  }
  console.log(`Syncing with other providers for ${resource}...`)
  for (const provider of providers) {
    var providerPolicyApi = null
    await axios({
      method: 'get',
      url: `http://${provider.multiaddrs[0].nodeAddress().address}:8098/metadata`
    })
    .then(async function (response) {
      providerPolicyApi = response.data["policy_api_uri"]
    })
    .catch(function (error) {
      console.log(`Can't retrieve policy API URL for provider ${provider.multiaddrs[0].nodeAddress().address}`)
    })
    if(!providerPolicyApi) {
      continue
    }
    await axios({
      method: 'post',
      url: `http://${anubis_api_uri}/v1/tenants/`,
      data: {"name": service}
    })
    .then(async function (response) {
      console.log(`Created Tenant ${service}`)
    })
    .catch(function (error) {
      console.log(`No new Tenant created`)
    })
    await axios({
      method: 'get',
      url: `http://${providerPolicyApi}/v1/policies`,
      headers: {
        'fiware-Service': service,
        'fiware-Servicepath': servicepath
      },
      params: {
        'resource': resource
      }
    })
    .then(async function (response) {
      for (const policy_entry of response.data) {
        // TODO: Concurrency
        if(is_private_org != "true") {
          var filtered_agents = policy_entry.agent.filter(a => !a.includes("acl:agent:"))
          if(filtered_agents.length > 0) {
            continue
          }
        }
        await axios({
          method: 'post',
          url: `http://${anubis_api_uri}/v1/policies`,
          headers: {
            'fiware-Service': service,
            'fiware-Servicepath': servicepath
          },
          data: policy_entry
        })
        .then(function (r) {
          console.log(r.data)
        })
        .catch(function (err) {
          console.log(err.response.data)
        })
      }
    })
    .catch(function (error) {
      console.log(error.response.data)
    })
  }
  res.end(`Subscribed to ${resource}`)
})

// Endpoint when a new policy is created
app.post('/resource/policy/new', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, policy, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!policy) {
   res.status(400).json({
     message: "Ensure you sent a policy field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  var message = {
    "action": "post",
    "policy_id": policy,
    "service": service,
    "servicepath": servicepath,
  }
  message = JSON.stringify(message)
  await node.pubsub.publish(resource, uint8ArrayFromString(message)).catch(err => {
    console.error(err)
    res.end(`Error: ${err}`)
  })

  res.end("Policy message sent: " + message)
  console.log("Policy message sent: " + message)
})

// Endpoint when a policy is updated
app.post('/resource/policy/update', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, policy, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!policy) {
   res.status(400).json({
     message: "Ensure you sent a policy field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  var message = {
    "action": "put",
    "policy_id": policy,
    "service": service,
    "servicepath": servicepath,
  }
  message = JSON.stringify(message)
  await node.pubsub.publish(resource, uint8ArrayFromString(message)).catch(err => {
    console.error(err)
    res.end(`Error: ${err}`)
  })

  res.end("Policy message sent: " + message)
  console.log("Policy message sent: " + message)
})

// Endpoint when a policy is deleted
app.post('/resource/policy/delete', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, policy, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!policy) {
   res.status(400).json({
     message: "Ensure you sent a policy field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  var message = {
    "action": "delete",
    "policy_id": policy,
    "service": service,
    "servicepath": servicepath,
  }
  message = JSON.stringify(message)
  await node.pubsub.publish(resource, uint8ArrayFromString(message)).catch(err => {
    console.error(err)
    res.end(`Error: ${err}`)
  })

  res.end("Policy message sent: " + message)
  console.log("Policy message sent: " + message)
})

// Function to process a message arriving on a topic (resource)
async function processTopicMessage(evt) {
  const sender = await node.peerStore.addressBook.get(evt.detail.from)
  const message = JSON.parse(uint8ArrayToString(evt.detail.data))
  console.log(`Node received: ${uint8ArrayToString(evt.detail.data)} on topic ${evt.detail.topic}`)
  if(message.action == "send_mobile") {
    await axios({
      method: 'post',
      url: `http://${anubis_api_uri}/v1/tenants/`,
      data: {"name": message.service}
    })
    .then(async function (response) {
      console.log(`Created Tenant ${message.service}`)
    })
    .catch(function (error) {
      console.log(error)
    })
    await axios({
      method: 'post',
      url: `http://${anubis_api_uri}/v1/policies`,
      headers: {
        'fiware-Service': message.service,
        'fiware-Servicepath': message.servicepath
      },
      data: message.policy
    })
    .then(function (r) {
      console.log(r)
    })
    .catch(function (err) {
      console.log(err.response.data)
    })
    return
  }
  var providerPolicyApi = null
  await axios({
    method: 'get',
    url: `http://${sender[0].multiaddr.nodeAddress().address}:8098/metadata`
  })
  .then(async function (response) {
    providerPolicyApi = response.data["policy_api_uri"]
  })
  .catch(function (error) {
    console.log(error)
  })
  if(message.action == "delete") {
    await axios({
      method: 'delete',
      url: `http://${anubis_api_uri}/v1/policies/${message.policy_id}`,
      headers: {
        'fiware-Service': message.service,
        'fiware-Servicepath': message.servicepath
      },
      data: response.data
    })
    .then(function (r) {
      console.log(r)
    })
    .catch(function (err) {
      console.log(err)
    })
    return
  }
  await axios({
    method: 'get',
    url: `http://${providerPolicyApi}/v1/policies/${message.policy_id}`,
    headers: {
      'fiware-Service': message.service,
      'fiware-Servicepath': message.servicepath
    }
  })
  .then(async function (response) {
    if(message.action == "post") {
      await axios({
        method: 'post',
        url: `http://${anubis_api_uri}/v1/tenants/`,
        data: {"name": message.service}
      })
      .then(async function (response) {
        console.log(`Created Tenant ${resource}`)
      })
      .catch(function (error) {
        console.log(error)
      })
      await axios({
        method: 'post',
        url: `http://${anubis_api_uri}/v1/policies`,
        headers: {
          'fiware-Service': message.service,
          'fiware-Servicepath': message.servicepath
        },
        data: response.data
      })
      .then(function (r) {
        console.log(r)
      })
      .catch(function (err) {
        console.log(err.response.data)
      })
    }
    else if(message.action == "put") {
      await axios({
        method: 'put',
        url: `http://${anubis_api_uri}/v1/policies/${message.policy_id}`,
        headers: {
          'fiware-Service': message.service,
          'fiware-Servicepath': message.servicepath
        },
        data: response.data
      })
      .then(function (r) {
        console.log(r)
      })
      .catch(function (err) {
        console.log(err)
      })
    }
  })
  .catch(function (error) {
    console.log(error)
  })
}

// Saving config to file
async function saveConfiguration() {
  var persistentDataFileStream = fs.createWriteStream('data.json')
  const topics = await node.pubsub.getTopics()
  let data2 = JSON.stringify({"topics": topics, "resources": providedResources})
  persistentDataFileStream.write(data2)
  persistentDataFileStream.close()
}

// Starting server
var server = app.listen(server_port, async() => {

  await node.start()

  try {
    let rawdata = fs.readFileSync('data.json')
    let data = JSON.parse(rawdata)

    for(const resource of data.resources) {
      providedResources.push(resource)
      const bytes = json.encode({ resource: resource })
      const hash = await sha256.digest(bytes)
      const cid = CID.create(1, json.code, hash)
      try {
        await node.contentRouting.provide(cid)
      }
      catch(err) {
        console.log(`Failed to initially provide ${resource}`)
      }
    }

    for(const topic of data.topics) {
      node.pubsub.subscribe(topic)
    }

    await saveConfiguration()
  }
  catch(err) {
    console.log("Couldn't read any initial config")
  }

  console.log("Node started with:")
  node.getMultiaddrs().forEach((ma) => console.log(`${ma.toString()}`))

  node.connectionManager.addEventListener('peer:connect', (evt) => {
    const connection = evt.detail
    console.log('Connection established to:', connection.remotePeer.toString())
  })

  node.addEventListener('peer:discovery', async(evt) => {
    const peer = evt.detail
    if (node.peerId.toString() == peer.id.toString()) {
      return
    }
    var peerId = node.peerStore.addressBook.get(peer.id)
    if (!peerId) {
      console.log('Discovered:', peer.id.toString())
      node.peerStore.addressBook.set(peer.id, peer.multiaddrs)
      node.dial(peer.id)
    }
  })

  await node.pubsub.addEventListener("message", (evt) => processTopicMessage(evt))

  await delay(1000)

  console.log(node.peerId.toString())

  var host = server.address().address
  var port = server.address().port
  console.log("App listening at http://%s:%s", host, port)
})
@github-actions github-actions bot added the todo label Oct 24, 2022
@chicco785 chicco785 added this to the 1.0 milestone Oct 27, 2022
@chicco785 chicco785 changed the title Concurrency check potential concurrency issues when retrieving policies on subscription Oct 27, 2022
@chicco785 chicco785 added the enhancement New feature or request label Oct 27, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request todo
Projects
None yet
Development

No branches or pull requests

1 participant