Source code for PyKCS11

#   Copyright (C) 2006-2015 Ludovic Rousseau (ludovic.rousseau@free.fr)
#   Copyright (C) 2010 Giuseppe Amato (additions to original interface)
#
# This file is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA.

from __future__ import print_function

import PyKCS11.LowLevel
import os
import sys


# redefine PKCS#11 constants
CK_TRUE = PyKCS11.LowLevel.CK_TRUE
CK_FALSE = PyKCS11.LowLevel.CK_FALSE
CK_UNAVAILABLE_INFORMATION = PyKCS11.LowLevel.CK_UNAVAILABLE_INFORMATION
CK_EFFECTIVELY_INFINITE = PyKCS11.LowLevel.CK_EFFECTIVELY_INFINITE
CK_INVALID_HANDLE = PyKCS11.LowLevel.CK_INVALID_HANDLE

CKA = {}
CKC = {}
CKD = {}
CKF = {}
CKG = {}
CKH = {}
CKK = {}
CKM = {}
CKO = {}
CKR = {}
CKS = {}
CKU = {}
CKZ = {}

# redefine PKCS#11 constants using well known prefixes
for x in PyKCS11.LowLevel.__dict__.keys():
    if x[:4] == 'CKA_' \
      or x[:4] == 'CKC_' \
      or x[:4] == 'CKD_' \
      or x[:4] == 'CKF_' \
      or x[:4] == 'CKG_' \
      or x[:4] == 'CKH_' \
      or x[:4] == 'CKK_' \
      or x[:4] == 'CKM_' \
      or x[:4] == 'CKO_' \
      or x[:4] == 'CKR_' \
      or x[:4] == 'CKS_' \
      or x[:4] == 'CKU_' \
      or x[:4] == 'CKZ_':
        a = "{}=PyKCS11.LowLevel.{}".format(x, x)
        exec(a)
        if x[3:] != "_VENDOR_DEFINED":
            eval(x[:3])[eval(x)] = x  # => CKM[CKM_RSA_PKCS] = 'CKM_RSA_PKCS'
            eval(x[:3])[x] = eval(x)  # => CKM['CKM_RSA_PKCS'] = CKM_RSA_PKCS

# special CKR[] values
CKR[-4] = "C_GetFunctionList() not found"
CKR[-3] = "Unknown format"
CKR[-2] = "Unkown PKCS#11 type"
CKR[-1] = "Load"


