-
Notifications
You must be signed in to change notification settings - Fork 0
/
RepoSocketPublisher.py
53 lines (45 loc) · 2.05 KB
/
RepoSocketPublisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import socket
class RepoSocketPublisher:
def __init__(self, repo_port):
self.repo_dest = ('127.0.0.1', int(repo_port))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(self.repo_dest)
def put(self, data):
wire = data.wireEncode()
self.sock.sendall(str(bytearray(wire.toBuffer())))
if __name__ == "__main__":
from pyndn import Name
from pyndn import Data
from pyndn import ContentType
from pyndn import KeyLocatorType
from pyndn import Sha256WithRsaSignature
from pyndn.security import KeyType
from pyndn.security import KeyChain
from pyndn.security.identity import IdentityManager
from pyndn.security.identity import MemoryIdentityStorage
from pyndn.security.identity import MemoryPrivateKeyStorage
from pyndn.security.policy import SelfVerifyPolicyManager
from pyndn.util import Blob
from Crypto.PublicKey import RSA
from utils import getKeyID
publisher = RepoSocketPublisher(12345)
data = Data(Name("/localhost/repo-ng/test/001"))
data.setContent("SUCCESS!")
data.getMetaInfo().setFreshnessPeriod(1000000)
identityStorage = MemoryIdentityStorage()
privateKeyStorage = MemoryPrivateKeyStorage()
keyChain = KeyChain(IdentityManager(identityStorage, privateKeyStorage),
SelfVerifyPolicyManager(identityStorage))
keyfile = "../keychain/keys/pub_user.pem"
f = open(keyfile, "r")
key = RSA.importKey(f.read())
key_name = Name("/localhost/repo-ng/test/signer")
signer_name = Name(key_name).append(getKeyID(key))
key_pub_der = bytearray(key.publickey().exportKey(format="DER"))
key_pri_der = bytearray(key.exportKey(format="DER"))
identityStorage.addKey(signer_name, KeyType.RSA, Blob(key_pub_der))
privateKeyStorage.setKeyPairForKeyName(signer_name, key_pub_der, key_pri_der)
cert_name = signer_name.getSubName(0, signer_name.size() - 1).append(
"KEY").append(signer_name[-1]).append("ID-CERT").append("0")
keyChain.sign(data, cert_name)
publisher.put(data)