whitespace: proto.xmr

This commit is contained in:
The MMGen Project 2024-10-18 10:32:09 +00:00
commit 1623ea6beb
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
5 changed files with 88 additions and 88 deletions

View file

@ -12,26 +12,26 @@
proto.xmr.addrgen: Monero address generation class for the MMGen suite
"""
from ...addrgen import addr_generator,check_data
from ...addrgen import addr_generator, check_data
from ...addr import CoinAddr
class monero(addr_generator.keccak):
def b58enc(self,addr_bytes):
def b58enc(self, addr_bytes):
from ...baseconv import baseconv
enc = baseconv('b58').frombytes
l = len(addr_bytes)
a = ''.join([enc( addr_bytes[i*8:i*8+8], pad=11, tostr=True ) for i in range(l//8)])
b = enc( addr_bytes[l-l%8:], pad=7, tostr=True )
a = ''.join([enc(addr_bytes[i*8:i*8+8], pad=11, tostr=True) for i in range(l//8)])
b = enc(addr_bytes[l-l%8:], pad=7, tostr=True)
return a + b
@check_data
def to_addr(self,data):
def to_addr(self, data):
step1 = self.proto.addr_fmt_to_ver_bytes['monero'] + data.pubkey
return CoinAddr(
proto = self.proto,
addr = self.b58enc( step1 + self.keccak_256(step1).digest()[:4]) )
addr = self.b58enc(step1 + self.keccak_256(step1).digest()[:4]))
@check_data
def to_viewkey(self,data):
return self.proto.viewkey( data.viewkey_bytes.hex() )
def to_viewkey(self, data):
return self.proto.viewkey(data.viewkey_bytes.hex())

View file

@ -15,28 +15,28 @@ proto.xmr.daemon: Monero base protocol daemon classes
import sys, os
from ...cfg import gc
from ...util import list_gen,die,contains_any
from ...daemon import CoinDaemon,RPCDaemon,_nw,_dd
from ...util import list_gen, die, contains_any
from ...daemon import CoinDaemon, RPCDaemon, _nw, _dd
class monero_daemon(CoinDaemon):
daemon_data = _dd('Monero', 18003004, '0.18.3.4-release')
networks = ('mainnet','testnet')
networks = ('mainnet', 'testnet')
exec_fn = 'monerod'
testnet_dir = 'stagenet'
new_console_mswin = True
rpc_ports = _nw(18081, 38081, None) # testnet is stagenet
cfg_file = 'bitmonero.conf'
datadirs = {
'linux': [gc.home_dir,'.bitmonero'],
'darwin': [gc.home_dir,'.bitmonero'],
'win32': ['/','c','ProgramData','bitmonero']
'linux': [gc.home_dir, '.bitmonero'],
'darwin': [gc.home_dir, '.bitmonero'],
'win32': ['/', 'c', 'ProgramData', 'bitmonero']
}
def init_datadir(self):
self.logdir = super().init_datadir()
return os.path.join(
self.logdir,
self.testnet_dir if self.network == 'testnet' else '' )
self.testnet_dir if self.network == 'testnet' else '')
def get_p2p_port(self):
return self.rpc_port - 1
@ -52,7 +52,7 @@ class monero_daemon(CoinDaemon):
user = None,
passwd = None,
test_connection = False,
daemon = self )
daemon = self)
self.use_pidfile = sys.platform == 'linux'
@ -75,9 +75,9 @@ class monero_daemon(CoinDaemon):
@property
def stop_cmd(self):
if self.platform == 'win32':
return ['kill','-Wf',self.pid]
elif contains_any( self.start_cmd, ['--restricted-rpc','--public-node'] ):
return ['kill',self.pid]
return ['kill', '-Wf', self.pid]
elif contains_any(self.start_cmd, ['--restricted-rpc', '--public-node']):
return ['kill', self.pid]
else:
return [self.exec_fn] + self.shared_args + ['exit']
@ -88,17 +88,17 @@ class MoneroWalletDaemon(RPCDaemon):
exec_fn = 'monero-wallet-rpc'
coin = 'XMR'
new_console_mswin = True
networks = ('mainnet','testnet')
networks = ('mainnet', 'testnet')
rpc_ports = _nw(13131, 13141, None) # testnet is non-standard
_reset_ok = ('debug','wait','pids','force_kill')
test_suite_datadir = os.path.join('test','daemons','xmrtest','wallet_rpc')
_reset_ok = ('debug', 'wait', 'pids', 'force_kill')
test_suite_datadir = os.path.join('test', 'daemons', 'xmrtest', 'wallet_rpc')
def start(self,*args,**kwargs):
def start(self, *args, **kwargs):
try: # NB: required due to bug in v18.3.1: PID file not deleted on shutdown
os.unlink(self.pidfile)
except FileNotFoundError:
pass
super().start(*args,**kwargs)
super().start(*args, **kwargs)
def __init__(
self,
@ -115,25 +115,25 @@ class MoneroWalletDaemon(RPCDaemon):
trust_monerod = False,
test_monerod = False,
opts = None,
flags = None ):
flags = None):
self.proto = proto
self.test_suite = test_suite
super().__init__(cfg,opts=opts,flags=flags)
super().__init__(cfg, opts=opts, flags=flags)
self.network = proto.network
self.wallet_dir = wallet_dir or (self.test_suite_datadir if test_suite else None)
self.rpc_port = (
self.cfg.wallet_rpc_port or
getattr(self.rpc_ports,self.network) + (11 if test_suite else 0) )
getattr(self.rpc_ports, self.network) + (11 if test_suite else 0))
if port_shift:
self.rpc_port += port_shift
id_str = f'{self.exec_fn}-{self.bind_port}'
self.datadir = datadir or (self.test_suite_datadir if test_suite else self.exec_fn + '.d')
self.pidfile = os.path.join(self.datadir,id_str+'.pid')
self.logfile = os.path.join(self.datadir,id_str+'.log')
self.pidfile = os.path.join(self.datadir, id_str+'.pid')
self.logfile = os.path.join(self.datadir, id_str+'.log')
self.use_pidfile = sys.platform == 'linux'
@ -150,9 +150,9 @@ class MoneroWalletDaemon(RPCDaemon):
if test_monerod and self.monerod_port:
import socket
try:
socket.create_connection(('localhost',self.monerod_port),timeout=1).close()
socket.create_connection(('localhost', self.monerod_port), timeout=1).close()
except:
die( 'SocketError', f'Unable to connect to Monero daemon at localhost:{self.monerod_port}' )
die('SocketError', f'Unable to connect to Monero daemon at localhost:{self.monerod_port}')
self.user = user or self.cfg.wallet_rpc_user or self.cfg.monero_wallet_rpc_user
self.passwd = passwd or self.cfg.wallet_rpc_password or self.cfg.monero_wallet_rpc_password
@ -163,7 +163,7 @@ class MoneroWalletDaemon(RPCDaemon):
'You must set your Monero wallet RPC password.\n' +
'This can be done on the command line with the --wallet-rpc-password option\n' +
"(insecure, not recommended), or by setting 'monero_wallet_rpc_password' in\n" +
"the MMGen config file." )
"the MMGen config file.")
self.daemon_args = list_gen(
['--trusted-daemon', trust_monerod],
@ -185,4 +185,4 @@ class MoneroWalletDaemon(RPCDaemon):
self.rpc = MoneroWalletRPCClient(
cfg = self.cfg,
daemon = self,
test_connection = False )
test_connection = False)

View file

@ -19,32 +19,32 @@ class backend:
class base(keygen_base):
def __init__(self,cfg):
def __init__(self, cfg):
super().__init__(cfg)
from ...proto.xmr.params import mainnet
self.proto_cls = mainnet
from ...util2 import get_keccak
self.keccak_256 = get_keccak(cfg)
def to_viewkey(self,privkey):
def to_viewkey(self, privkey):
return self.proto_cls.preprocess_key(
self.proto_cls,
self.keccak_256(privkey).digest(),
None )
None)
class nacl(base):
production_safe = True
def __init__(self,cfg):
def __init__(self, cfg):
super().__init__(cfg)
from nacl.bindings import crypto_scalarmult_ed25519_base_noclamp
self.scalarmultbase = crypto_scalarmult_ed25519_base_noclamp
def to_pubkey(self,privkey):
def to_pubkey(self, privkey):
return PubKey(
self.scalarmultbase( privkey ) +
self.scalarmultbase( self.to_viewkey(privkey) ),
self.scalarmultbase(privkey) +
self.scalarmultbase(self.to_viewkey(privkey)),
compressed = privkey.compressed
)
@ -52,15 +52,15 @@ class backend:
production_safe = False
def __init__(self,cfg):
def __init__(self, cfg):
super().__init__(cfg)
from ...contrib.ed25519 import edwards,encodepoint,B,scalarmult
from ...contrib.ed25519 import edwards, encodepoint, B, scalarmult
self.edwards = edwards
self.encodepoint = encodepoint
self.B = B
self.scalarmult = scalarmult
def scalarmultbase(self,privnum):
def scalarmultbase(self, privnum):
"""
Source and license for scalarmultbase function:
https://github.com/bigreddmachine/MoneroPy/blob/master/moneropy/crypto/ed25519.py
@ -77,18 +77,18 @@ class backend:
@staticmethod
def rev_bytes2int(in_bytes):
return int.from_bytes( in_bytes[::-1], 'big' )
return int.from_bytes(in_bytes[::-1], 'big')
def to_pubkey(self,privkey):
def to_pubkey(self, privkey):
return PubKey(
self.encodepoint( self.scalarmultbase( self.rev_bytes2int(privkey) )) +
self.encodepoint( self.scalarmultbase( self.rev_bytes2int(self.to_viewkey(privkey)) )),
self.encodepoint(self.scalarmultbase(self.rev_bytes2int(privkey))) +
self.encodepoint(self.scalarmultbase(self.rev_bytes2int(self.to_viewkey(privkey)))),
compressed = privkey.compressed
)
class ed25519ll_djbec(ed25519):
def __init__(self,cfg):
def __init__(self, cfg):
super().__init__(cfg)
from ...contrib.ed25519ll_djbec import scalarmult
self.scalarmult = scalarmult