[docs] class ckbytelist(PyKCS11.LowLevel.ckbytelist): """ add a __repr__() method to the LowLevel equivalent """ def __init__(self, data=None): if data is None: data = 0 elif isinstance(data, str): data = data.encode("utf-8") elif isinstance(data, (bytes, list, ckbytelist)): data = bytes(data) else: raise PyKCS11.PyKCS11Error(-3, text=str(type(data))) super(ckbytelist, self).__init__(data) def __repr__(self): """ return the representation of a tuple the __str__ method will use it also """ rep = [int(elt) for elt in self] return repr(rep)
[docs] class CK_OBJECT_HANDLE(PyKCS11.LowLevel.CK_OBJECT_HANDLE): """ add a __repr__() method to the LowLevel equivalent """ def __init__(self, session): PyKCS11.LowLevel.CK_OBJECT_HANDLE.__init__(self) self.session = session pass
[docs] def to_dict(self): """ convert the fields of the object into a dictionnary """ # all the attibutes defined by PKCS#11 all_attributes = PyKCS11.CKA.keys() # only use the integer values and not the strings like 'CKM_RSA_PKCS' all_attributes = [attr for attr in all_attributes if isinstance(attr, int)] # all the attributes of the object attributes = self.session.getAttributeValue(self, all_attributes) dico = dict() for key, attr in zip(all_attributes, attributes): if attr is None: continue if key == CKA_CLASS: dico[PyKCS11.CKA[key]] = PyKCS11.CKO[attr] elif key == CKA_CERTIFICATE_TYPE: dico[PyKCS11.CKA[key]] = PyKCS11.CKC[attr] elif key == CKA_KEY_TYPE: dico[PyKCS11.CKA[key]] = PyKCS11.CKK[attr] else: dico[PyKCS11.CKA[key]] = attr return dico
def __repr__(self): """ text representation of the object """ dico = self.to_dict() lines = list() for key in sorted(dico.keys()): lines.append("{}: {}".format(key, dico[key])) return "\n".join(lines)
[docs] class CkClass(object): """ Base class for CK_* classes """ # dictionnary of integer_value: text_value for the flags bits flags_dict = dict() # dictionnary of fields names and types # type can be "pair", "flags" or "text" fields = dict() flags = 0
[docs] def flags2text(self): """ parse the `self.flags` field and create a list of `CKF_*` strings corresponding to bits set in flags :return: a list of strings :rtype: list """ r = [] for v in self.flags_dict.keys(): if self.flags & v: r.append(self.flags_dict[v]) return r
[docs] def to_dict(self): """ convert the fields of the object into a dictionnary """ dico = dict() for field in self.fields.keys(): if field == "flags": dico[field] = self.flags2text() elif field == "state": dico[field] = self.state2text() else: dico[field] = eval("self." + field) return dico
def __str__(self): """ text representation of the object """ dico = self.to_dict() lines = list() for key in sorted(dico.keys()): type = self.fields[key] if type == "flags": lines.append("{}: {}".format(key, ", ".join(dico[key]))) elif type == "pair": lines.append("%s: " % key + "%d.%d" % dico[key]) else: lines.append("{}: {}".format(key, dico[key])) return "\n".join(lines)
[docs] class CK_SLOT_INFO(CkClass): """ matches the PKCS#11 CK_SLOT_INFO structure :ivar slotDescription: blank padded :type slotDescription: string :ivar manufacturerID: blank padded :type manufacturerID: string :ivar flags: See :func:`CkClass.flags2text` :type flags: integer :ivar hardwareVersion: 2 elements list :type hardwareVersion: list :ivar firmwareVersion: 2 elements list :type firmwareVersion: list """ flags_dict = { CKF_TOKEN_PRESENT: "CKF_TOKEN_PRESENT", CKF_REMOVABLE_DEVICE: "CKF_REMOVABLE_DEVICE", CKF_HW_SLOT: "CKF_HW_SLOT"} fields = {"slotDescription": "text", "manufacturerID": "text", "flags": "flags", "hardwareVersion": "text", "firmwareVersion": "text"}
[docs] class CK_INFO(CkClass): """ matches the PKCS#11 CK_INFO structure :ivar cryptokiVersion: Cryptoki interface version :type cryptokiVersion: integer :ivar manufacturerID: blank padded :type manufacturerID: string :ivar flags: must be zero :type flags: integer :ivar libraryDescription: blank padded :type libraryDescription: string :var libraryVersion: 2 elements list :type libraryVersion: list """ fields = {"cryptokiVersion": "pair", "manufacturerID": "text", "flags": "flags", "libraryDescription": "text", "libraryVersion": "pair"}
[docs] class CK_SESSION_INFO(CkClass): """ matches the PKCS#11 CK_SESSION_INFO structure :ivar slotID: ID of the slot that interfaces with the token :type slotID: integer :ivar state: state of the session :type state: integer :ivar flags: bit flags that define the type of session :type flags: integer :ivar ulDeviceError: an error code defined by the cryptographic token :type ulDeviceError: integer """ flags_dict = { CKF_RW_SESSION: "CKF_RW_SESSION", CKF_SERIAL_SESSION: "CKF_SERIAL_SESSION", }
[docs] def state2text(self): """ parse the `self.state` field and return a `CKS_*` string corresponding to the state :return: a string :rtype: string """ return CKS[self.state]
fields = {"slotID": "text", "state": "text", "flags": "flags", "ulDeviceError": "text"}
[docs] class CK_TOKEN_INFO(CkClass): """ matches the PKCS#11 CK_TOKEN_INFO structure :ivar label: blank padded :type label: string :ivar manufacturerID: blank padded :type manufacturerID: string :ivar model: string blank padded :type model: string :ivar serialNumber: string blank padded :type serialNumber: string :ivar flags: :type flags: integer :ivar ulMaxSessionCount: :type ulMaxSessionCount: integer :ivar ulSessionCount: :type ulSessionCount: integer :ivar ulMaxRwSessionCount: :type ulMaxRwSessionCount: integer :ivar ulRwSessionCount: :type ulRwSessionCount: integer :ivar ulMaxPinLen: :type ulMaxPinLen: integer :ivar ulMinPinLen: :type ulMinPinLen: integer :ivar ulTotalPublicMemory: :type ulTotalPublicMemory: integer :ivar ulFreePublicMemory: :type ulFreePublicMemory: integer :ivar ulTotalPrivateMemory: :type ulTotalPrivateMemory: integer :ivar ulFreePrivateMemory: :type ulFreePrivateMemory: integer :ivar hardwareVersion: 2 elements list :type hardwareVersion: list :ivar firmwareVersion: 2 elements list :type firmwareVersion: list :ivar utcTime: string :type utcTime: string """ flags_dict = { CKF_RNG: "CKF_RNG", CKF_WRITE_PROTECTED: "CKF_WRITE_PROTECTED", CKF_LOGIN_REQUIRED: "CKF_LOGIN_REQUIRED", CKF_USER_PIN_INITIALIZED: "CKF_USER_PIN_INITIALIZED", CKF_RESTORE_KEY_NOT_NEEDED: "CKF_RESTORE_KEY_NOT_NEEDED", CKF_CLOCK_ON_TOKEN: "CKF_CLOCK_ON_TOKEN", CKF_PROTECTED_AUTHENTICATION_PATH: "CKF_PROTECTED_AUTHENTICATION_PATH", CKF_DUAL_CRYPTO_OPERATIONS: "CKF_DUAL_CRYPTO_OPERATIONS", CKF_TOKEN_INITIALIZED: "CKF_TOKEN_INITIALIZED", CKF_SECONDARY_AUTHENTICATION: "CKF_SECONDARY_AUTHENTICATION", CKF_USER_PIN_COUNT_LOW: "CKF_USER_PIN_COUNT_LOW", CKF_USER_PIN_FINAL_TRY: "CKF_USER_PIN_FINAL_TRY", CKF_USER_PIN_LOCKED: "CKF_USER_PIN_LOCKED", CKF_USER_PIN_TO_BE_CHANGED: "CKF_USER_PIN_TO_BE_CHANGED", CKF_SO_PIN_COUNT_LOW: "CKF_SO_PIN_COUNT_LOW", CKF_SO_PIN_FINAL_TRY: "CKF_SO_PIN_FINAL_TRY", CKF_SO_PIN_LOCKED: "CKF_SO_PIN_LOCKED", CKF_SO_PIN_TO_BE_CHANGED: "CKF_SO_PIN_TO_BE_CHANGED", } fields = {"label": "text", "manufacturerID": "text", "model": "text", "serialNumber": "text", "flags": "flags", "ulMaxSessionCount": "text", "ulSessionCount": "text", "ulMaxRwSessionCount": "text", "ulRwSessionCount": "text", "ulMaxPinLen": "text", "ulMinPinLen": "text", "ulTotalPublicMemory": "text", "ulFreePublicMemory": "text", "ulTotalPrivateMemory": "text", "ulFreePrivateMemory": "text", "hardwareVersion": "pair", "firmwareVersion": "pair", "utcTime": "text"}
[docs] class CK_MECHANISM_INFO(CkClass): """ matches the PKCS#11 CK_MECHANISM_INFO structure :ivar ulMinKeySize: minimum size of the key :type ulMinKeySize: integer :ivar ulMaxKeySize: maximum size of the key :type ulMaxKeySize: integer :ivar flags: bit flags specifying mechanism capabilities :type flags: integer """ flags_dict = { CKF_HW: "CKF_HW", CKF_ENCRYPT: "CKF_ENCRYPT", CKF_DECRYPT: "CKF_DECRYPT", CKF_DIGEST: "CKF_DIGEST", CKF_SIGN: "CKF_SIGN", CKF_SIGN_RECOVER: "CKF_SIGN_RECOVER", CKF_VERIFY: "CKF_VERIFY", CKF_VERIFY_RECOVER: "CKF_VERIFY_RECOVER", CKF_GENERATE: "CKF_GENERATE", CKF_GENERATE_KEY_PAIR: "CKF_GENERATE_KEY_PAIR", CKF_WRAP: "CKF_WRAP", CKF_UNWRAP: "CKF_UNWRAP", CKF_DERIVE: "CKF_DERIVE", CKF_EXTENSION: "CKF_EXTENSION", } fields = {"ulMinKeySize": "text", "ulMaxKeySize": "text", "flags": "flags"}
[docs] class PyKCS11Error(Exception): """ define the possible PyKCS11 exceptions """ def __init__(self, value, text=""): self.value = value self.text = text def __str__(self): """ The text representation of a PKCS#11 error is something like: "CKR_DEVICE_ERROR (0x00000030)" """ if self.value in CKR: if self.value < 0: return CKR[self.value] + " (%s)" % self.text else: return CKR[self.value] + " (0x%08X)" % self.value elif self.value & CKR_VENDOR_DEFINED: return "Vendor error (0x%08X)" % (self.value & 0xffffffff & ~CKR_VENDOR_DEFINED) else: return "Unknown error (0x%08X)" % self.value
[docs] class PyKCS11Lib(object): """ high level PKCS#11 binding """ # shared by all instances _loaded_libs = dict() def __init__(self): self.lib = PyKCS11.LowLevel.CPKCS11Lib() def __del__(self): if PyKCS11 and PyKCS11.__name__ and \ PyKCS11.LowLevel and PyKCS11.LowLevel.__name__ and \ PyKCS11.LowLevel._LowLevel and \ PyKCS11.LowLevel._LowLevel.__name__: # unload the library self.unload()
[docs] def load(self, pkcs11dll_filename=None, *init_string): """ load a PKCS#11 library :type pkcs11dll_filename: string :param pkcs11dll_filename: the library name. If this parameter is not set then the environment variable `PYKCS11LIB` is used instead :returns: a :class:`PyKCS11Lib` object :raises: :class:`PyKCS11Error` (-1): when the load fails """ if pkcs11dll_filename is None: pkcs11dll_filename = os.getenv("PYKCS11LIB") if pkcs11dll_filename is None: raise PyKCS11Error(-1, "No PKCS11 library specified (set PYKCS11LIB env variable)") if hasattr(self, "pkcs11dll_filename"): self.unload() # unload the previous library # if the instance was previously initialized, # create a new low level library object for it self.lib = PyKCS11.LowLevel.CPKCS11Lib() # if the lib is already in use: reuse it if pkcs11dll_filename in PyKCS11Lib._loaded_libs: self.lib.Duplicate(PyKCS11Lib._loaded_libs[pkcs11dll_filename]["ref"]) else: # else load it rv = self.lib.Load(pkcs11dll_filename) if rv != CKR_OK: raise PyKCS11Error(rv, pkcs11dll_filename) PyKCS11Lib._loaded_libs[pkcs11dll_filename] = { "ref": self.lib, "nb_users": 0 } # remember the lib file name self.pkcs11dll_filename = pkcs11dll_filename # increase user number PyKCS11Lib._loaded_libs[pkcs11dll_filename]["nb_users"] += 1 return self
[docs] def unload(self): """ unload the current instance of a PKCS#11 library """ # in case NO library was found and used if not hasattr(self, "pkcs11dll_filename"): return if self.pkcs11dll_filename not in PyKCS11Lib._loaded_libs: raise PyKCS11Error(PyKCS11.LowLevel.CKR_GENERAL_ERROR, "invalid PyKCS11Lib state") # decrease user number PyKCS11Lib._loaded_libs[self.pkcs11dll_filename]["nb_users"] -= 1 if PyKCS11Lib._loaded_libs[self.pkcs11dll_filename]["nb_users"] == 0: # unload only if no more used self.lib.Unload() # remove unused entry # the case < 0 happens if lib loading failed if PyKCS11Lib._loaded_libs[self.pkcs11dll_filename]["nb_users"] <= 0: del PyKCS11Lib._loaded_libs[self.pkcs11dll_filename] delattr(self, "pkcs11dll_filename")
[docs] def initToken(self, slot, pin, label): """ C_InitToken :param slot: slot number returned by :func:`getSlotList` :type slot: integer :param pin: Security Officer's initial PIN :param label: new label of the token """ pin1 = ckbytelist(pin) rv = self.lib.C_InitToken(slot, pin1, label) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def getInfo(self): """ C_GetInfo :return: a :class:`CK_INFO` object """ info = PyKCS11.LowLevel.CK_INFO() rv = self.lib.C_GetInfo(info) if rv != CKR_OK: raise PyKCS11Error(rv) i = CK_INFO() i.cryptokiVersion = (info.cryptokiVersion.major, info.cryptokiVersion.minor) i.manufacturerID = info.GetManufacturerID() i.flags = info.flags i.libraryDescription = info.GetLibraryDescription() i.libraryVersion = (info.libraryVersion.major, info.libraryVersion.minor) return i
[docs] def getSlotList(self, tokenPresent=False): """ C_GetSlotList :param tokenPresent: `False` (default) to list all slots, `True` to list only slots with present tokens :type tokenPresent: bool :return: a list of available slots :rtype: list """ slotList = PyKCS11.LowLevel.ckintlist() rv = self.lib.C_GetSlotList(CK_TRUE if tokenPresent else CK_FALSE, slotList) if rv != CKR_OK: raise PyKCS11Error(rv) s = [] for x in range(len(slotList)): s.append(slotList[x]) return s
[docs] def getSlotInfo(self, slot): """ C_GetSlotInfo :param slot: slot number returned by :func:`getSlotList` :type slot: integer :return: a :class:`CK_SLOT_INFO` object """ slotInfo = PyKCS11.LowLevel.CK_SLOT_INFO() rv = self.lib.C_GetSlotInfo(slot, slotInfo) if rv != CKR_OK: raise PyKCS11Error(rv) s = CK_SLOT_INFO() s.slotDescription = slotInfo.GetSlotDescription() s.manufacturerID = slotInfo.GetManufacturerID() s.flags = slotInfo.flags s.hardwareVersion = slotInfo.GetHardwareVersion() s.firmwareVersion = slotInfo.GetFirmwareVersion() return s
[docs] def getTokenInfo(self, slot): """ C_GetTokenInfo :param slot: slot number returned by :func:`getSlotList` :type slot: integer :return: a :class:`CK_TOKEN_INFO` object """ tokeninfo = PyKCS11.LowLevel.CK_TOKEN_INFO() rv = self.lib.C_GetTokenInfo(slot, tokeninfo) if rv != CKR_OK: raise PyKCS11Error(rv) t = CK_TOKEN_INFO() t.label = tokeninfo.GetLabel() t.manufacturerID = tokeninfo.GetManufacturerID() t.model = tokeninfo.GetModel() t.serialNumber = tokeninfo.GetSerialNumber() t.flags = tokeninfo.flags t.ulMaxSessionCount = tokeninfo.ulMaxSessionCount if t.ulMaxSessionCount == CK_UNAVAILABLE_INFORMATION: t.ulMaxSessionCount = -1 t.ulSessionCount = tokeninfo.ulSessionCount if t.ulSessionCount == CK_UNAVAILABLE_INFORMATION: t.ulSessionCount = -1 t.ulMaxRwSessionCount = tokeninfo.ulMaxRwSessionCount if t.ulMaxRwSessionCount == CK_UNAVAILABLE_INFORMATION: t.ulMaxRwSessionCount = -1 t.ulRwSessionCount = tokeninfo.ulRwSessionCount if t.ulRwSessionCount == CK_UNAVAILABLE_INFORMATION: t.ulRwSessionCount = -1 t.ulMaxPinLen = tokeninfo.ulMaxPinLen t.ulMinPinLen = tokeninfo.ulMinPinLen t.ulTotalPublicMemory = tokeninfo.ulTotalPublicMemory if t.ulTotalPublicMemory == CK_UNAVAILABLE_INFORMATION: t.ulTotalPublicMemory = -1 t.ulFreePublicMemory = tokeninfo.ulFreePublicMemory if t.ulFreePublicMemory == CK_UNAVAILABLE_INFORMATION: t.ulFreePublicMemory = -1 t.ulTotalPrivateMemory = tokeninfo.ulTotalPrivateMemory if t.ulTotalPrivateMemory == CK_UNAVAILABLE_INFORMATION: t.ulTotalPrivateMemory = -1 t.ulFreePrivateMemory = tokeninfo.ulFreePrivateMemory if t.ulFreePrivateMemory == CK_UNAVAILABLE_INFORMATION: t.ulFreePrivateMemory = -1 t.hardwareVersion = (tokeninfo.hardwareVersion.major, tokeninfo.hardwareVersion.minor) t.firmwareVersion = (tokeninfo.firmwareVersion.major, tokeninfo.firmwareVersion.minor) t.utcTime = tokeninfo.GetUtcTime().replace('\000', ' ') return t
[docs] def openSession(self, slot, flags=0): """ C_OpenSession :param slot: slot number returned by :func:`getSlotList` :type slot: integer :param flags: 0 (default), `CKF_RW_SESSION` for RW session :type flags: integer :return: a :class:`Session` object """ se = PyKCS11.LowLevel.CK_SESSION_HANDLE() flags |= CKF_SERIAL_SESSION rv = self.lib.C_OpenSession(slot, flags, se) if rv != CKR_OK: raise PyKCS11Error(rv) return Session(self, se)
[docs] def closeAllSessions(self, slot): """ C_CloseAllSessions :param slot: slot number :type slot: integer """ rv = self.lib.C_CloseAllSessions(slot) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def getMechanismList(self, slot): """ C_GetMechanismList :param slot: slot number returned by :func:`getSlotList` :type slot: integer :return: the list of available mechanisms for a slot :rtype: list """ mechanismList = PyKCS11.LowLevel.ckintlist() rv = self.lib.C_GetMechanismList(slot, mechanismList) if rv != CKR_OK: raise PyKCS11Error(rv) m = [] for x in range(len(mechanismList)): mechanism = mechanismList[x] if mechanism >= CKM_VENDOR_DEFINED: k = 'CKM_VENDOR_DEFINED_0x%X' % (mechanism - CKM_VENDOR_DEFINED) CKM[k] = mechanism CKM[mechanism] = k m.append(CKM[mechanism]) return m
[docs] def getMechanismInfo(self, slot, type): """ C_GetMechanismInfo :param slot: slot number returned by :func:`getSlotList` :type slot: integer :param type: a `CKM_*` type :type type: integer :return: information about a mechanism :rtype: a :class:`CK_MECHANISM_INFO` object """ info = PyKCS11.LowLevel.CK_MECHANISM_INFO() rv = self.lib.C_GetMechanismInfo(slot, CKM[type], info) if rv != CKR_OK: raise PyKCS11Error(rv) i = CK_MECHANISM_INFO() i.ulMinKeySize = info.ulMinKeySize i.ulMaxKeySize = info.ulMaxKeySize i.flags = info.flags return i
[docs] def waitForSlotEvent(self, flags=0): """ C_WaitForSlotEvent :param flags: 0 (default) or `CKF_DONT_BLOCK` :type flags: integer :return: slot :rtype: integer """ tmp = 0 (rv, slot) = self.lib.C_WaitForSlotEvent(flags, tmp) if rv != CKR_OK: raise PyKCS11Error(rv) return slot
[docs] class Mechanism(object): """Wraps CK_MECHANISM""" def __init__(self, mechanism, param=None): """ :param mechanism: the mechanism to be used :type mechanism: integer, any `CKM_*` value :param param: data to be used as crypto operation parameter (i.e. the IV for some algorithms) :type param: string or list/tuple of bytes :see: :func:`Session.decrypt`, :func:`Session.sign` """ self._mech = PyKCS11.LowLevel.CK_MECHANISM() self._mech.mechanism = mechanism self._param = None if param: self._param = ckbytelist(param) self._mech.pParameter = self._param self._mech.ulParameterLen = len(param)
[docs] def to_native(self): return self._mech
MechanismSHA1 = Mechanism(CKM_SHA_1, None) MechanismRSAPKCS1 = Mechanism(CKM_RSA_PKCS, None) MechanismRSAGENERATEKEYPAIR = Mechanism(CKM_RSA_PKCS_KEY_PAIR_GEN, None) MechanismECGENERATEKEYPAIR = Mechanism(CKM_EC_KEY_PAIR_GEN, None) MechanismAESGENERATEKEY = Mechanism(CKM_AES_KEY_GEN, None)
[docs] class AES_GCM_Mechanism(object): """CKM_AES_GCM warpping mechanism""" def __init__(self, iv, aad, tagBits): """ :param iv: initialization vector :param aad: additional authentication data :param tagBits: length of authentication tag in bits """ self._param = PyKCS11.LowLevel.CK_GCM_PARAMS() self._source_iv = ckbytelist(iv) self._param.pIv = self._source_iv self._param.ulIvLen = len(self._source_iv) self._source_aad = ckbytelist(aad) self._param.pAAD = self._source_aad self._param.ulAADLen = len(self._source_aad) self._param.ulTagBits = tagBits self._mech = PyKCS11.LowLevel.CK_MECHANISM() self._mech.mechanism = CKM_AES_GCM self._mech.pParameter = self._param self._mech.ulParameterLen = PyKCS11.LowLevel.CK_GCM_PARAMS_LENGTH
[docs] def to_native(self): return self._mech
[docs] class AES_CTR_Mechanism(object): """CKM_AES_CTR encryption mechanism""" def __init__(self, counterBits, counterBlock): """ :param counterBits: the number of incremented bits in the counter block :param counterBlock: a 16-byte initial value of the counter block """ self._param = PyKCS11.LowLevel.CK_AES_CTR_PARAMS() self._source_cb = ckbytelist(counterBlock) self._param.ulCounterBits = counterBits self._param.cb = self._source_cb self._mech = PyKCS11.LowLevel.CK_MECHANISM() self._mech.mechanism = CKM_AES_CTR self._mech.pParameter = self._param self._mech.ulParameterLen = PyKCS11.LowLevel.CK_AES_CTR_PARAMS_LENGTH
[docs] def to_native(self): return self._mech
[docs] class RSAOAEPMechanism(object): """RSA OAEP Wrapping mechanism""" def __init__(self, hashAlg, mgf, label=None): """ :param hashAlg: the hash algorithm to use (like `CKM_SHA256`) :param mgf: the mask generation function to use (like `CKG_MGF1_SHA256`) :param label: the (optional) label to use """ self._param = PyKCS11.LowLevel.CK_RSA_PKCS_OAEP_PARAMS() self._param.hashAlg = hashAlg self._param.mgf = mgf self._source = None self._param.src = CKZ_DATA_SPECIFIED if label: self._source = ckbytelist(label) self._param.ulSourceDataLen = len(self._source) else: self._param.ulSourceDataLen = 0 self._param.pSourceData = self._source self._mech = PyKCS11.LowLevel.CK_MECHANISM() self._mech.mechanism = CKM_RSA_PKCS_OAEP self._mech.pParameter = self._param self._mech.ulParameterLen = PyKCS11.LowLevel.CK_RSA_PKCS_OAEP_PARAMS_LENGTH
[docs] def to_native(self): return self._mech
[docs] class RSA_PSS_Mechanism(object): """RSA PSS Wrapping mechanism""" def __init__(self, mecha, hashAlg, mgf, sLen): """ :param mecha: the mechanism to use (like `CKM_SHA384_RSA_PKCS_PSS`) :param hashAlg: the hash algorithm to use (like `CKM_SHA384`) :param mgf: the mask generation function to use (like `CKG_MGF1_SHA384`) :param sLen: length, in bytes, of the salt value used in the PSS encoding (like 0 or the message length) """ self._param = PyKCS11.LowLevel.CK_RSA_PKCS_PSS_PARAMS() self._param.hashAlg = hashAlg self._param.mgf = mgf self._param.sLen = sLen self._mech = PyKCS11.LowLevel.CK_MECHANISM() self._mech.mechanism = mecha self._mech.pParameter = self._param self._mech.ulParameterLen = PyKCS11.LowLevel.CK_RSA_PKCS_PSS_PARAMS_LENGTH
[docs] def to_native(self): return self._mech
[docs] class ECDH1_DERIVE_Mechanism(object): """CKM_ECDH1_DERIVE key derivation mechanism""" def __init__(self, publicData, kdf = CKD_NULL, sharedData = None): """ :param publicData: Other party public key which is EC Point [PC || coord-x || coord-y]. :param kdf: Key derivation function. OPTIONAL. Defaults to CKD_NULL :param sharedData: additional shared data. OPTIONAL """ self._param = PyKCS11.LowLevel.CK_ECDH1_DERIVE_PARAMS() self._param.kdf = kdf if sharedData: self._shared_data = ckbytelist(sharedData) self._param.pSharedData = self._shared_data self._param.ulSharedDataLen = len(self._shared_data) else: self._source_shared_data = None self._param.ulSharedDataLen = 0 self._public_data = ckbytelist(publicData) self._param.pPublicData = self._public_data self._param.ulPublicDataLen = len(self._public_data) self._mech = PyKCS11.LowLevel.CK_MECHANISM() self._mech.mechanism = CKM_ECDH1_DERIVE self._mech.pParameter = self._param self._mech.ulParameterLen = PyKCS11.LowLevel.CK_ECDH1_DERIVE_PARAMS_LENGTH
[docs] def to_native(self): return self._mech
[docs] class CONCATENATE_BASE_AND_KEY_Mechanism(object): """CKM_CONCATENATE_BASE_AND_KEY key derivation mechanism""" def __init__(self, encKey): """ :param encKey: a handle of encryption key """ self._mech = PyKCS11.LowLevel.CK_MECHANISM() self._mech.mechanism = CKM_CONCATENATE_BASE_AND_KEY self._mech.pParameter = encKey self._mech.ulParameterLen = PyKCS11.LowLevel.CK_OBJECT_HANDLE_LENGTH
[docs] def to_native(self): return self._mech
[docs] class KEY_DERIVATION_STRING_DATA_MechanismBase(object): """Base class for mechanisms using derivation string data""" def __init__(self, data, mechType): """ :param data: a byte array to concatenate the key with :param mechType: mechanism type """ self._param = PyKCS11.LowLevel.CK_KEY_DERIVATION_STRING_DATA() self._data = ckbytelist(data) self._param.pData = self._data self._param.ulLen = len(self._data) self._mech = PyKCS11.LowLevel.CK_MECHANISM() self._mech.mechanism = mechType self._mech.pParameter = self._param self._mech.ulParameterLen = PyKCS11.LowLevel.CK_KEY_DERIVATION_STRING_DATA_LENGTH
[docs] def to_native(self): return self._mech
[docs] class CONCATENATE_BASE_AND_DATA_Mechanism(KEY_DERIVATION_STRING_DATA_MechanismBase): """CKM_CONCATENATE_BASE_AND_DATA key derivation mechanism""" def __init__(self, data): """ :param data: a byte array to concatenate the key with """ super().__init__(data, CKM_CONCATENATE_BASE_AND_DATA)
[docs] class CONCATENATE_DATA_AND_BASE_Mechanism(KEY_DERIVATION_STRING_DATA_MechanismBase): """CKM_CONCATENATE_DATA_AND_BASE key derivation mechanism""" def __init__(self, data): """ :param data: a byte array to concatenate the key with """ super().__init__(data, CKM_CONCATENATE_DATA_AND_BASE)
[docs] class XOR_BASE_AND_DATA_Mechanism(KEY_DERIVATION_STRING_DATA_MechanismBase): """CKM_XOR_BASE_AND_DATA key derivation mechanism""" def __init__(self, data): """ :param data: a byte array to xor the key with """ super().__init__(data, CKM_XOR_BASE_AND_DATA)
[docs] class DigestSession(object): def __init__(self, lib, session, mecha): self._lib = lib self._session = session self._mechanism = mecha.to_native() rv = self._lib.C_DigestInit(self._session, self._mechanism) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def update(self, data): """ C_DigestUpdate :param data: data to add to the digest :type data: bytes or string """ data1 = ckbytelist(data) rv = self._lib.C_DigestUpdate(self._session, data1) if rv != CKR_OK: raise PyKCS11Error(rv) return self
[docs] def digestKey(self, handle): """ C_DigestKey :param handle: key handle :type handle: CK_OBJECT_HANDLE """ rv = self._lib.C_DigestKey(self._session, handle) if rv != CKR_OK: raise PyKCS11Error(rv) return self
[docs] def final(self): """ C_DigestFinal :return: the digest :rtype: ckbytelist """ digest = ckbytelist() # Get the size of the digest rv = self._lib.C_DigestFinal(self._session, digest) if rv != CKR_OK: raise PyKCS11Error(rv) # Get the actual digest rv = self._lib.C_DigestFinal(self._session, digest) if rv != CKR_OK: raise PyKCS11Error(rv) return digest
[docs] class Session(object): """ Manage :func:`PyKCS11Lib.openSession` objects """ def __init__(self, pykcs11, session): """ :param pykcs11: PyKCS11 library object :type pykcs11: PyKCS11Lib :param session: session handle :type session: instance of :class:`CK_SESSION_HANDLE` """ if not isinstance(pykcs11, PyKCS11Lib): raise TypeError("pykcs11 must be a PyKCS11Lib") if not isinstance(session, LowLevel.CK_SESSION_HANDLE): raise TypeError("session must be a CK_SESSION_HANDLE") # hold the PyKCS11Lib reference, so that it's not Garbage Collection'd self.pykcs11 = pykcs11 self.session = session @property def lib(self): """ Get the low level lib of the owning PyKCS11Lib """ return self.pykcs11.lib
[docs] def closeSession(self): """ C_CloseSession """ rv = self.lib.C_CloseSession(self.session) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def getSessionInfo(self): """ C_GetSessionInfo :return: a :class:`CK_SESSION_INFO` object """ sessioninfo = PyKCS11.LowLevel.CK_SESSION_INFO() rv = self.lib.C_GetSessionInfo(self.session, sessioninfo) if rv != CKR_OK: raise PyKCS11Error(rv) s = CK_SESSION_INFO() s.slotID = sessioninfo.slotID s.state = sessioninfo.state s.flags = sessioninfo.flags s.ulDeviceError = sessioninfo.ulDeviceError return s
[docs] def login(self, pin, user_type=CKU_USER): """ C_Login :param pin: the user's PIN or None for CKF_PROTECTED_AUTHENTICATION_PATH :type pin: string :param user_type: the user type. The default value is CKU_USER. You may also use CKU_SO :type user_type: integer """ pin1 = ckbytelist(pin) rv = self.lib.C_Login(self.session, user_type, pin1) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def logout(self): """ C_Logout """ rv = self.lib.C_Logout(self.session) if rv != CKR_OK: raise PyKCS11Error(rv) del self
[docs] def initPin(self, pin): """ C_InitPIN :param pin: new PIN """ new_pin1 = ckbytelist(pin) rv = self.lib.C_InitPIN(self.session, new_pin1) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def setPin(self, old_pin, new_pin): """ C_SetPIN :param old_pin: old PIN :param new_pin: new PIN """ old_pin1 = ckbytelist(old_pin) new_pin1 = ckbytelist(new_pin) rv = self.lib.C_SetPIN(self.session, old_pin1, new_pin1) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def createObject(self, template): """ C_CreateObject :param template: object template """ attrs = self._template2ckattrlist(template) handle = PyKCS11.LowLevel.CK_OBJECT_HANDLE() rv = self.lib.C_CreateObject(self.session, attrs, handle) if rv != PyKCS11.CKR_OK: raise PyKCS11.PyKCS11Error(rv) return handle
[docs] def destroyObject(self, obj): """ C_DestroyObject :param obj: object ID """ rv = self.lib.C_DestroyObject(self.session, obj) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def digestSession(self, mecha=MechanismSHA1): """ C_DigestInit/C_DigestUpdate/C_DigestKey/C_DigestFinal :param mecha: the digesting mechanism to be used (use `MechanismSHA1` for `CKM_SHA_1`) :type mecha: :class:`Mechanism` :return: A :class:`DigestSession` object :rtype: DigestSession """ return DigestSession(self.lib, self.session, mecha)
[docs] def digest(self, data, mecha=MechanismSHA1): """ C_DigestInit/C_Digest :param data: the data to be digested :type data: (binary) sring or list/tuple of bytes :param mecha: the digesting mechanism to be used (use `MechanismSHA1` for `CKM_SHA_1`) :type mecha: :class:`Mechanism` :return: the computed digest :rtype: list of bytes :note: the returned value is an istance of :class:`ckbytelist`. You can easly convert it to a binary string with: ``bytes(ckbytelistDigest)`` or, for Python 2: ``''.join(chr(i) for i in ckbytelistDigest)`` """ digest = ckbytelist() m = mecha.to_native() data1 = ckbytelist(data) rv = self.lib.C_DigestInit(self.session, m) if rv != CKR_OK: raise PyKCS11Error(rv) # first call get digest size rv = self.lib.C_Digest(self.session, data1, digest) if rv != CKR_OK: raise PyKCS11Error(rv) # second call get actual digest data rv = self.lib.C_Digest(self.session, data1, digest) if rv != CKR_OK: raise PyKCS11Error(rv) return digest
[docs] def sign(self, key, data, mecha=MechanismRSAPKCS1): """ C_SignInit/C_Sign :param key: a key handle, obtained calling :func:`findObjects`. :type key: integer :param data: the data to be signed :type data: (binary) string or list/tuple of bytes :param mecha: the signing mechanism to be used (use `MechanismRSAPKCS1` for `CKM_RSA_PKCS`) :type mecha: :class:`Mechanism` :return: the computed signature :rtype: list of bytes :note: the returned value is an instance of :class:`ckbytelist`. You can easly convert it to a binary string with: ``bytes(ckbytelistSignature)`` or, for Python 2: ``''.join(chr(i) for i in ckbytelistSignature)`` """ m = mecha.to_native() signature = ckbytelist() data1 = ckbytelist(data) rv = self.lib.C_SignInit(self.session, m, key) if rv != CKR_OK: raise PyKCS11Error(rv) # first call get signature size rv = self.lib.C_Sign(self.session, data1, signature) if rv != CKR_OK: raise PyKCS11Error(rv) # second call get actual signature data rv = self.lib.C_Sign(self.session, data1, signature) if rv != CKR_OK: raise PyKCS11Error(rv) return signature
[docs] def verify(self, key, data, signature, mecha=MechanismRSAPKCS1): """ C_VerifyInit/C_Verify :param key: a key handle, obtained calling :func:`findObjects`. :type key: integer :param data: the data that was signed :type data: (binary) string or list/tuple of bytes :param signature: the signature to be verified :type signature: (binary) string or list/tuple of bytes :param mecha: the signing mechanism to be used (use `MechanismRSAPKCS1` for `CKM_RSA_PKCS`) :type mecha: :class:`Mechanism` :return: True if signature is valid, False otherwise :rtype: bool """ m = mecha.to_native() data1 = ckbytelist(data) rv = self.lib.C_VerifyInit(self.session, m, key) if rv != CKR_OK: raise PyKCS11Error(rv) rv = self.lib.C_Verify(self.session, data1, signature) if rv == CKR_OK: return True elif rv == CKR_SIGNATURE_INVALID: return False else: raise PyKCS11Error(rv)
[docs] def encrypt(self, key, data, mecha=MechanismRSAPKCS1): """ C_EncryptInit/C_Encrypt :param key: a key handle, obtained calling :func:`findObjects`. :type key: integer :param data: the data to be encrypted :type data: (binary) string or list/tuple of bytes :param mecha: the encryption mechanism to be used (use `MechanismRSAPKCS1` for `CKM_RSA_PKCS`) :type mecha: :class:`Mechanism` :return: the encrypted data :rtype: list of bytes :note: the returned value is an instance of :class:`ckbytelist`. You can easly convert it to a binary string with: ``bytes(ckbytelistEncrypted)`` or, for Python 2: ``''.join(chr(i) for i in ckbytelistEncrypted)`` """ encrypted = ckbytelist() m = mecha.to_native() data1 = ckbytelist(data) rv = self.lib.C_EncryptInit(self.session, m, key) if rv != CKR_OK: raise PyKCS11Error(rv) # first call get encrypted size rv = self.lib.C_Encrypt(self.session, data1, encrypted) if rv != CKR_OK: raise PyKCS11Error(rv) # second call get actual encrypted data rv = self.lib.C_Encrypt(self.session, data1, encrypted) if rv != CKR_OK: raise PyKCS11Error(rv) return encrypted
[docs] def decrypt(self, key, data, mecha=MechanismRSAPKCS1): """ C_DecryptInit/C_Decrypt :param key: a key handle, obtained calling :func:`findObjects`. :type key: integer :param data: the data to be decrypted :type data: (binary) string or list/tuple of bytes :param mecha: the decrypt mechanism to be used :type mecha: :class:`Mechanism` instance or :class:`MechanismRSAPKCS1` for CKM_RSA_PKCS :return: the decrypted data :rtype: list of bytes :note: the returned value is an instance of :class:`ckbytelist`. You can easly convert it to a binary string with: ``bytes(ckbytelistData)`` or, for Python 2: ``''.join(chr(i) for i in ckbytelistData)`` """ m = mecha.to_native() decrypted = ckbytelist() data1 = ckbytelist(data) rv = self.lib.C_DecryptInit(self.session, m, key) if rv != CKR_OK: raise PyKCS11Error(rv) # first call get decrypted size rv = self.lib.C_Decrypt(self.session, data1, decrypted) if rv != CKR_OK: raise PyKCS11Error(rv) # second call get actual decrypted data rv = self.lib.C_Decrypt(self.session, data1, decrypted) if rv != CKR_OK: raise PyKCS11Error(rv) return decrypted
[docs] def wrapKey(self, wrappingKey, key, mecha=MechanismRSAPKCS1): """ C_WrapKey :param wrappingKey: a wrapping key handle :type wrappingKey: integer :param key: a handle of the key to be wrapped :type key: integer :param mecha: the encrypt mechanism to be used (use `MechanismRSAPKCS1` for `CKM_RSA_PKCS`) :type mecha: :class:`Mechanism` :return: the wrapped key bytes :rtype: list of bytes :note: the returned value is an instance of :class:`ckbytelist`. You can easily convert it to a binary string with: ``bytes(ckbytelistData)`` or, for Python 2: ``''.join(chr(i) for i in ckbytelistData)`` """ wrapped = ckbytelist() native = mecha.to_native() # first call get wrapped size rv = self.lib.C_WrapKey(self.session, native, wrappingKey, key, wrapped) if rv != CKR_OK: raise PyKCS11Error(rv) # second call get actual wrapped key data rv = self.lib.C_WrapKey(self.session, native, wrappingKey, key, wrapped) if rv != CKR_OK: raise PyKCS11Error(rv) return wrapped
[docs] def unwrapKey(self, unwrappingKey, wrappedKey, template, mecha=MechanismRSAPKCS1): """ C_UnwrapKey :param unwrappingKey: the unwrapping key handle :type unwrappingKey: integer :param wrappedKey: the bytes of the wrapped key :type wrappedKey: (binary) string or list/tuple of bytes :param template: template for the unwrapped key :param mecha: the decrypt mechanism to be used (use `MechanismRSAPKCS1` for `CKM_RSA_PKCS`) :type mecha: :class:`Mechanism` :return: the unwrapped key object :rtype: integer """ m = mecha.to_native() data1 = ckbytelist(wrappedKey) handle = PyKCS11.LowLevel.CK_OBJECT_HANDLE() attrs = self._template2ckattrlist(template) rv = self.lib.C_UnwrapKey(self.session, m, unwrappingKey, data1, attrs, handle) if rv != CKR_OK: raise PyKCS11Error(rv) return handle
[docs] def deriveKey(self, baseKey, template, mecha): """ C_DeriveKey :param baseKey: the base key handle :type baseKey: integer :param template: template for the unwrapped key :param mecha: the decrypt mechanism to be used (use `ECDH1_DERIVE_Mechanism(...)` for `CKM_ECDH1_DERIVE`) :type mecha: :class:`Mechanism` :return: the unwrapped key object :rtype: integer """ m = mecha.to_native() handle = PyKCS11.LowLevel.CK_OBJECT_HANDLE() attrs = self._template2ckattrlist(template) rv = self.lib.C_DeriveKey(self.session, m, baseKey, attrs, handle) if rv != CKR_OK: raise PyKCS11Error(rv) return handle
[docs] def isNum(self, type): """ is the type a numerical value? :param type: PKCS#11 type like `CKA_CERTIFICATE_TYPE` :rtype: bool """ if type in (CKA_CERTIFICATE_TYPE, CKA_CLASS, CKA_HW_FEATURE_TYPE, CKA_KEY_GEN_MECHANISM, CKA_KEY_TYPE, CKA_MODULUS_BITS, CKA_VALUE_BITS, CKA_VALUE_LEN): return True return False
[docs] def isString(self, type): """ is the type a string value? :param type: PKCS#11 type like `CKA_LABEL` :rtype: bool """ if type in (CKA_LABEL, CKA_APPLICATION): return True return False
[docs] def isBool(self, type): """ is the type a boolean value? :param type: PKCS#11 type like `CKA_ALWAYS_SENSITIVE` :rtype: bool """ if type in (CKA_ALWAYS_AUTHENTICATE, CKA_ALWAYS_SENSITIVE, CKA_DECRYPT, CKA_DERIVE, CKA_ENCRYPT, CKA_EXTRACTABLE, CKA_HAS_RESET, CKA_LOCAL, CKA_MODIFIABLE, CKA_COPYABLE, CKA_DESTROYABLE, CKA_NEVER_EXTRACTABLE, CKA_PRIVATE, CKA_RESET_ON_INIT, CKA_SECONDARY_AUTH, CKA_SENSITIVE, CKA_SIGN, CKA_SIGN_RECOVER, CKA_TOKEN, CKA_TRUSTED, CKA_UNWRAP, CKA_VERIFY, CKA_VERIFY_RECOVER, CKA_WRAP, CKA_WRAP_WITH_TRUSTED): return True return False
[docs] def isBin(self, type): """ is the type a byte array value? :param type: PKCS#11 type like `CKA_MODULUS` :rtype: bool """ return (not self.isBool(type)) \ and (not self.isString(type)) \ and (not self.isNum(type))
[docs] def isAttributeList(self, type): """ is the type a attribute list value? :param type: PKCS#11 type like `CKA_WRAP_TEMPLATE` :rtype: bool """ if type in (CKA_WRAP_TEMPLATE, CKA_UNWRAP_TEMPLATE): return True return False
def _template2ckattrlist(self, template): t = PyKCS11.LowLevel.ckattrlist(len(template)) for x in range(len(template)): attr = template[x] if self.isNum(attr[0]): t[x].SetNum(attr[0], int(attr[1])) elif self.isString(attr[0]): t[x].SetString(attr[0], str(attr[1])) elif self.isBool(attr[0]): t[x].SetBool(attr[0], attr[1] == CK_TRUE) elif self.isAttributeList(attr[0]): t[x].SetList(attr[0], self._template2ckattrlist(attr[1])) elif self.isBin(attr[0]): attrBin = attr[1] attrStr = attr[1] if isinstance(attr[1], int): attrStr = str(attr[1]) if isinstance(attr[1], bytes): attrBin = ckbytelist(attrStr) t[x].SetBin(attr[0], attrBin) else: raise PyKCS11Error(-2) return t
[docs] def generateKey(self, template, mecha=MechanismAESGENERATEKEY): """ generate a secret key :param template: template for the secret key :param mecha: mechanism to use :return: handle of the generated key :rtype: PyKCS11.LowLevel.CK_OBJECT_HANDLE """ t = self._template2ckattrlist(template) ck_handle = PyKCS11.LowLevel.CK_OBJECT_HANDLE() m = mecha.to_native() rv = self.lib.C_GenerateKey(self.session, m, t, ck_handle) if rv != CKR_OK: raise PyKCS11Error(rv) return ck_handle
[docs] def generateKeyPair(self, templatePub, templatePriv, mecha=MechanismRSAGENERATEKEYPAIR): """ generate a key pair :param templatePub: template for the public key :param templatePriv: template for the private key :param mecha: mechanism to use :return: a tuple of handles (pub, priv) :rtype: tuple """ tPub = self._template2ckattrlist(templatePub) tPriv = self._template2ckattrlist(templatePriv) ck_pub_handle = PyKCS11.LowLevel.CK_OBJECT_HANDLE() ck_prv_handle = PyKCS11.LowLevel.CK_OBJECT_HANDLE() m = mecha.to_native() rv = self.lib.C_GenerateKeyPair(self.session, m, tPub, tPriv, ck_pub_handle, ck_prv_handle) if rv != CKR_OK: raise PyKCS11Error(rv) return ck_pub_handle, ck_prv_handle
[docs] def findObjects(self, template=()): """ find the objects matching the template pattern :param template: list of attributes tuples (attribute,value). The default value is () and all the objects are returned :type template: list :return: a list of object ids :rtype: list """ t = self._template2ckattrlist(template) # we search for 10 objects by default. speed/memory tradeoff result = PyKCS11.LowLevel.ckobjlist(10) rv = self.lib.C_FindObjectsInit(self.session, t) if rv != CKR_OK: raise PyKCS11Error(rv) res = [] while True: rv = self.lib.C_FindObjects(self.session, result) if rv != CKR_OK: raise PyKCS11Error(rv) for x in result: # make a copy of the handle: the original value get # corrupted (!!) a = CK_OBJECT_HANDLE(self) a.assign(x.value()) res.append(a) if len(result) == 0: break rv = self.lib.C_FindObjectsFinal(self.session) if rv != CKR_OK: raise PyKCS11Error(rv) return res
[docs] def getAttributeValue(self, obj_id, attr, allAsBinary=False): """ C_GetAttributeValue :param obj_id: object ID returned by :func:`findObjects` :type obj_id: integer :param attr: list of attributes :type attr: list :param allAsBinary: return all values as binary data; default is False. :type allAsBinary: Boolean :return: a list of values corresponding to the list of attributes :rtype: list :see: :func:`getAttributeValue_fragmented` :note: if allAsBinary is True the function do not convert results to Python types (i.e.: CKA_TOKEN to Bool, CKA_CLASS to int, ...). Binary data is returned as :class:`ckbytelist` type, usable as a list containing only bytes. You can easly convert it to a binary string with: ``bytes(ckbytelistVariable)`` or, for Python 2: ``''.join(chr(i) for i in ckbytelistVariable)`` """ valTemplate = PyKCS11.LowLevel.ckattrlist(len(attr)) for x in range(len(attr)): valTemplate[x].SetType(attr[x]) # first call to get the attribute size and reserve the memory rv = self.lib.C_GetAttributeValue(self.session, obj_id, valTemplate) if rv in (CKR_ATTRIBUTE_TYPE_INVALID, CKR_ATTRIBUTE_SENSITIVE, CKR_ARGUMENTS_BAD): return self.getAttributeValue_fragmented(obj_id, attr, allAsBinary) if rv != CKR_OK: raise PyKCS11Error(rv) # second call to get the attribute value rv = self.lib.C_GetAttributeValue(self.session, obj_id, valTemplate) if rv != CKR_OK: raise PyKCS11Error(rv) res = [] for x in range(len(attr)): if allAsBinary: res.append(valTemplate[x].GetBin()) elif valTemplate[x].IsNum(): res.append(valTemplate[x].GetNum()) elif valTemplate[x].IsBool(): res.append(valTemplate[x].GetBool()) elif valTemplate[x].IsString(): res.append(valTemplate[x].GetString()) elif valTemplate[x].IsBin(): res.append(valTemplate[x].GetBin()) else: raise PyKCS11Error(-2) return res
[docs] def getAttributeValue_fragmented(self, obj_id, attr, allAsBinary=False): """ Same as :func:`getAttributeValue` except that when some attribute is sensitive or unknown an empty value (None) is returned. Note: this is achived by getting attributes one by one. :see: :func:`getAttributeValue` """ # some attributes does not exists or is sensitive # but we don't know which ones. So try one by one valTemplate = PyKCS11.LowLevel.ckattrlist(1) res = [] for x in range(len(attr)): valTemplate[0].Reset() valTemplate[0].SetType(attr[x]) # first call to get the attribute size and reserve the memory rv = self.lib.C_GetAttributeValue(self.session, obj_id, valTemplate) if rv in (CKR_ATTRIBUTE_TYPE_INVALID, CKR_ATTRIBUTE_SENSITIVE, CKR_ARGUMENTS_BAD): # append an empty value res.append(None) continue if rv != CKR_OK: raise PyKCS11Error(rv) # second call to get the attribute value rv = self.lib.C_GetAttributeValue(self.session, obj_id, valTemplate) if rv != CKR_OK: raise PyKCS11Error(rv) if allAsBinary: res.append(valTemplate[0].GetBin()) elif valTemplate[0].IsNum(): res.append(valTemplate[0].GetNum()) elif valTemplate[0].IsBool(): res.append(valTemplate[0].GetBool()) elif valTemplate[0].IsString(): res.append(valTemplate[0].GetString()) elif valTemplate[0].IsBin(): res.append(valTemplate[0].GetBin()) elif valTemplate[0].IsAttributeList(): res.append(valTemplate[0].GetBin()) else: raise PyKCS11Error(-2) return res
[docs] def setAttributeValue(self, obj_id, template): """ C_SetAttributeValue :param obj_id: object ID returned by :func:`findObjects` :type obj_id: integer :param template: list of (attribute, value) pairs :type template: list :return: Nothing :rtype: None """ templ = self._template2ckattrlist(template) rv = self.lib.C_SetAttributeValue(self.session, obj_id, templ) if rv != CKR_OK: raise PyKCS11Error(rv) return None
[docs] def seedRandom(self, seed): """ C_SeedRandom :param seed: seed material :type seed: iterable """ low_seed = ckbytelist(seed) rv = self.lib.C_SeedRandom(self.session, low_seed) if rv != CKR_OK: raise PyKCS11Error(rv)
[docs] def generateRandom(self, size=16): """ C_GenerateRandom :param size: number of random bytes to get :type size: integer :note: the returned value is an instance of :class:`ckbytelist`. You can easly convert it to a binary string with: ``bytes(random)`` or, for Python 2: ``''.join(chr(i) for i in random)`` """ low_rand = ckbytelist([0] * size) rv = self.lib.C_GenerateRandom(self.session, low_rand) if rv != CKR_OK: raise PyKCS11Error(rv) return low_rand
if __name__ == "__main__": # sample test/debug code p = PyKCS11Lib() p.load() print("getInfo") print(p.getInfo()) print() print("getSlotList") s = p.getSlotList() print("slots:", s) slot = s[0] print("using slot:", slot) print() print("getSlotInfo") print(p.getSlotInfo(slot)) print() print("getTokenInfo") print(p.getTokenInfo(slot)) print() print("openSession") se = p.openSession(slot) print() print("sessionInfo") print(se.getSessionInfo()) print() print("seedRandom") try: se.seedRandom([1, 2, 3, 4]) except PyKCS11Error as e: print(e) print("generateRandom") print(se.generateRandom()) print() print("login") se.login(pin="0000") print() print("sessionInfo") print(se.getSessionInfo()) print() print("findObjects") objs = se.findObjects([(CKA_CLASS, CKO_CERTIFICATE)]) print("Nb objetcs:", len(objs)) print(objs) print() print("getAttributeValue") for o in objs: attr = se.getAttributeValue(o, [CKA_LABEL, CKA_CLASS]) print(attr) print() print("logout") se.logout() print() print("closeSession") se.closeSession()