crypto.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import base64
  2. from Crypto.PublicKey import RSA
  3. from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
  4. from Crypto.Cipher import AES
  5. from functools import wraps
  6. from utils.http import make_json_response
  7. import json
  8. IV = '16-Bytes--String'
  9. with open('public.rsa') as f:
  10. key = f.read()
  11. public_key = RSA.import_key(key)
  12. public_cipher = PKCS1_cipher.new(public_key)
  13. print(public_key.exportKey().decode(encoding='utf-8'))
  14. with open('private.rsa') as f:
  15. key = f.read()
  16. private_key = RSA.import_key(key)
  17. private_cipher = PKCS1_cipher.new(private_key)
  18. # print(private_key.exportKey().decode(encoding='utf-8'))
  19. # 安全传输decorator
  20. def secure_transport(view_func):
  21. @wraps(view_func)
  22. def _wrapped_view(request, *args, **kwargs):
  23. data = request.POST
  24. enc_key = data.get('enc_key')
  25. cipher_text = data.get('cipher_text')
  26. if not enc_key or not cipher_text:
  27. print('无加密')
  28. return view_func(request, *args, **kwargs)
  29. aes_key = private_cipher.decrypt(base64.b64decode(enc_key.encode('utf-8')), b'error').decode('utf-8')
  30. print(f'key={aes_key}')
  31. aes_cipher = AES.new(aes_key.encode('utf-8'), AES.MODE_CBC, IV.encode('utf-8'))
  32. decrypted = aes_cipher.decrypt(base64.b64decode(cipher_text.encode('utf-8')))
  33. # print(decrypted)
  34. decrypted = decrypted[:-decrypted[-1]]
  35. # print(decrypted)
  36. plain_text = decrypted.decode('utf-8')
  37. print(plain_text)
  38. try:
  39. loaded = json.loads(plain_text)
  40. except:
  41. print('不是json')
  42. loaded = {}
  43. for p in map(lambda s: s.split('='), plain_text.split('&')):
  44. loaded[p[0]] = p[1]
  45. dec_request = request
  46. dec_request.POST = {**request.POST, **loaded}
  47. raw_response = view_func(dec_request, *args, **kwargs)
  48. content = raw_response.content
  49. padding = 16 - len(content) % 16
  50. content += bytes([padding] * padding)
  51. print(content)
  52. aes_cipher = AES.new(aes_key.encode('utf-8'), AES.MODE_CBC, IV.encode('utf-8'))
  53. enc_content = base64.b64encode(aes_cipher.encrypt(content)).decode('utf-8')
  54. print(enc_content)
  55. return make_json_response(enc_content=enc_content)
  56. return _wrapped_view
  57. def test():
  58. plain_text = '{"username": "user1"}'
  59. encrypted = public_cipher.encrypt(bytes(plain_text.encode('utf8')))
  60. cipher_text = base64.b64encode(encrypted)
  61. print(cipher_text.decode('utf8'))
  62. decrypted = private_cipher.decrypt(base64.b64decode(cipher_text), b'error')
  63. print(decrypted.decode('utf8'))
  64. # print(public_key.exportKey().decode('utf-8'))
  65. # random_generator = Random.new().read
  66. # rsa = RSA.generate(2048, random_generator)
  67. # private_key = rsa.exportKey()
  68. # print(private_key.decode('utf-8'))