View file

@ -14,22 +14,22 @@ proto.xmr.params: Monero protocol
from collections import namedtuple
from ...protocol import CoinProtocol,_nw
from ...protocol import CoinProtocol, _nw
from ...obj import HexStr
parsed_addr = namedtuple('parsed_addr',['ver_bytes','data','payment_id'])
parsed_addr = namedtuple('parsed_addr', ['ver_bytes', 'data', 'payment_id'])
class MoneroViewKey(HexStr):
color,width,hexcase = 'cyan',64,'lower' # FIXME - no checking performed
color, width, hexcase = 'cyan', 64, 'lower' # FIXME - no checking performed
# https://github.com/monero-project/monero/blob/master/src/cryptonote_config.h
class mainnet(CoinProtocol.DummyWIF,CoinProtocol.Base):
class mainnet(CoinProtocol.DummyWIF, CoinProtocol.Base):
network_names = _nw('mainnet','stagenet',None)
network_names = _nw('mainnet', 'stagenet', None)
base_proto = 'Monero'
base_proto_coin = 'XMR'
base_coin = 'XMR'
addr_ver_info = { '12': 'monero', '2a': 'monero_sub', '13': 'monero_integrated' }
addr_ver_info = {'12': 'monero', '2a': 'monero_sub', '13': 'monero_integrated'}
pubkey_types = ('monero',)
mmtypes = ('M',)
dfl_mmtype = 'M'
@ -41,25 +41,25 @@ class mainnet(CoinProtocol.DummyWIF,CoinProtocol.Base):
coin_amt = 'XMRAmt'
sign_mode = 'standalone'
def get_addr_len(self,addr_fmt):
return (64,72)[addr_fmt == 'monero_integrated']
def get_addr_len(self, addr_fmt):
return (64, 72)[addr_fmt == 'monero_integrated']
def preprocess_key(self,sec,pubkey_type): # reduce key
def preprocess_key(self, sec, pubkey_type): # reduce key
from ...contrib.ed25519 import l
return int.to_bytes(
int.from_bytes( sec[::-1], 'big' ) % l,
int.from_bytes(sec[::-1], 'big') % l,
self.privkey_len,
'big' )[::-1]
'big')[::-1]
def decode_addr(self,addr):
def decode_addr(self, addr):
from ...baseconv import baseconv
def b58dec(addr_str):
bc = baseconv('b58')
l = len(addr_str)
a = b''.join([bc.tobytes( addr_str[i*11:i*11+11], pad=8 ) for i in range(l//11)])
b = bc.tobytes( addr_str[-(l%11):], pad=5 )
a = b''.join([bc.tobytes(addr_str[i*11:i*11+11], pad=8) for i in range(l//11)])
b = bc.tobytes(addr_str[-(l%11):], pad=5)
return a + b
ret = b58dec(addr)
@ -70,7 +70,7 @@ class mainnet(CoinProtocol.DummyWIF,CoinProtocol.Base):
return self.decode_addr_bytes(ret[:-4])
def parse_addr(self,ver_bytes,addr_bytes,fmt):
def parse_addr(self, ver_bytes, addr_bytes, fmt):
addr_len = self.get_addr_len('monero')
return parsed_addr(
ver_bytes = ver_bytes,
@ -78,12 +78,12 @@ class mainnet(CoinProtocol.DummyWIF,CoinProtocol.Base):
payment_id = addr_bytes[addr_len:] if fmt == 'monero_integrated' else None,
)
def pubhash2addr(self,*args,**kwargs):
def pubhash2addr(self, *args, **kwargs):
raise NotImplementedError('Monero addresses do not support pubhash2addr()')
def viewkey(self,viewkey_str):
return MoneroViewKey.__new__(MoneroViewKey,viewkey_str)
def viewkey(self, viewkey_str):
return MoneroViewKey.__new__(MoneroViewKey, viewkey_str)
class testnet(mainnet): # use stagenet for testnet
# testnet is {'35','3f','36'}
addr_ver_info = { '18': 'monero', '24': 'monero_sub', '19': 'monero_integrated' }
# testnet is {'35', '3f', '36'}
addr_ver_info = {'18': 'monero', '24': 'monero_sub', '19': 'monero_integrated'}

View file

@ -13,7 +13,7 @@ proto.xmr.rpc: Monero base protocol RPC client class
"""
import re
from ...rpc import RPCClient,IPPort,auth_data
from ...rpc import RPCClient, IPPort, auth_data
class MoneroRPCClient(RPCClient):
@ -32,7 +32,7 @@ class MoneroRPCClient(RPCClient):
test_connection = True,
proxy = None,
daemon = None,
ignore_daemon_version = False ):
ignore_daemon_version = False):
self.proto = proto
@ -42,10 +42,10 @@ class MoneroRPCClient(RPCClient):
if host.endswith('.onion'):
self.network_proto = 'http'
super().__init__(cfg,host,port,test_connection)
super().__init__(cfg, host, port, test_connection)
if self.auth_type:
self.auth = auth_data(user,passwd)
self.auth = auth_data(user, passwd)
if True:
self.set_backend('requests')
@ -63,54 +63,54 @@ class MoneroRPCClient(RPCClient):
if ver_str:
self.daemon_version_str = ver_str
self.daemon_version = sum(
int(m) * (1000 ** n) for n,m in
enumerate(reversed(re.match(r'(\d+)\.(\d+)\.(\d+)\.(\d+)',ver_str).groups()))
int(m) * (1000 ** n) for n, m in
enumerate(reversed(re.match(r'(\d+)\.(\d+)\.(\d+)\.(\d+)', ver_str).groups()))
)
if self.daemon and self.daemon_version > self.daemon.coind_version:
self.handle_unsupported_daemon_version(
proto.name,
ignore_daemon_version or proto.ignore_daemon_version or self.cfg.ignore_daemon_version )
ignore_daemon_version or proto.ignore_daemon_version or self.cfg.ignore_daemon_version)
else: # restricted (public) node:
self.daemon_version_str = None
self.daemon_version = None
def call(self,method,*params,**kwargs):
def call(self, method, *params, **kwargs):
assert not params, f'{self.name}.call() accepts keyword arguments only'
return self.process_http_resp(self.backend.run_noasync(
payload = {'id': 0, 'jsonrpc': '2.0', 'method': method, 'params': kwargs },
payload = {'id': 0, 'jsonrpc': '2.0', 'method': method, 'params': kwargs},
timeout = 3600, # allow enough time to sync ≈1,000,000 blocks
host_path = '/json_rpc'
))
def call_raw(self,method,*params,**kwargs):
def call_raw(self, method, *params, **kwargs):
assert not params, f'{self.name}.call() accepts keyword arguments only'
return self.process_http_resp(self.backend.run_noasync(
payload = kwargs,
timeout = self.timeout,
host_path = f'/{method}'
),json_rpc=False)
), json_rpc=False)
async def do_stop_daemon(self,silent=False):
async def do_stop_daemon(self, silent=False):
return self.call_raw('stop_daemon') # unreliable on macOS (daemon stops, but closes connection)
rpcmethods = ( 'get_info', )
rpcmethods_raw = ( 'get_height', 'send_raw_transaction', 'stop_daemon' )
rpcmethods = ('get_info',)
rpcmethods_raw = ('get_height', 'send_raw_transaction', 'stop_daemon')
class MoneroWalletRPCClient(MoneroRPCClient):
auth_type = 'digest'
def __init__(self,cfg,daemon,test_connection=True):
def __init__(self, cfg, daemon, test_connection=True):
RPCClient.__init__(
self = self,
cfg = cfg,
host = 'localhost',
port = daemon.rpc_port,
test_connection = test_connection )
test_connection = test_connection)
self.daemon = daemon
self.auth = auth_data(daemon.user,daemon.passwd)
self.auth = auth_data(daemon.user, daemon.passwd)
self.set_backend('requests')
rpcmethods = (
@ -125,10 +125,10 @@ class MoneroWalletRPCClient(MoneroRPCClient):
'refresh', # start_height
)
def call_raw(self,*args,**kwargs):
def call_raw(self, *args, **kwargs):
raise NotImplementedError('call_raw() not implemented for class MoneroWalletRPCClient')
async def do_stop_daemon(self,silent=False):
async def do_stop_daemon(self, silent=False):
"""
NB: the 'stop_wallet' RPC call closes the open wallet before shutting down the daemon,
returning an error if no wallet is open
@ -136,7 +136,7 @@ class MoneroWalletRPCClient(MoneroRPCClient):
try:
return self.call('stop_wallet')
except Exception as e:
from ...util import msg,msg_r,ymsg
from ...util import msg, msg_r, ymsg
from ...color import yellow
msg(f'{type(e).__name__}: {e}')
msg_r(yellow('Unable to shut down wallet daemon gracefully, so killing process instead...'))