xmrwallet.py -> xmrwallet (23 files)

This commit is contained in:
The MMGen Project 2024-10-18 10:32:07 +00:00
commit 4daf293dc2
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
31 changed files with 2734 additions and 2275 deletions

View file

@ -343,15 +343,14 @@ class Signable:
summary_footer = ''
async def sign(self,f):
from .xmrwallet import MoneroMMGenTX,MoneroWalletOps,xmrwallet_uargs
tx1 = MoneroMMGenTX.Completed( self.parent.xmrwallet_cfg, f )
m = MoneroWalletOps.sign(
from . import xmrwallet
from .xmrwallet.file.tx import MoneroMMGenTX
tx1 = MoneroMMGenTX.Completed(self.parent.xmrwallet_cfg, f)
m = xmrwallet.op(
'sign',
self.parent.xmrwallet_cfg,
xmrwallet_uargs(
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(tx1.src_wallet_idx),
spec = None ),
)
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(tx1.src_wallet_idx))
tx2 = await m.main( f, restart_daemon=self.need_daemon_restart(m,tx1.src_wallet_idx) )
tx2.write(ask_write=False)
return tx2
@ -372,15 +371,13 @@ class Signable:
if not json.loads(f.read_text())['MoneroMMGenWalletOutputsFile']['data']['imported'])
async def sign(self,f):
from .xmrwallet import MoneroWalletOps,xmrwallet_uargs
wallet_idx = MoneroWalletOps.wallet.get_idx_from_fn(f)
m = MoneroWalletOps.import_outputs(
from . import xmrwallet
wallet_idx = xmrwallet.op_cls('wallet').get_idx_from_fn(f)
m = xmrwallet.op(
'import_outputs',
self.parent.xmrwallet_cfg,
xmrwallet_uargs(
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(wallet_idx),
spec = None ),
)
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(wallet_idx))
obj = await m.main(f, wallet_idx, restart_daemon=self.need_daemon_restart(m,wallet_idx))
obj.write(quiet=not obj.data.sign)
self.action_desc = 'imported and signed' if obj.data.sign else 'imported'
@ -802,16 +799,14 @@ class Autosign:
def xmr_setup(self):
def create_signing_wallets():
from .xmrwallet import MoneroWalletOps,xmrwallet_uargs
from . import xmrwallet
if len(self.wallet_files) > 1:
ymsg(f'Warning: more than one wallet file, using the first ({self.wallet_files[0]}) for xmrwallet generation')
m = MoneroWalletOps.create_offline(
m = xmrwallet.op(
'create_offline',
self.xmrwallet_cfg,
xmrwallet_uargs(
infile = str(self.wallet_files[0]), # MMGen wallet file
wallets = self.cfg.xmrwallets, # XMR wallet idxs
spec = None ),
)
infile = str(self.wallet_files[0]), # MMGen wallet file
wallets = self.cfg.xmrwallets) # XMR wallet idxs
asyncio.run(m.main())
asyncio.run(m.stop_wallet_daemon())

View file

@ -13,7 +13,7 @@ help.xmrwallet: xmrwallet help notes for MMGen suite
"""
def help(proto, cfg):
from ..xmrwallet import xmrwallet_uarg_info as uarg_info
from ..xmrwallet import uarg_info
return """
Many operations take an optional wallets argument: one or more address

View file

@ -16,12 +16,7 @@ import asyncio
from .cfg import gc, Config
from .util import die, fmt_dict
from .xmrwallet import (
MoneroWalletOps,
xmrwallet_uarg_info,
xmrwallet_uargs,
tx_priorities
)
from . import xmrwallet
opts_data = {
'sets': [
@ -94,11 +89,11 @@ opts_data = {
},
'code': {
'options': lambda cfg, s: s.format(
D=xmrwallet_uarg_info['daemon'].annot,
R=xmrwallet_uarg_info['tx_relay_daemon'].annot,
D=xmrwallet.uarg_info['daemon'].annot,
R=xmrwallet.uarg_info['tx_relay_daemon'].annot,
cfg=cfg,
gc=gc,
tp=fmt_dict(tx_priorities,fmt='equal_compact')
tp=fmt_dict(xmrwallet.tx_priorities, fmt='equal_compact')
),
'notes': lambda help_mod, s: s.format(
xmrwallet_help = help_mod('xmrwallet')
@ -112,7 +107,7 @@ cmd_args = cfg._args
if cmd_args and cfg.autosign and (
cmd_args[0] in (
MoneroWalletOps.kafile_arg_ops
xmrwallet.kafile_arg_ops
+ ('export-outputs', 'export-outputs-sign', 'import-key-images', 'txview', 'txlist')
)
or len(cmd_args) == 1 and cmd_args[0] in ('submit', 'resubmit', 'abort')
@ -148,9 +143,7 @@ elif op in ('export-outputs', 'export-outputs-sign', 'import-key-images'):
else:
die(1, f'{op!r}: unrecognized operation')
op_cls = getattr(MoneroWalletOps,op.replace('-','_'))
m = op_cls(cfg, xmrwallet_uargs(infile, wallets, spec))
m = xmrwallet.op(op, cfg, infile, wallets, spec)
if asyncio.run(m.main()):
m.post_main_success()

File diff suppressed because it is too large Load diff

117
mmgen/xmrwallet/__init__.py Executable file
View file

@ -0,0 +1,117 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.__init__: Monero wallet ops for the MMGen Suite
"""
import re, importlib
from collections import namedtuple
from ..proto.btc.common import b58a
from ..util import capfirst
tx_priorities = {
1: 'low',
2: 'normal',
3: 'high',
4: 'highest'
}
uargs = namedtuple('xmrwallet_uargs', [
'infile',
'wallets',
'spec',
])
uarg_info = (
lambda e, hp: {
'daemon': e('HOST:PORT', hp),
'tx_relay_daemon': e('HOST:PORT[:PROXY_IP:PROXY_PORT]', rf'({hp})(?::({hp}))?'),
'newaddr_spec': e('WALLET[:ACCOUNT][,"label text"]', r'(\d+)(?::(\d+))?(?:,(.*))?'),
'transfer_spec': e('SOURCE:ACCOUNT:ADDRESS,AMOUNT', rf'(\d+):(\d+):([{b58a}]+),([0-9.]+)'),
'sweep_spec': e('SOURCE:ACCOUNT[,DEST[:ACCOUNT]]', r'(\d+):(\d+)(?:,(\d+)(?::(\d+))?)?'),
'label_spec': e('WALLET:ACCOUNT:ADDRESS,"label text"', r'(\d+):(\d+):(\d+),(.*)'),
})(
namedtuple('uarg_info_entry', ['annot','pat']),
r'(?:[^:]+):(?:\d+)'
)
# canonical op names mapped to their respective modules:
op_names = {
'create': 'create',
'create_offline': 'create',
'sync': 'sync',
'list': 'view',
'view': 'view',
'listview': 'view',
'new': 'new',
'transfer': 'sweep',
'sweep': 'sweep',
'sweep_all': 'sweep',
'relay': 'relay',
'txview': 'txview',
'txlist': 'txview',
'label': 'label',
'sign': 'sign',
'submit': 'submit',
'resubmit': 'submit',
'abort': 'submit',
'dump': 'dump',
'restore': 'restore',
'export_outputs': 'export',
'export_outputs_sign': 'export',
'import_outputs': 'import',
'import_key_images': 'import',
'wallet': 'wallet', # virtual class
}
kafile_arg_ops = (
'create',
'sync',
'list',
'view',
'listview',
'label',
'new',
'transfer',
'sweep',
'sweep_all',
'dump',
'restore')
opts = (
'wallet_dir',
'daemon',
'tx_relay_daemon',
'use_internal_keccak_module',
'hash_preset',
'restore_height',
'no_start_wallet_daemon',
'no_stop_wallet_daemon',
'no_relay',
'watch_only',
'autosign',
'skip_empty_accounts',
'skip_empty_addresses')
pat_opts = ('daemon', 'tx_relay_daemon')
def op_cls(op_name):
def upper(m):
return m[1].upper()
clsname = 'Op' + capfirst(re.sub(r'_(.)', upper, op_name))
cls = getattr(importlib.import_module(f'.ops.{op_names[op_name]}', 'mmgen.xmrwallet'), clsname)
cls.name = op_name
return cls
def op(op, cfg, infile, wallets, spec=None):
return op_cls(op.replace('-', '_'))(cfg, uargs(infile, wallets, spec))

View file

@ -0,0 +1,64 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.file: Monero file base class for the MMGen Suite
"""
import json
from ...util import make_chksum_N
from ...fileutil import get_data_from_file
from ...rpc import json_encoder
class MoneroMMGenFile:
silent_load = False
def make_chksum(self, keys=None):
res = json.dumps(
dict((k, v) for k, v in self.data._asdict().items() if (not keys or k in keys)),
cls = json_encoder
)
return make_chksum_N(res, rounds=1, nchars=self.chksum_nchars, upper=False)
@property
def base_chksum(self):
return self.make_chksum(self.base_chksum_fields)
@property
def full_chksum(self):
return self.make_chksum(self.full_chksum_fields) if self.full_chksum_fields else None
def check_checksums(self, d_wrap):
for k in ('base_chksum', 'full_chksum'):
a = getattr(self, k)
if a is not None:
b = d_wrap[k]
assert a == b, f'{k} mismatch: {a} != {b}'
def make_wrapped_data(self, in_data):
out = {
'base_chksum': self.base_chksum,
'full_chksum': self.full_chksum,
'data': in_data,
} if self.full_chksum else {
'base_chksum': self.base_chksum,
'data': in_data,
}
return json.dumps(
{ self.data_label: out },
cls = json_encoder,
indent = 2,
)
def extract_data_from_file(self, cfg, fn):
return json.loads(
get_data_from_file(cfg, str(fn), self.desc, silent=self.silent_load)
)[self.data_label]

167
mmgen/xmrwallet/file/outputs.py Executable file
View file

@ -0,0 +1,167 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.file.outputs: Monero wallet outputs file class for the MMGen Suite
"""
import re
from collections import namedtuple
from pathlib import Path
from ...util import die, suf
from ...tx.util import get_autosign_obj
from . import MoneroMMGenFile
class MoneroWalletOutputsFile:
class Base(MoneroMMGenFile):
desc = 'wallet outputs'
data_label = 'MoneroMMGenWalletOutputsFile'
base_chksum_fields = {'seed_id', 'wallet_index', 'outputs_data_hex'}
full_chksum_fields = {'seed_id', 'wallet_index', 'outputs_data_hex', 'signed_key_images'}
fn_fs = '{a}-outputs-{b}.{c}'
ext_offset = 25 # len('-outputs-') + len(chksum) ({b})
chksum_nchars = 16
data_tuple = namedtuple('wallet_outputs_data', [
'seed_id',
'wallet_index',
'outputs_data_hex',
'signed_key_images',
'sign',
'imported',
])
def __init__(self, cfg):
self.name = type(self).__name__
self.cfg = cfg
def write(self, add_suf='', quiet=False):
from ...fileutil import write_data_to_file
write_data_to_file(
cfg = self.cfg,
outfile = str(self.get_outfile(self.cfg, self.wallet_fn)) + add_suf,
data = self.make_wrapped_data(self.data._asdict()),
desc = self.desc,
ask_overwrite = False,
quiet = quiet,
ignore_opt_outdir = True)
def get_outfile(self, cfg, wallet_fn):
return (
get_autosign_obj(cfg).xmr_outputs_dir if cfg.autosign else
wallet_fn.parent) / self.fn_fs.format(
a = wallet_fn.name,
b = self.base_chksum,
c = self.ext,
)
def get_wallet_fn(self, fn):
assert fn.name.endswith(f'.{self.ext}'), (
f'{self.name}: filename does not end with {"."+self.ext!r}'
)
return fn.parent / fn.name[:-(len(self.ext)+self.ext_offset+1)]
def get_info(self, indent=''):
if self.data.signed_key_images is not None:
data = self.data.signed_key_images or []
return f'{indent}{self.wallet_fn.name}: {len(data)} signed key image{suf(data)}'
else:
return f'{indent}{self.wallet_fn.name}: no key images'
class New(Base):
ext = 'raw'
def __init__(self, parent, wallet_fn, data, wallet_idx=None, sign=False):
super().__init__(parent.cfg)
self.wallet_fn = wallet_fn
init_data = dict.fromkeys(self.data_tuple._fields)
init_data.update({
'seed_id': parent.kal.al_id.sid,
'wallet_index': wallet_idx or parent.get_idx_from_fn(wallet_fn),
})
if sign:
init_data['sign'] = sign
init_data.update({k:v for k, v in data.items() if k in init_data})
self.data = self.data_tuple(**init_data)
class Completed(New):
def __init__(self, parent, fn=None, wallet_fn=None):
def check_equal(desc, a, b):
assert a == b, f'{desc} mismatch: {a} (from file) != {b} (from filename)'
fn = fn or self.get_outfile(parent.cfg, wallet_fn)
wallet_fn = wallet_fn or self.get_wallet_fn(fn)
d_wrap = self.extract_data_from_file(parent.cfg, fn)
data = d_wrap['data']
check_equal('Seed ID', data['seed_id'], parent.kal.al_id.sid)
wallet_idx = parent.get_idx_from_fn(wallet_fn)
check_equal('Wallet index', data['wallet_index'], wallet_idx)
super().__init__(
parent = parent,
wallet_fn = wallet_fn,
data = data,
wallet_idx = wallet_idx,
)
self.check_checksums(d_wrap)
@classmethod
def find_fn_from_wallet_fn(cls, cfg, wallet_fn, ret_on_no_match=False):
path = get_autosign_obj(cfg).xmr_outputs_dir or Path()
pat = cls.fn_fs.format(
a = wallet_fn.name,
b = f'[0-9a-f]{{{cls.chksum_nchars}}}\\',
c = cls.ext,
)
matches = [f for f in path.iterdir() if re.match(pat, f.name)]
if not matches and ret_on_no_match:
return None
if not matches or len(matches) > 1:
die(2, "{a} matching pattern {b!r} found in '{c}'!".format(
a = 'No files' if not matches else 'More than one file',
b = pat,
c = path
))
return matches[0]
class Unsigned(Completed):
pass
class SignedNew(New):
desc = 'signed key images'
ext = 'sig'
class Signed(Completed, SignedNew):
pass
class MoneroWalletDumpFile:
class Base:
desc = 'Monero wallet dump'
data_label = 'MoneroMMGenWalletDumpFile'
base_chksum_fields = {'seed_id', 'wallet_index', 'wallet_metadata'}
full_chksum_fields = None
ext = 'dump'
ext_offset = 0
data_tuple = namedtuple('wallet_dump_data', [
'seed_id',
'wallet_index',
'wallet_metadata',
])
def get_outfile(self, cfg, wallet_fn):
return wallet_fn.parent / f'{wallet_fn.name}.{self.ext}'
class New(Base, MoneroWalletOutputsFile.New):
pass
class Completed(Base, MoneroWalletOutputsFile.Completed):
pass

326
mmgen/xmrwallet/file/tx.py Executable file
View file

@ -0,0 +1,326 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.file.tx: Monero transaction file class for the MMGen Suite
"""
import time
from collections import namedtuple
from pathlib import Path
from ...obj import CoinTxID, Int
from ...color import red, yellow, blue, cyan, pink, orange, purple
from ...util import die, fmt, make_timestr, list_gen
from ...seed import SeedID
from ...protocol import init_proto
from ...addr import CoinAddr
from ...tx.util import get_autosign_obj
from ..include import XMRWalletAddrSpec
from . import MoneroMMGenFile
class MoneroMMGenTX:
class Base(MoneroMMGenFile):
data_label = 'MoneroMMGenTX'
# both base_chksum and full_chksum are used to make the filename stem, so we must not include
# fields that change when TX is signed and submitted (e.g. ‘sign_time’, ‘submit_time’)
base_chksum_fields = {
'op',
'create_time',
'network',
'seed_id',
'source',
'dest',
'amount' }
full_chksum_fields = {
'op',
'create_time',
'network',
'seed_id',
'source',
'dest',
'amount',
'fee',
'blob' }
oneline_fs = '{a:7} {b:8} {c:19} {d:13} {e:9} {f:6} {x:2} {g:6} {h:17} {j}'
oneline_fixed_cols_w = 96 # width of all columns except the last (coin address)
chksum_nchars = 6
xmrwallet_tx_data = namedtuple('xmrwallet_tx_data', [
'op',
'create_time',
'sign_time',
'submit_time',
'network',
'seed_id',
'source',
'dest',
'dest_address',
'txid',
'amount',
'priority',
'fee',
'blob',
'metadata',
'unsigned_txset',
'signed_txset',
'complete',
])
def __init__(self):
self.name = type(self).__name__
@property
def src_wallet_idx(self):
return int(self.data.source.split(':')[0])
def get_info_oneline(self, indent='', addr_w=None):
d = self.data
return self.oneline_fs.format(
a = yellow(d.network),
b = d.seed_id.hl(),
c = make_timestr(d.submit_time if d.submit_time is not None else d.create_time),
d = orange(self.file_id),
e = purple(d.op.ljust(9)),
f = red('{}:{}'.format(d.source.wallet, d.source.account).ljust(6)),
g = red('{}:{}'.format(d.dest.wallet, d.dest.account).ljust(6)) if d.dest else cyan('ext '),
h = d.amount.fmt(color=True, iwidth=4, prec=12),
j = d.dest_address.fmt(0, width=addr_w, color=True) if addr_w else d.dest_address.hl(0),
x = '->'
)
def get_info(self, indent='', addr_w=None):
d = self.data
pmt_id = d.dest_address.parsed.payment_id
fs = '\n'.join(list_gen(
['Info for transaction {a} [Seed ID: {b}. Network: {c}]:'],
[' TxID: {d}'],
[' Created: {e:19} [{f}]'],
[' Signed: {g:19} [{h}]', d.sign_time],
[' Submitted: {s:19} [{t}]', d.submit_time],
[' Type: {i}{S}'],
[' From: wallet {j}, account {k}'],
[' To: wallet {x}, account {y}, address {z}', d.dest],
[' Amount: {m} XMR'],
[' Priority: {F}', d.priority],
[' Fee: {n} XMR'],
[' Dest: {o}'],
[' Size: {Z} bytes', d.signed_txset],
[' Payment ID: {P}', pmt_id],
))
from ...util2 import format_elapsed_hr
from ..ops import addr_width
from .. import tx_priorities
return fmt(fs, strip_char='\t', indent=indent).format(
a = orange(self.file_id),
b = d.seed_id.hl(),
c = yellow(d.network.upper()),
d = d.txid.hl(),
e = make_timestr(d.create_time),
f = format_elapsed_hr(d.create_time),
g = make_timestr(d.sign_time) if d.sign_time else None,
h = format_elapsed_hr(d.sign_time) if d.sign_time else None,
i = blue(d.op),
j = d.source.wallet.hl(),
k = red(f'#{d.source.account}'),
m = d.amount.hl(),
F = (Int(d.priority).hl() + f' [{tx_priorities[d.priority]}]') if d.priority else None,
n = d.fee.hl(),
o = d.dest_address.hl(0) if self.cfg.full_address
else d.dest_address.fmt(0, width=addr_width, color=True),
P = pink(pmt_id.hex()) if pmt_id else None,
s = make_timestr(d.submit_time) if d.submit_time else None,
S = pink(f" [cold signed{', submitted' if d.complete else ''}]") if d.signed_txset else '',
t = format_elapsed_hr(d.submit_time) if d.submit_time else None,
x = d.dest.wallet.hl() if d.dest else None,
y = red(f'#{d.dest.account}') if d.dest else None,
z = red(f'#{d.dest.account_address}') if d.dest else None,
Z = Int(len(d.signed_txset) // 2).hl() if d.signed_txset else None,
)
@property
def file_id(self):
return (self.base_chksum + ('-' + self.full_chksum if self.full_chksum else '')).upper()
def write(self, delete_metadata=False, ask_write=True, ask_overwrite=True):
dict_data = self.data._asdict()
if delete_metadata:
dict_data['metadata'] = None
fn = '{a}-XMR[{b!s}]{c}.{d}'.format(
a = self.file_id,
b = self.data.amount,
c = '' if self.data.network == 'mainnet' else f'.{self.data.network}',
d = self.ext
)
if self.cfg.autosign:
fn = get_autosign_obj(self.cfg).xmr_tx_dir / fn
from ...fileutil import write_data_to_file
write_data_to_file(
cfg = self.cfg,
outfile = str(fn),
data = self.make_wrapped_data(dict_data),
desc = self.desc,
ask_write = ask_write,
ask_write_default_yes = not ask_write,
ask_overwrite = ask_overwrite,
ignore_opt_outdir = self.cfg.autosign)
class New(Base):
def __init__(self, *args, **kwargs):
super().__init__()
assert not args, 'Non-keyword args not permitted'
if '_in_tx' in kwargs:
in_data = kwargs.pop('_in_tx').data._asdict()
in_data.update(kwargs)
else:
in_data = kwargs
d = namedtuple('monero_tx_in_data_tuple', in_data)(**in_data)
self.cfg = d.cfg
proto = init_proto(self.cfg, 'xmr', network=d.network, need_amt=True)
now = int(time.time())
self.data = self.xmrwallet_tx_data(
op = d.op,
create_time = now if self.name in ('NewSigned', 'NewUnsigned') else getattr(d, 'create_time', None),
sign_time = now if self.name in ('NewSigned', 'NewColdSigned') else getattr(d, 'sign_time', None),
submit_time = now if self.name == 'NewSubmitted' else None,
network = d.network,
seed_id = SeedID(sid=d.seed_id),
source = XMRWalletAddrSpec(d.source),
dest = None if d.dest is None else XMRWalletAddrSpec(d.dest),
dest_address = CoinAddr(proto, d.dest_address),
txid = CoinTxID(d.txid),
amount = d.amount,
priority = self.cfg.priority if self.name in ('NewSigned', 'NewUnsigned') else d.priority,
fee = d.fee,
blob = d.blob,
metadata = d.metadata,
unsigned_txset = d.unsigned_txset,
signed_txset = getattr(d, 'signed_txset', None),
complete = self.name in ('NewSigned', 'NewSubmitted'),
)
class NewUnsigned(New):
desc = 'unsigned transaction'
ext = 'rawtx'
signed = False
class NewSigned(New):
desc = 'signed transaction'
ext = 'sigtx'
signed = True
class NewColdSigned(NewSigned):
pass
class NewSubmitted(NewColdSigned):
desc = 'submitted transaction'
ext = 'subtx'
class Completed(Base):
desc = 'transaction'
forbidden_fields = ()
def __init__(self, cfg, fn):
super().__init__()
self.cfg = cfg
self.fn = Path(fn)
try:
d_wrap = self.extract_data_from_file(cfg, fn)
except Exception as e:
die('MoneroMMGenTXFileParseError', f'{type(e).__name__}: {e}\nCould not load transaction file')
if 'unsigned_txset' in d_wrap['data']: # post-autosign
self.full_chksum_fields &= set(d_wrap['data']) # allow for added chksum fields in future
else:
self.full_chksum_fields = set(d_wrap['data']) - {'metadata'}
for key in self.xmrwallet_tx_data._fields: # backwards compat: fill in missing fields
if not key in d_wrap['data']:
d_wrap['data'][key] = None
d = self.xmrwallet_tx_data(**d_wrap['data'])
if self.name not in ('View', 'Completed'):
assert fn.name.endswith('.'+self.ext), 'TX file {fn} has incorrect extension (not {self.ext!r})'
assert getattr(d, self.req_field), f'{self.name} TX missing required field {self.req_field!r}'
assert bool(d.sign_time) == self.signed, '{a} has {b}sign time!'.format(
a = self.desc,
b = 'no ' if self.signed else'')
for f in self.forbidden_fields:
assert not getattr(d, f), f'{self.name} TX mismatch: contains forbidden field {f!r}'
proto = init_proto(cfg, 'xmr', network=d.network, need_amt=True)
self.data = self.xmrwallet_tx_data(
op = d.op,
create_time = d.create_time,
sign_time = d.sign_time,
submit_time = d.submit_time,
network = d.network,
seed_id = SeedID(sid=d.seed_id),
source = XMRWalletAddrSpec(d.source),
dest = None if d.dest is None else XMRWalletAddrSpec(d.dest),
dest_address = CoinAddr(proto, d.dest_address),
txid = CoinTxID(d.txid),
amount = proto.coin_amt(d.amount),
priority = d.priority,
fee = proto.coin_amt(d.fee),
blob = d.blob,
metadata = d.metadata,
unsigned_txset = d.unsigned_txset,
signed_txset = d.signed_txset,
complete = d.complete,
)
self.check_checksums(d_wrap)
class Unsigned(Completed):
desc = 'unsigned transaction'
ext = 'rawtx'
signed = False
req_field = 'unsigned_txset'
forbidden_fields = ('signed_txset',)
class Signed(Completed):
desc = 'signed transaction'
ext = 'sigtx'
signed = True
req_field = 'blob'
forbidden_fields = ('signed_txset', 'unsigned_txset')
class ColdSigned(Signed):
req_field = 'signed_txset'
forbidden_fields = ()
class Submitted(ColdSigned):
desc = 'submitted transaction'
ext = 'subtx'
silent_load = True
class View(Completed):
silent_load = True

86
mmgen/xmrwallet/include.py Executable file
View file

@ -0,0 +1,86 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.include: Monero wallet shared data for the MMGen Suite
"""
import re
from ..objmethods import MMGenObject, HiliteStr, InitErrors
from ..color import red, green, pink
from ..addr import CoinAddr, AddrIdx
from ..util import die
def gen_acct_addr_info(self, wallet_data, account, indent=''):
fs = indent + '{I:<3} {A} {U} {B} {L}'
addrs_data = wallet_data.addrs_data[account]['addresses']
for d in addrs_data:
d['unlocked_balance'] = 0
if 'per_subaddress' in wallet_data.bals_data:
for d in wallet_data.bals_data['per_subaddress']:
if d['account_index'] == account:
addrs_data[d['address_index']]['unlocked_balance'] = d['unlocked_balance']
from .ops import addr_width
yield fs.format(
I = '',
A = 'Address'.ljust(addr_width),
U = 'Used'.ljust(5),
B = ' Unlocked Balance',
L = 'Label')
for addr in addrs_data:
ca = CoinAddr(self.proto, addr['address'])
bal = addr['unlocked_balance']
if self.cfg.skip_empty_addresses and addr['used'] and not bal:
continue
from .ops import fmt_amt
yield fs.format(
I = addr['address_index'],
A = ca.hl(0) if self.cfg.full_address else ca.fmt(0, color=True, width=addr_width),
U = (red('True ') if addr['used'] else green('False')),
B = fmt_amt(bal),
L = pink(addr['label']))
class XMRWalletAddrSpec(HiliteStr, InitErrors, MMGenObject):
color = 'cyan'
width = 0
trunc_ok = False
min_len = 5 # 1:0:0
max_len = 14 # 9999:9999:9999
def __new__(cls, arg1, arg2=None, arg3=None):
if isinstance(arg1, cls):
return arg1
try:
if isinstance(arg1, str):
me = str.__new__(cls, arg1)
m = re.fullmatch('({n}):({n}):({n}|None)'.format(n=r'[0-9]{1,4}'), arg1)
assert m is not None, f'{arg1!r}: invalid XMRWalletAddrSpec'
for e in m.groups():
if len(e) != 1 and e[0] == '0':
die(2, f'{e}: leading zeroes not permitted in XMRWalletAddrSpec element')
me.wallet = AddrIdx(m[1])
me.account = int(m[2])
me.account_address = None if m[3] == 'None' else int(m[3])
else:
me = str.__new__(cls, f'{arg1}:{arg2}:{arg3}')
for arg in [arg1, arg2] + ([] if arg3 is None else [arg3]):
assert isinstance(arg, int), f'{arg}: XMRWalletAddrSpec component not of type int'
assert arg is None or arg <= 9999, f'{arg}: XMRWalletAddrSpec component greater than 9999'
me.wallet = AddrIdx(arg1)
me.account = arg2
me.account_address = arg3
return me
except Exception as e:
return cls.init_fail(e, me)

142
mmgen/xmrwallet/ops/__init__.py Executable file
View file

@ -0,0 +1,142 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.__init__: Monero wallet ops for the MMGen Suite
"""
import re, atexit
from ...color import blue
from ...util import msg, die, fmt
from ...protocol import init_proto
from ...tx.util import get_autosign_obj
from ... import xmrwallet
from .. import uarg_info
# required to squelch pylint:
def fmt_amt(amt):
return str(amt)
def hl_amt(amt):
return str(amt)
class OpBase:
opts = ('wallet_dir',)
trust_monerod = False
do_umount = True
name = None
def __init__(self, cfg, uarg_tuple):
def gen_classes():
for cls in type(self).__mro__:
if not cls.__name__.startswith('OpMixin'):
yield cls
if cls.__name__ == 'OpBase':
break
self.cfg = cfg
classes = tuple(gen_classes())
self.opts = tuple(set(opt for cls in classes for opt in xmrwallet.opts))
if not hasattr(self, 'stem'):
self.stem = self.name
global fmt_amt, hl_amt, addr_width
self.uargs = uarg_tuple
def fmt_amt(amt):
return self.proto.coin_amt(amt, from_unit='atomic').fmt(iwidth=5, prec=12, color=True)
def hl_amt(amt):
return self.proto.coin_amt(amt, from_unit='atomic').hl()
addr_width = 95 if self.cfg.full_address else 17
self.proto = init_proto(cfg, 'xmr', network=self.cfg.network, need_amt=True)
id_cur = None
for cls in classes:
if id(cls.check_uopts) != id_cur:
cls.check_uopts(self)
id_cur = id(cls.check_uopts)
id_cur = None
for cls in classes:
if id(cls.pre_init_action) != id_cur:
cls.pre_init_action(self)
id_cur = id(cls.pre_init_action)
if cfg.autosign:
self.asi = get_autosign_obj(cfg)
def check_uopts(self):
def check_pat_opt(name):
val = getattr(self.cfg, name)
if not re.fullmatch(uarg_info[name].pat, val, re.ASCII):
die(1, '{!r}: invalid value for --{}: it must have format {!r}'.format(
val,
name.replace('_', '-'),
uarg_info[name].annot
))
for attr in self.cfg.__dict__:
if attr in xmrwallet.opts and not attr in self.opts:
die(1, 'Option --{} not supported for {!r} operation'.format(
attr.replace('_', '-'),
self.name,
))
for opt in xmrwallet.pat_opts:
if getattr(self.cfg, opt, None):
check_pat_opt(opt)
def parse_tx_relay_opt(self):
return re.fullmatch(
uarg_info['tx_relay_daemon'].pat,
self.cfg.tx_relay_daemon,
re.ASCII)
def display_tx_relay_info(self, indent=''):
m = self.parse_tx_relay_opt()
msg(fmt(f"""
TX relay info:
Host: {blue(m[1])}
Proxy: {blue(m[2] or 'None')}
""", strip_char='\t', indent=indent))
def mount_removable_device(self):
if self.cfg.autosign:
if not self.asi.device_inserted:
die(1, 'Removable device not present!')
if self.do_umount:
atexit.register(lambda: self.asi.do_umount())
self.asi.do_mount()
self.post_mount_action()
def pre_init_action(self):
pass
def post_main_success(self):
pass
def post_main_failure(self):
pass
async def stop_wallet_daemon(self):
pass
def post_mount_action(self):
pass

84
mmgen/xmrwallet/ops/create.py Executable file
View file

@ -0,0 +1,84 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.create: Monero wallet ops for the MMGen Suite
"""
from pathlib import Path
from ...util import msg, msg_r, gmsg, pp_msg, die
from ...addrlist import ViewKeyAddrList
from .wallet import OpWallet
class OpCreate(OpWallet):
stem = 'creat'
wallet_exists = False
opts = ('restore_height',)
def check_uopts(self):
if self.cfg.restore_height != 'current':
if int(self.cfg.restore_height or 0) < 0:
die(1, f'{self.cfg.restore_height}: invalid value for --restore-height (less than zero)')
async def process_wallet(self, d, fn, last):
msg_r('') # for pexpect
if self.cfg.restore_height == 'current':
restore_height = self.get_coin_daemon_rpc().call_raw('get_height')['height']
else:
restore_height = self.cfg.restore_height
if self.cfg.watch_only:
ret = self.c.call(
'generate_from_keys',
filename = fn.name,
password = d.wallet_passwd,
address = d.addr,
viewkey = d.viewkey,
restore_height = restore_height)
else:
from ...xmrseed import xmrseed
ret = self.c.call(
'restore_deterministic_wallet',
filename = fn.name,
password = d.wallet_passwd,
seed = xmrseed().fromhex(d.sec.wif, tostr=True),
restore_height = restore_height,
language = 'English')
pp_msg(ret) if self.cfg.debug else msg(f' Address: {ret["address"]}')
return True
class OpCreateOffline(OpCreate):
def __init__(self, cfg, uarg_tuple):
super().__init__(cfg, uarg_tuple)
gmsg('\nCreating viewkey-address file for watch-only wallets')
vkal = ViewKeyAddrList(
cfg = self.cfg,
proto = self.proto,
addrfile = None,
addr_idxs = self.uargs.wallets,
seed = self.seed_src.seed,
skip_chksum_msg = True)
vkf = vkal.file
# before writing viewkey-address file, shred any old ones in the directory:
for f in Path(self.asi.xmr_dir).iterdir():
if f.name.endswith(vkf.ext):
from ...fileutil import shred_file
msg(f"\nShredding old viewkey-address file '{f}'")
shred_file(f, verbose=self.cfg.verbose)
vkf.write(outdir=self.asi.xmr_dir)

35
mmgen/xmrwallet/ops/dump.py Executable file
View file

@ -0,0 +1,35 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.dump: Monero wallet ops for the MMGen Suite
"""
from ...util import msg
from ..file.outputs import MoneroWalletDumpFile
from ..rpc import MoneroWalletRPC
from .wallet import OpWallet
class OpDump(OpWallet):
wallet_offline = True
async def process_wallet(self, d, fn, last):
h = MoneroWalletRPC(self, d)
h.open_wallet('source')
wallet_data = h.get_wallet_data(print=False)
msg('')
MoneroWalletDumpFile.New(
parent = self,
wallet_fn = fn,
data = {'wallet_metadata': wallet_data.addrs_data}
).write()
return True

60
mmgen/xmrwallet/ops/export.py Executable file
View file

@ -0,0 +1,60 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.export: Monero wallet ops for the MMGen Suite
"""
from ...util import gmsg, gmsg_r
from ..file.outputs import MoneroWalletOutputsFile
from ..rpc import MoneroWalletRPC
from .wallet import OpWallet
class OpExportOutputs(OpWallet):
action = 'exporting outputs from'
stem = 'process'
sign = False
async def process_wallet(self, d, fn, last):
h = MoneroWalletRPC(self, d)
h.open_wallet('source')
if self.cfg.rescan_blockchain:
gmsg_r('\n Rescanning blockchain...')
self.c.call('rescan_blockchain')
gmsg('done')
if self.cfg.rescan_spent:
gmsg_r('\n Rescanning spent outputs...')
self.c.call('rescan_spent')
gmsg('done')
self.head_msg(d.idx, h.fn)
for ftype in ('Unsigned', 'Signed'):
old_fn = getattr(MoneroWalletOutputsFile, ftype).find_fn_from_wallet_fn(
cfg = self.cfg,
wallet_fn = fn,
ret_on_no_match = True)
if old_fn:
old_fn.unlink()
m = MoneroWalletOutputsFile.New(
parent = self,
wallet_fn = fn,
data = self.c.call('export_outputs', all=True),
sign = self.sign,
)
m.write()
return True
class OpExportOutputsSign(OpExportOutputs):
opts = ('rescan_spent', 'rescan_blockchain')
sign = True

77
mmgen/xmrwallet/ops/import.py Executable file
View file

@ -0,0 +1,77 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.import: Monero wallet ops for the MMGen Suite
"""
from ...util import msg, bmsg, die, suf
from ..file.outputs import MoneroWalletOutputsFile
from ..rpc import MoneroWalletRPC
from .wallet import OpWallet
class OpImportOutputs(OpWallet):
action = 'importing wallet outputs into'
start_daemon = False
async def main(self, fn, wallet_idx, restart_daemon=True):
if restart_daemon:
await self.restart_wallet_daemon()
h = MoneroWalletRPC(self, self.addr_data[0])
self.head_msg(wallet_idx, fn)
if restart_daemon:
h.open_wallet(refresh=False)
m = MoneroWalletOutputsFile.Unsigned(
parent = self,
fn = fn)
res = self.c.call(
'import_outputs',
outputs_data_hex = m.data.outputs_data_hex)
idata = res['num_imported']
bmsg(f'\n {idata} output{suf(idata)} imported')
if m.data.sign:
data = m.data._asdict()
data.update(self.c.call('export_key_images', all=True))
m = MoneroWalletOutputsFile.SignedNew(
parent = self,
wallet_fn = m.get_wallet_fn(fn),
data = data)
idata = m.data.signed_key_images or []
bmsg(f' {len(idata)} key image{suf(idata)} signed')
else:
m.data = m.data._replace(imported=True)
return m
class OpImportKeyImages(OpWallet):
action = 'importing key images into'
stem = 'process'
trust_monerod = True
def post_main_failure(self):
rw_msg = ' for requested wallets' if self.uargs.wallets else ''
die(2, f'No signed key image files found{rw_msg}!')
async def process_wallet(self, d, fn, last):
keyimage_fn = MoneroWalletOutputsFile.Signed.find_fn_from_wallet_fn(self.cfg, fn, ret_on_no_match=True)
if not keyimage_fn:
msg(f'No signed key image file found for wallet #{d.idx}')
return False
h = MoneroWalletRPC(self, d)
h.open_wallet()
self.head_msg(d.idx, h.fn)
m = MoneroWalletOutputsFile.Signed(parent=self, fn=keyimage_fn)
data = m.data.signed_key_images or []
bmsg(f'\n {len(data)} signed key image{suf(data)} to import')
if data:
res = self.c.call('import_key_images', signed_key_images=data)
bmsg(f' Success: {res}')
return True

82
mmgen/xmrwallet/ops/label.py Executable file
View file

@ -0,0 +1,82 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.label: Monero wallet ops for the MMGen Suite
"""
from ...color import red, pink, cyan, gray
from ...util import msg, ymsg, gmsg, die, make_timestr
from ...ui import keypress_confirm
from ...addr import CoinAddr
from ..rpc import MoneroWalletRPC
from .spec import OpMixinSpec
from .wallet import OpWallet
class OpLabel(OpMixinSpec, OpWallet):
spec_id = 'label_spec'
spec_key = ((1, 'source'),)
opts = ()
wallet_offline = True
async def main(self):
gmsg('\n{a} label for wallet {b}, account #{c}, address #{d}'.format(
a = 'Setting' if self.label else 'Removing',
b = self.source.idx,
c = self.account,
d = self.address_idx
))
h = MoneroWalletRPC(self, self.source)
h.open_wallet('source')
wallet_data = h.get_wallet_data()
max_acct = len(wallet_data.accts_data['subaddress_accounts']) - 1
if self.account > max_acct:
die(2, f'{self.account}: requested account index out of bounds (>{max_acct})')
ret = h.print_acct_addrs(wallet_data, self.account)
if self.address_idx > len(ret) - 1:
die(2, '{}: requested address index out of bounds (>{})'.format(
self.address_idx,
len(ret) - 1))
addr = ret[self.address_idx]
new_label = f'{self.label} [{make_timestr()}]' if self.label else ''
ca = CoinAddr(self.proto, addr['address'])
from . import addr_width
msg('\n {a} {b}\n {c} {d}\n {e} {f}'.format(
a = 'Address: ',
b = ca.hl(0) if self.cfg.full_address else ca.fmt(0, color=True, width=addr_width),
c = 'Existing label:',
d = pink(addr['label']) if addr['label'] else gray('[none]'),
e = 'New label: ',
f = pink(new_label) if new_label else gray('[none]')))
op = 'remove' if not new_label else 'update' if addr['label'] else 'set'
if addr['label'] == new_label:
ymsg('\nLabel is unchanged, operation cancelled')
elif keypress_confirm(self.cfg, f' {op.capitalize()} label?'):
h.set_label(self.account, self.address_idx, new_label)
ret = h.print_acct_addrs(h.get_wallet_data(print=False), self.account)
label_chk = ret[self.address_idx]['label']
if label_chk != new_label:
ymsg(f'Warning: new label {label_chk!r} does not match requested value!')
return False
else:
msg(cyan('\nLabel successfully {}'.format('set' if op == 'set' else op+'d')))
else:
ymsg('\nOperation cancelled by user request')

69
mmgen/xmrwallet/ops/new.py Executable file
View file

@ -0,0 +1,69 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.new: Monero wallet ops for the MMGen Suite
"""
from ...color import red, pink
from ...util import msg, ymsg, make_timestr
from ...ui import keypress_confirm
from ..rpc import MoneroWalletRPC
from .spec import OpMixinSpec
from .wallet import OpWallet
class OpNew(OpMixinSpec, OpWallet):
spec_id = 'newaddr_spec'
spec_key = ((1, 'source'),)
wallet_offline = True
async def main(self):
h = MoneroWalletRPC(self, self.source)
h.open_wallet('Monero')
desc = 'account' if self.account is None else 'address'
label = (
None if self.label == '' else
'{} [{}]'.format(self.label or f'xmrwallet new {desc}', make_timestr()))
wallet_data = h.get_wallet_data()
if desc == 'address':
h.print_acct_addrs(wallet_data, self.account)
if keypress_confirm(
self.cfg,
'\nCreating new {a} for wallet {b}{c} with {d}\nOK?'.format(
a = desc,
b = red(str(self.source.idx)),
c = '' if desc == 'account' else f', account {red("#"+str(self.account))}',
d = 'label ' + pink(''+label+'') if label else 'empty label')
):
if desc == 'address':
h.create_new_addr(self.account, label=label)
else:
h.create_acct(label=label)
wallet_data = h.get_wallet_data(print=desc=='account')
if desc == 'address':
h.print_acct_addrs(wallet_data, self.account)
else:
ymsg('\nOperation cancelled by user request')
# wallet must be left open: otherwise the 'stop_wallet' RPC call used to stop the daemon will fail
if self.cfg.no_stop_wallet_daemon:
h.close_wallet('Monero')
msg('')

87
mmgen/xmrwallet/ops/relay.py Executable file
View file

@ -0,0 +1,87 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.relay: Monero wallet ops for the MMGen Suite
"""
import time
from pathlib import Path
from ...util import msg, msg_r, gmsg, ymsg, die
from ...ui import keypress_confirm
from ...proto.xmr.rpc import MoneroRPCClient
from ..file.tx import MoneroMMGenTX
from . import OpBase
class OpRelay(OpBase):
opts = ('tx_relay_daemon',)
def __init__(self, cfg, uarg_tuple):
super().__init__(cfg, uarg_tuple)
self.mount_removable_device()
self.tx = MoneroMMGenTX.Signed(self.cfg, Path(self.uargs.infile))
if self.cfg.tx_relay_daemon:
m = self.parse_tx_relay_opt()
host, port = m[1].split(':')
proxy = m[2]
md = None
else:
from ...daemon import CoinDaemon
md = CoinDaemon(self.cfg, 'xmr', test_suite=self.cfg.test_suite)
host, port = ('localhost', md.rpc_port)
proxy = None
self.dc = MoneroRPCClient(
cfg = self.cfg,
proto = self.proto,
daemon = md,
host = host,
port = int(port),
user = None,
passwd = None,
test_connection = host == 'localhost', # avoid extra connections if relay is a public node
proxy = proxy)
async def main(self):
msg('\n' + self.tx.get_info(indent=' '))
if self.cfg.tx_relay_daemon:
self.display_tx_relay_info(indent=' ')
if keypress_confirm(self.cfg, 'Relay transaction?'):
if self.cfg.tx_relay_daemon:
msg_r('Relaying transaction to remote daemon, please be patient...')
t_start = time.time()
res = self.dc.call_raw(
'send_raw_transaction',
tx_as_hex = self.tx.data.blob
)
if res['status'] == 'OK':
if res['not_relayed']:
msg('not relayed')
ymsg('Transaction not relayed')
else:
msg('success')
if self.cfg.tx_relay_daemon:
from ...util2 import format_elapsed_hr
msg(f'Relay time: {format_elapsed_hr(t_start, rel_now=False, show_secs=True)}')
gmsg('OK')
return True
else:
die('RPCFailure', repr(res))
else:
die(1, 'Exiting at user request')

105
mmgen/xmrwallet/ops/restore.py Executable file
View file

@ -0,0 +1,105 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.restore: Monero wallet ops for the MMGen Suite
"""
from ...util import msg, gmsg, bmsg, ymsg, rmsg, die
from ..file.outputs import MoneroWalletDumpFile
from ..rpc import MoneroWalletRPC
from .create import OpCreate
class OpRestore(OpCreate):
wallet_offline = True
def check_uopts(self):
if self.cfg.restore_height is not None:
die(1, '--restore-height must be unset when running the ‘restore’ command')
async def process_wallet(self, d, fn, last):
def get_dump_data():
def gen():
for fn in [self.get_wallet_fn(d, watch_only=wo) for wo in (True, False)]:
ret = fn.parent / (fn.name + '.dump')
if ret.exists():
yield ret
dump_fns = tuple(gen())
if not dump_fns:
die(1, f"No suitable dump file found for '{fn}'")
elif len(dump_fns) > 1:
ymsg(f"Warning: more than one dump file found for '{fn}' - using the first!")
return MoneroWalletDumpFile.Completed(
parent = self,
fn = dump_fns[0]).data._asdict()['wallet_metadata']
def restore_accounts():
bmsg(' Restoring accounts:')
for acct_idx, acct_data in enumerate(data[1:], 1):
msg(fs.format(acct_idx, 0, acct_data['address']))
self.c.call('create_account')
def restore_subaddresses():
bmsg(' Restoring subaddresses:')
for acct_idx, acct_data in enumerate(data):
for addr_idx, addr_data in enumerate(acct_data['addresses'][1:], 1):
msg(fs.format(acct_idx, addr_idx, addr_data['address']))
self.c.call('create_address', account_index=acct_idx)
def restore_labels():
bmsg(' Restoring labels:')
for acct_idx, acct_data in enumerate(data):
for addr_idx, addr_data in enumerate(acct_data['addresses']):
addr_data['used'] = False # do this so that restored data matches
msg(fs.format(acct_idx, addr_idx, addr_data['label']))
self.c.call(
'label_address',
index = { 'major': acct_idx, 'minor': addr_idx },
label = addr_data['label'],
)
def make_format_str():
return ' acct {:O>%s}, addr {:O>%s} [{}]' % (
len(str(len(data) - 1)),
len(str(max(len(acct_data['addresses']) for acct_data in data) - 1))
)
def check_restored_data():
restored_data = h.get_wallet_data(print=False).addrs_data
if restored_data != data:
rmsg('Restored data does not match original dump! Dumping bad data.')
MoneroWalletDumpFile.New(
parent = self,
wallet_fn = fn,
data = {'wallet_metadata': restored_data}
).write(add_suf='.bad')
die(3, 'Fatal error')
await super().process_wallet(d, fn, last)
h = MoneroWalletRPC(self, d)
h.open_wallet('newly created')
msg('')
data = get_dump_data()
fs = make_format_str()
gmsg('\nRestoring accounts, subaddresses and labels from dump file:\n')
restore_accounts()
restore_subaddresses()
restore_labels()
check_restored_data()
return True

45
mmgen/xmrwallet/ops/sign.py Executable file
View file

@ -0,0 +1,45 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.sign: Monero wallet ops for the MMGen Suite
"""
from ..file.tx import MoneroMMGenTX
from ..rpc import MoneroWalletRPC
from .wallet import OpWallet
class OpSign(OpWallet):
action = 'signing transaction with'
start_daemon = False
async def main(self, fn, restart_daemon=True):
if restart_daemon:
await self.restart_wallet_daemon()
tx = MoneroMMGenTX.Unsigned(self.cfg, fn)
h = MoneroWalletRPC(self, self.addr_data[0])
self.head_msg(tx.src_wallet_idx, h.fn)
if restart_daemon:
h.open_wallet(refresh=False)
res = self.c.call(
'sign_transfer',
unsigned_txset = tx.data.unsigned_txset,
export_raw = True,
get_tx_keys = True
)
new_tx = MoneroMMGenTX.NewColdSigned(
cfg = self.cfg,
txid = res['tx_hash_list'][0],
unsigned_txset = None,
signed_txset = res['signed_txset'],
_in_tx = tx,
)
return new_tx

64
mmgen/xmrwallet/ops/spec.py Executable file
View file

@ -0,0 +1,64 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.spec: Monero wallet ops for the MMGen Suite
"""
import re
from ...util import die
from ...addr import CoinAddr
from .. import uarg_info
class OpMixinSpec:
def create_addr_data(self):
m = re.fullmatch(uarg_info[self.spec_id].pat, self.uargs.spec, re.ASCII)
if not m:
fs = "{!r}: invalid {!r} arg: for {} operation, it must have format {!r}"
die(1, fs.format(self.uargs.spec, self.spec_id, self.name, uarg_info[self.spec_id].annot))
def gen():
for i, k in self.spec_key:
if m[i] is None:
setattr(self, k, None)
else:
idx = int(m[i])
try:
res = self.kal.entry(idx)
except:
die(1, f'Supplied key-address file does not contain address {self.kal.al_id.sid}:{idx}')
else:
setattr(self, k, res)
yield res
self.addr_data = list(gen())
self.account = None if m[2] is None else int(m[2])
def strip_quotes(s):
if s and s[0] in ("'", '"'):
if s[-1] != s[0] or len(s) < 2:
die(1, f'{s!r}: unbalanced quotes in label string!')
return s[1:-1]
else:
return s # None or empty string
if self.name in ('sweep', 'sweep_all'):
self.dest_acct = None if m[4] is None else int(m[4])
elif self.name == 'transfer':
self.dest_addr = CoinAddr(self.proto, m[3])
self.amount = self.proto.coin_amt(m[4])
elif self.name == 'new':
self.label = strip_quotes(m[3])
elif self.name == 'label':
self.address_idx = int(m[3])
self.label = strip_quotes(m[4])

138
mmgen/xmrwallet/ops/submit.py Executable file
View file

@ -0,0 +1,138 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.submit: Monero wallet ops for the MMGen Suite
"""
import time
from pathlib import Path
from ...util import msg, msg_r, gmsg, die
from ...ui import keypress_confirm
from ...proto.xmr.daemon import MoneroWalletDaemon
from ...proto.xmr.rpc import MoneroWalletRPCClient
from ..file.tx import MoneroMMGenTX
from ..rpc import MoneroWalletRPC
from . import OpBase
from .wallet import OpWallet
class OpSubmit(OpWallet):
action = 'submitting transaction with'
opts = ('tx_relay_daemon',)
def post_mount_action(self):
self.tx # trigger an exit if no suitable transaction present
@property
def tx(self):
if not hasattr(self, '_tx'):
self._tx = self.get_tx()
return self._tx
def get_tx(self):
if self.uargs.infile:
fn = Path(self.uargs.infile)
else:
from ...autosign import Signable
fn = Signable.xmr_transaction(self.asi).get_unsubmitted()
return MoneroMMGenTX.ColdSigned(cfg=self.cfg, fn=fn)
def get_relay_rpc(self):
relay_opt = self.parse_tx_relay_opt()
wd = MoneroWalletDaemon(
cfg = self.cfg,
proto = self.proto,
wallet_dir = self.cfg.wallet_dir or '.',
test_suite = self.cfg.test_suite,
monerod_addr = relay_opt[1],
)
u = wd.usr_daemon_args = []
if self.cfg.test_suite:
u.append('--daemon-ssl-allow-any-cert')
if relay_opt[2]:
u.append(f'--proxy={relay_opt[2]}')
return MoneroWalletRPCClient(
cfg = self.cfg,
daemon = wd,
test_connection = False,
)
async def main(self):
tx = self.tx
h = MoneroWalletRPC(self, self.kal.entry(tx.src_wallet_idx))
self.head_msg(tx.src_wallet_idx, h.fn)
h.open_wallet()
if self.cfg.tx_relay_daemon:
await self.c.stop_daemon()
self.c = self.get_relay_rpc()
self.c.start_daemon()
h = MoneroWalletRPC(self, self.kal.entry(tx.src_wallet_idx))
h.open_wallet('TX-relay-configured watch-only', refresh=False)
msg('\n' + tx.get_info(indent=' '))
if self.cfg.tx_relay_daemon:
self.display_tx_relay_info(indent=' ')
if keypress_confirm(self.cfg, f'{self.name.capitalize()} transaction?'):
if self.cfg.tx_relay_daemon:
msg_r('Relaying transaction to remote daemon, please be patient...')
t_start = time.time()
res = self.c.call(
'submit_transfer',
tx_data_hex = tx.data.signed_txset)
assert res['tx_hash_list'][0] == tx.data.txid, 'TxID mismatch in ‘submit_transfer’ result!'
if self.cfg.tx_relay_daemon:
from ...util2 import format_elapsed_hr
msg(f'success\nRelay time: {format_elapsed_hr(t_start, rel_now=False, show_secs=True)}')
else:
die(1, 'Exiting at user request')
new_tx = MoneroMMGenTX.NewSubmitted(
cfg = self.cfg,
_in_tx = tx,
)
gmsg('\nOK')
new_tx.write(
ask_write = not self.cfg.autosign,
ask_overwrite = not self.cfg.autosign)
return new_tx
class OpResubmit(OpSubmit):
action = 'resubmitting transaction with'
def check_uopts(self):
if not self.cfg.autosign:
die(1, '--autosign is required for this operation')
def get_tx(self):
from ...autosign import Signable
fns = Signable.xmr_transaction(self.asi).get_submitted()
return sorted(
(MoneroMMGenTX.Submitted(self.cfg, Path(fn)) for fn in fns),
key = lambda x: getattr(x.data, 'submit_time', None) or x.data.create_time
)[-1]
class OpAbort(OpBase):
opts = ('watch_only', 'autosign')
def __init__(self, cfg, uarg_tuple):
super().__init__(cfg, uarg_tuple)
self.mount_removable_device()
from ...autosign import Signable
Signable.xmr_transaction(self.asi).shred_abortable() # prompts user, then raises exception or exits

213
mmgen/xmrwallet/ops/sweep.py Executable file
View file

@ -0,0 +1,213 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.sweep: Monero wallet ops for the MMGen Suite
"""
from ...util import msg, msg_r, gmsg, die, fmt_dict, make_timestr
from ...proto.xmr.rpc import MoneroWalletRPCClient
from ...proto.xmr.daemon import MoneroWalletDaemon
from ...ui import keypress_confirm
from .. import tx_priorities
from ..rpc import MoneroWalletRPC
from .spec import OpMixinSpec
from .wallet import OpWallet
class OpSweep(OpMixinSpec, OpWallet):
spec_id = 'sweep_spec'
spec_key = ((1, 'source'), (3, 'dest'))
opts = (
'no_relay',
'tx_relay_daemon',
'watch_only',
'priority',
'skip_empty_accounts',
'skip_empty_addresses')
sweep_type = 'single-address'
def check_uopts(self):
if self.cfg.tx_relay_daemon and (self.cfg.no_relay or self.cfg.autosign):
die(1, '--tx-relay-daemon makes no sense in this context!')
if self.cfg.priority and self.cfg.priority not in list(tx_priorities):
die(1, '{}: invalid parameter for --priority (valid params: {})'.format(
self.cfg.priority,
fmt_dict(tx_priorities, fmt='square_compact')))
def init_tx_relay_daemon(self):
m = self.parse_tx_relay_opt()
wd2 = MoneroWalletDaemon(
cfg = self.cfg,
proto = self.proto,
wallet_dir = self.cfg.wallet_dir or '.',
test_suite = self.cfg.test_suite,
monerod_addr = m[1],
proxy = m[2])
if self.cfg.test_suite:
wd2.usr_daemon_args = ['--daemon-ssl-allow-any-cert']
wd2.start()
self.c = MoneroWalletRPCClient(
cfg = self.cfg,
daemon = wd2)
def create_tx(self, h, wallet_data):
def create_new_addr_maybe(h, account, label):
if keypress_confirm(self.cfg, f'\nCreate new address for account #{account}?'):
return h.create_new_addr(account, label)
elif not keypress_confirm(self.cfg, f'Sweep to last existing address of account #{account}?'):
die(1, 'Exiting at user request')
return None
dest_addr_chk = None
if self.dest is None: # sweep to same account
dest_acct = self.account
dest_addr_chk = create_new_addr_maybe(
h, self.account, f'{self.name} from this account [{make_timestr()}]')
if dest_addr_chk:
wallet_data = h.get_wallet_data(print=False)
dest_addr, dest_addr_idx = h.get_last_addr(self.account, wallet_data, display=not dest_addr_chk)
if dest_addr_chk:
h.print_acct_addrs(wallet_data, self.account)
elif self.dest_acct is None: # sweep to wallet
h.close_wallet('source')
h2 = MoneroWalletRPC(self, self.dest)
h2.open_wallet('destination')
wallet_data2 = h2.get_wallet_data()
wf = self.get_wallet_fn(self.dest)
if keypress_confirm(self.cfg, f'\nCreate new account for wallet {wf.name!r}?'):
dest_acct, dest_addr = h2.create_acct(
label = f'{self.name} from {self.source.idx}:{self.account} [{make_timestr()}]')
dest_addr_idx = 0
h2.get_wallet_data()
elif keypress_confirm(self.cfg, f'Sweep to last existing account of wallet {wf.name!r}?'):
dest_acct, dest_addr_chk = h2.get_last_acct(wallet_data2.accts_data)
dest_addr, dest_addr_idx = h2.get_last_addr(dest_acct, wallet_data2, display=False)
else:
die(1, 'Exiting at user request')
h2.close_wallet('destination')
h.open_wallet('source', refresh=False)
else: # sweep to specific account of wallet
def get_dest_addr_params(h, wallet_data, dest_acct, label):
self.check_account_exists(wallet_data.accts_data, dest_acct)
h.print_acct_addrs(wallet_data, dest_acct)
dest_addr_chk = create_new_addr_maybe(h, dest_acct, label)
if dest_addr_chk:
wallet_data = h.get_wallet_data(print=False)
dest_addr, dest_addr_idx = h.get_last_addr(dest_acct, wallet_data, display=not dest_addr_chk)
if dest_addr_chk:
h.print_acct_addrs(wallet_data, dest_acct)
return dest_addr, dest_addr_idx, dest_addr_chk
dest_acct = self.dest_acct
if self.dest == self.source:
dest_addr, dest_addr_idx, dest_addr_chk = get_dest_addr_params(
h, wallet_data, dest_acct,
f'{self.name} from account #{self.account} [{make_timestr()}]')
else:
h.close_wallet('source')
h2 = MoneroWalletRPC(self, self.dest)
h2.open_wallet('destination')
dest_addr, dest_addr_idx, dest_addr_chk = get_dest_addr_params(
h2, h2.get_wallet_data(), dest_acct,
f'{self.name} from {self.source.idx}:{self.account} [{make_timestr()}]')
h2.close_wallet('destination')
h.open_wallet('source', refresh=False)
assert dest_addr_chk in (None, dest_addr), (
f'dest_addr: ({dest_addr}) != dest_addr_chk: ({dest_addr_chk})')
msg(f'\n Creating {self.name} transaction...')
return (h, h.make_sweep_tx(self.account, dest_acct, dest_addr_idx, dest_addr, wallet_data.addrs_data))
@property
def add_desc(self):
return (
r' to new address' if self.dest is None else
f' to new account in wallet {self.dest.idx}' if self.dest_acct is None else
f' to account #{self.dest_acct} of wallet {self.dest.idx}') + f' ({self.sweep_type} sweep)'
def check_account_exists(self, accts_data, idx):
max_acct = len(accts_data['subaddress_accounts']) - 1
if self.account > max_acct:
die(2, f'{self.account}: requested account index out of bounds (>{max_acct})')
async def main(self):
gmsg(
f'\n{self.stem.capitalize()}ing account #{self.account}'
f' of wallet {self.source.idx}{self.add_desc}')
h = MoneroWalletRPC(self, self.source)
h.open_wallet('source')
wallet_data = h.get_wallet_data(skip_empty_ok=True)
self.check_account_exists(wallet_data.accts_data, self.account)
h.print_acct_addrs(wallet_data, self.account)
h, new_tx = self.create_tx(h, wallet_data)
msg('\n' + new_tx.get_info(indent=' '))
if self.cfg.tx_relay_daemon:
self.display_tx_relay_info(indent=' ')
msg('Saving TX data to file')
new_tx.write(delete_metadata=True)
if self.cfg.no_relay or self.cfg.autosign:
return True
if keypress_confirm(self.cfg, f'Relay {self.name} transaction?'):
if self.cfg.tx_relay_daemon:
await h.stop_wallet('source')
msg('')
self.init_tx_relay_daemon()
h = MoneroWalletRPC(self, self.source)
h.open_wallet('TX-relay-configured source', refresh=False)
msg_r(f'\n Relaying {self.name} transaction...')
h.relay_tx(new_tx.data.metadata)
gmsg('\nAll done')
return True
else:
die(1, '\nExiting at user request')
class OpSweepAll(OpSweep):
stem = 'sweep'
sweep_type = 'all-address'
class OpTransfer(OpSweep):
stem = 'transferr'
spec_id = 'transfer_spec'
spec_key = ((1, 'source'),)
@property
def add_desc(self):
return f': {self.amount} XMR to {self.dest_addr}'
def create_tx(self, h, wallet_data):
msg(f'\n Creating {self.name} transaction...')
return (h, h.make_transfer_tx(self.account, self.dest_addr, self.amount))

132
mmgen/xmrwallet/ops/sync.py Executable file
View file

@ -0,0 +1,132 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.sync: Monero wallet ops for the MMGen Suite
"""
import time
from ...util import msg, msg_r, ymsg, die
from ..rpc import MoneroWalletRPC
from .wallet import OpWallet
class OpSync(OpWallet):
opts = ('rescan_blockchain', 'skip_empty_accounts', 'skip_empty_addresses')
def check_uopts(self):
if self.cfg.rescan_blockchain and self.cfg.watch_only:
die(1, f'Operation ‘{self.name}’ does not support --rescan-blockchain with watch-only wallets')
def __init__(self, cfg, uarg_tuple):
super().__init__(cfg, uarg_tuple)
if not self.wallet_offline:
self.dc = self.get_coin_daemon_rpc()
self.wallets_data = {}
async def process_wallet(self, d, fn, last):
chain_height = self.dc.call_raw('get_height')['height']
msg(f' Chain height: {chain_height}')
t_start = time.time()
msg_r(' Opening wallet...')
self.c.call(
'open_wallet',
filename = fn.name,
password = d.wallet_passwd)
msg('done')
msg_r(' Getting wallet height (be patient, this could take a long time)...')
wallet_height = self.c.call('get_height')['height']
msg_r('\r' + ' '*68 + '\r')
msg(f' Wallet height: {wallet_height} ')
behind = chain_height - wallet_height
if behind > 1000:
msg_r(f' Wallet is {behind} blocks behind chain tip. Please be patient. Syncing...')
ret = self.c.call('refresh')
if behind > 1000:
msg('done')
if ret['received_money']:
msg(' Wallet has received funds')
for i in range(2):
wallet_height = self.c.call('get_height')['height']
if wallet_height >= chain_height:
break
ymsg(f' Wallet failed to sync (wallet height [{wallet_height}] < chain height [{chain_height}])')
if i or not self.cfg.rescan_blockchain:
break
msg_r(' Rescanning blockchain, please be patient...')
self.c.call('rescan_blockchain')
self.c.call('refresh')
msg('done')
t_elapsed = int(time.time() - t_start)
wd = MoneroWalletRPC(self, d).get_wallet_data(print=False, skip_empty_ok=True)
from . import hl_amt
msg(' Balance: {} Unlocked balance: {}'.format(
hl_amt(wd.accts_data['total_balance']),
hl_amt(wd.accts_data['total_unlocked_balance']),
))
self.wallets_data[fn.name] = wd
msg(f' Wallet height: {wallet_height}')
msg(f' Sync time: {t_elapsed//60:02}:{t_elapsed%60:02}')
if not last:
self.c.call('close_wallet')
return wallet_height >= chain_height
def gen_body(self, wallets_data):
for wnum, (_, wallet_data) in enumerate(wallets_data.items()):
yield from MoneroWalletRPC(self, self.addr_data[wnum]).gen_accts_info(
wallet_data.accts_data,
wallet_data.addrs_data,
indent = '',
skip_empty_ok = True)
yield ''
def post_main_success(self):
def gen_info(data):
yield from self.gen_body(data)
col1_w = max(map(len, data)) + 1
fs = '{:%s} {} {}' % col1_w
tbals = [0, 0]
yield fs.format('Wallet', 'Balance ', 'Unlocked Balance')
from . import fmt_amt
for k in data:
b = data[k].accts_data['total_balance']
ub = data[k].accts_data['total_unlocked_balance']
yield fs.format(k + ':', fmt_amt(b), fmt_amt(ub))
tbals[0] += b
tbals[1] += ub
yield fs.format('-'*col1_w, '-'*18, '-'*18)
yield fs.format('TOTAL:', fmt_amt(tbals[0]), fmt_amt(tbals[1]))
self.cfg._util.stdout_or_pager('\n'.join(gen_info(self.wallets_data)) + '\n')

94
mmgen/xmrwallet/ops/txview.py Executable file
View file

@ -0,0 +1,94 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.txview: Monero wallet ops for the MMGen Suite
"""
from pathlib import Path
from ...util import die
from ..file.tx import MoneroMMGenTX
from . import OpBase
class OpTxview(OpBase):
view_method = 'get_info'
opts = ('watch_only', 'autosign')
hdr = ''
col_hdr = ''
footer = ''
do_umount = False
async def main(self, cols=None):
self.mount_removable_device()
if self.cfg.autosign:
files = [f for f in self.asi.xmr_tx_dir.iterdir()
if f.name.endswith('.'+MoneroMMGenTX.Submitted.ext)]
else:
files = self.uargs.infile
txs = sorted(
(MoneroMMGenTX.View(self.cfg, Path(fn)) for fn in files),
# old TX files have no ‘submit_time’ field:
key = lambda x: getattr(x.data, 'submit_time', None) or x.data.create_time
)
if self.cfg.autosign:
self.asi.do_umount()
addr_w = None if self.cfg.full_address or cols is None else cols - self.fixed_cols_w
self.cfg._util.stdout_or_pager(
(self.hdr if len(files) > 1 else '')
+ self.col_hdr
+ '\n'.join(getattr(tx, self.view_method)(addr_w=addr_w) for tx in txs)
+ self.footer
)
class OpTxlist(OpTxview):
view_method = 'get_info_oneline'
add_nl = True
footer = '\n'
fixed_cols_w = MoneroMMGenTX.Base.oneline_fixed_cols_w
min_addr_w = 10
@property
def hdr(self):
return ('SUBMITTED ' if self.cfg.autosign else '') + 'MONERO TRANSACTIONS\n'
@property
def col_hdr(self):
return MoneroMMGenTX.View.oneline_fs.format(
a = 'Network',
b = 'Seed ID',
c = 'Submitted' if self.cfg.autosign else 'Date',
d = 'TxID',
e = 'Type',
f = 'Src',
g = 'Dest',
h = ' Amount',
j = 'Dest Address',
x = '',
) + '\n'
async def main(self):
if self.cfg.pager:
cols = None
else:
from ...term import get_terminal_size
cols = self.cfg.columns or get_terminal_size().width
if cols < self.fixed_cols_w + self.min_addr_w:
die(1, f'A terminal at least {self.fixed_cols_w + self.min_addr_w} columns wide is required '
'to display this output (or use --columns or --pager)')
await super().main(cols=cols)

69
mmgen/xmrwallet/ops/view.py Executable file
View file

@ -0,0 +1,69 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.view: Monero wallet ops for the MMGen Suite
"""
from ...color import green
from ...util import msg, ymsg
from ..include import gen_acct_addr_info
from ..rpc import MoneroWalletRPC
from .sync import OpSync
class OpList(OpSync):
stem = 'sync'
def gen_body(self, wallets_data):
for (wallet_fn, wallet_data) in wallets_data.items():
ad = wallet_data.accts_data['subaddress_accounts']
yield green(f'Wallet {wallet_fn}:')
for account in range(len(wallet_data.addrs_data)):
bal = ad[account]['unlocked_balance']
if self.cfg.skip_empty_accounts and not bal:
continue
yield ''
yield ' Account #{a} [{b} {c}]'.format(
a = account,
b = self.proto.coin_amt(bal, from_unit='atomic').hl(),
c = self.proto.coin_amt.hlc('XMR'))
yield from gen_acct_addr_info(self, wallet_data, account, indent=' ')
yield ''
class OpView(OpSync):
stem = 'open'
opts = ()
wallet_offline = True
def pre_init_action(self):
ymsg('Running in offline mode. Balances may be out of date!')
async def process_wallet(self, d, fn, last):
self.c.call(
'open_wallet',
filename = fn.name,
password = d.wallet_passwd)
wallet_height = self.c.call('get_height')['height']
msg(f' Wallet height: {wallet_height}')
self.wallets_data[fn.name] = MoneroWalletRPC(self, d).get_wallet_data(print=False, skip_empty_ok=True)
if not last:
self.c.call('close_wallet')
return True
class OpListview(OpView, OpList):
pass

227
mmgen/xmrwallet/ops/wallet.py Executable file
View file

@ -0,0 +1,227 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.ops.wallet: xmrwallet wallet op for the MMGen Suite
"""
import asyncio, re, atexit
from pathlib import Path
from ...color import orange
from ...util import msg, gmsg, ymsg, die, suf
from ...addrlist import KeyAddrList, ViewKeyAddrList, AddrIdxList
from ...proto.xmr.rpc import MoneroRPCClient, MoneroWalletRPCClient
from ...proto.xmr.daemon import MoneroWalletDaemon
from . import OpBase
class OpWallet(OpBase):
opts = (
'use_internal_keccak_module',
'hash_preset',
'daemon',
'no_start_wallet_daemon',
'no_stop_wallet_daemon',
'autosign',
'watch_only',
)
wallet_offline = False
wallet_exists = True
start_daemon = True
skip_wallet_check = False # for debugging
def __init__(self, cfg, uarg_tuple):
def wallet_exists(fn):
try:
fn.stat()
except:
return False
else:
return True
def check_wallets():
for d in self.addr_data:
fn = self.get_wallet_fn(d)
exists = wallet_exists(fn)
if exists and not self.wallet_exists:
die(1, f'Wallet ‘{fn}’ already exists!')
elif not exists and self.wallet_exists:
die(1, f'Wallet ‘{fn}’ not found!')
super().__init__(cfg, uarg_tuple)
if self.cfg.offline or (self.name == 'create' and self.cfg.restore_height is None):
self.wallet_offline = True
self.wd = MoneroWalletDaemon(
cfg = self.cfg,
proto = self.proto,
wallet_dir = self.cfg.wallet_dir or '.',
test_suite = self.cfg.test_suite,
monerod_addr = self.cfg.daemon or None,
trust_monerod = self.trust_monerod,
test_monerod = not self.wallet_offline,
)
if self.wallet_offline:
self.wd.usr_daemon_args = ['--offline']
self.c = MoneroWalletRPCClient(
cfg = self.cfg,
daemon = self.wd,
test_connection = False,
)
if self.cfg.offline:
from ...wallet import Wallet
self.seed_src = Wallet(
cfg = cfg,
fn = self.uargs.infile,
ignore_in_fmt = True)
gmsg('\nCreating ephemeral key-address list for offline wallets')
self.kal = KeyAddrList(
cfg = cfg,
proto = self.proto,
seed = self.seed_src.seed,
addr_idxs = self.uargs.wallets,
skip_chksum_msg = True)
else:
self.mount_removable_device()
# with watch_only, make a second attempt to open the file as KeyAddrList:
for first_try in (True, False):
try:
self.kal = (ViewKeyAddrList if (self.cfg.watch_only and first_try) else KeyAddrList)(
cfg = cfg,
proto = self.proto,
addrfile = str(self.autosign_viewkey_addr_file) if self.cfg.autosign else self.uargs.infile,
key_address_validity_check = True,
skip_chksum_msg = True)
break
except:
if first_try:
msg(f'Attempting to open ‘{self.uargs.infile}’ as key-address list')
continue
raise
self.create_addr_data()
if not self.skip_wallet_check:
check_wallets()
if self.start_daemon and not self.cfg.no_start_wallet_daemon:
asyncio.run(self.restart_wallet_daemon())
@classmethod
def get_idx_from_fn(cls, fn):
return int(re.match(r'[0-9a-fA-F]{8}-(\d+)-Monero(WatchOnly)?Wallet.*', fn.name)[1])
def pre_init_action(self):
if self.cfg.skip_empty_accounts:
msg(orange('Skipping display of empty accounts where applicable'))
if self.cfg.skip_empty_addresses:
msg(orange('Skipping display of empty used addresses where applicable'))
def get_coin_daemon_rpc(self):
host, port = self.cfg.daemon.split(':') if self.cfg.daemon else ('localhost', self.wd.monerod_port)
from ...daemon import CoinDaemon
return MoneroRPCClient(
cfg = self.cfg,
proto = self.proto,
daemon = CoinDaemon(self.cfg, 'xmr'),
host = host,
port = int(port),
user = None,
passwd = None)
@property
def autosign_viewkey_addr_file(self):
from ...addrfile import ViewKeyAddrFile
flist = [f for f in self.asi.xmr_dir.iterdir() if f.name.endswith(ViewKeyAddrFile.ext)]
if len(flist) != 1:
die(2,
'{a} viewkey-address files found in autosign mountpoint directory ‘{b}’!\n'.format(
a = 'Multiple' if flist else 'No',
b = self.asi.xmr_dir
)
+ 'Have you run ‘mmgen-autosign setup’ on your offline machine with the --xmrwallets option?'
)
else:
return flist[0]
def create_addr_data(self):
if self.uargs.wallets:
idxs = AddrIdxList(self.uargs.wallets)
self.addr_data = [d for d in self.kal.data if d.idx in idxs]
if len(self.addr_data) != len(idxs):
die(1, f'List {self.uargs.wallets!r} contains addresses not present in supplied key-address file')
else:
self.addr_data = self.kal.data
async def restart_wallet_daemon(self):
atexit.register(lambda: asyncio.run(self.stop_wallet_daemon()))
await self.c.restart_daemon()
async def stop_wallet_daemon(self):
if not self.cfg.no_stop_wallet_daemon:
try:
await self.c.stop_daemon()
except KeyboardInterrupt:
ymsg('\nForce killing wallet daemon')
self.c.daemon.force_kill = True
self.c.daemon.stop()
def get_wallet_fn(self, data, watch_only=None):
if watch_only is None:
watch_only = self.cfg.watch_only
return Path(
(self.cfg.wallet_dir or '.'),
'{a}-{b}-Monero{c}Wallet{d}'.format(
a = self.kal.al_id.sid,
b = data.idx,
c = 'WatchOnly' if watch_only else '',
d = f'.{self.cfg.network}' if self.cfg.network != 'mainnet' else '')
)
@property
def add_wallet_desc(self):
return 'offline signing ' if self.cfg.offline else 'watch-only ' if self.cfg.watch_only else ''
async def main(self):
gmsg('\n{a}ing {b} {c}wallet{d}'.format(
a = self.stem.capitalize(),
b = len(self.addr_data),
c = self.add_wallet_desc,
d = suf(self.addr_data)))
processed = 0
for n, d in enumerate(self.addr_data): # [d.sec,d.addr,d.wallet_passwd,d.viewkey]
fn = self.get_wallet_fn(d)
gmsg('\n{a}ing wallet {b}/{c} ({d})'.format(
a = self.stem.capitalize(),
b = n + 1,
c = len(self.addr_data),
d = fn.name,
))
processed += await self.process_wallet(d, fn, last=n==len(self.addr_data)-1)
gmsg(f'\n{processed} wallet{suf(processed)} {self.stem}ed\n')
return processed
def head_msg(self, wallet_idx, fn):
gmsg('\n{a} {b}wallet #{c} ({d})'.format(
a = self.action.capitalize(),
b = self.add_wallet_desc,
c = wallet_idx,
d = fn.name
))

211
mmgen/xmrwallet/rpc.py Executable file
View file

@ -0,0 +1,211 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
xmrwallet.rpc: Monero wallet RPC methods for the MMGen Suite
"""
from collections import namedtuple
from ..obj import CoinTxID
from ..color import red, cyan, pink
from ..util import msg, msg_r, gmsg, gmsg_r, die, fmt
from ..addr import CoinAddr
from .include import gen_acct_addr_info, XMRWalletAddrSpec
from .file.tx import MoneroMMGenTX
class MoneroWalletRPC:
def __init__(self, parent, d):
self.parent = parent
self.cfg = parent.cfg
self.proto = parent.proto
self.c = parent.c
self.d = d
self.fn = parent.get_wallet_fn(d)
self.new_tx_cls = (
MoneroMMGenTX.NewUnsigned if self.cfg.watch_only else
MoneroMMGenTX.NewSigned)
def open_wallet(self, desc=None, refresh=True):
add_desc = desc + ' ' if desc else self.parent.add_wallet_desc
gmsg_r(f'\n Opening {add_desc}wallet...')
self.c.call( # returns {}
'open_wallet',
filename = self.fn.name,
password = self.d.wallet_passwd)
gmsg('done')
if refresh:
gmsg_r(f' Refreshing {add_desc}wallet...')
ret = self.c.call('refresh')
gmsg('done')
if ret['received_money']:
msg(' Wallet has received funds')
def close_wallet(self, desc):
gmsg_r(f'\n Closing {desc} wallet...')
self.c.call('close_wallet')
gmsg_r('done')
async def stop_wallet(self, desc):
msg(f'Stopping {self.c.daemon.desc} on port {self.c.daemon.bind_port}')
gmsg_r(f'\n Stopping {desc} wallet...')
await self.c.stop_daemon(quiet=True) # closes wallet
gmsg_r('done')
def gen_accts_info(self, accts_data, addrs_data, indent=' ', skip_empty_ok=False):
from .ops import addr_width
fs = indent + ' {I:<3} {A} {N} {B} {L}'
yield indent + f'Accounts of wallet {self.fn.name}:'
yield fs.format(
I = '',
A = 'Base Address'.ljust(addr_width),
N = 'nAddrs',
B = ' Unlocked Balance',
L = 'Label')
for i, e in enumerate(accts_data['subaddress_accounts']):
if skip_empty_ok and self.cfg.skip_empty_accounts and not e['unlocked_balance']:
continue
ca = CoinAddr(self.proto, e['base_address'])
from .ops import fmt_amt
yield fs.format(
I = str(e['account_index']),
A = ca.hl(0) if self.cfg.full_address else ca.fmt(0, color=True, width=addr_width),
N = red(str(len(addrs_data[i]['addresses'])).ljust(6)),
B = fmt_amt(e['unlocked_balance']),
L = pink(e['label']))
def get_wallet_data(self, print=True, skip_empty_ok=False):
accts_data = self.c.call('get_accounts')
addrs_data = [
self.c.call('get_address', account_index=i)
for i in range(len(accts_data['subaddress_accounts']))
]
if print:
msg('\n' + '\n'.join(self.gen_accts_info(accts_data, addrs_data, skip_empty_ok=skip_empty_ok)))
bals_data = self.c.call('get_balance', all_accounts=True)
return namedtuple('wallet_data', ['accts_data', 'addrs_data', 'bals_data'])(
accts_data, addrs_data, bals_data)
def create_acct(self, label=None):
msg('\n Creating new account...')
ret = self.c.call('create_account', label=label)
msg(' Index: {}'.format(pink(str(ret['account_index']))))
msg(' Address: {}'.format(cyan(ret['address'])))
return (ret['account_index'], ret['address'])
def get_last_acct(self, accts_data):
msg('\n Getting last account...')
ret = accts_data['subaddress_accounts'][-1]
msg(' Index: {}'.format(pink(str(ret['account_index']))))
msg(' Address: {}'.format(cyan(ret['base_address'])))
return (ret['account_index'], ret['base_address'])
def print_acct_addrs(self, wallet_data, account):
msg('\n Addresses of account #{} ({}):'.format(
account,
wallet_data.accts_data['subaddress_accounts'][account]['label']))
msg('\n'.join(gen_acct_addr_info(self, wallet_data, account, indent=' ')))
return wallet_data.addrs_data[account]['addresses']
def create_new_addr(self, account, label):
msg_r('\n Creating new address: ')
ret = self.c.call('create_address', account_index=account, label=label or '')
msg(cyan(ret['address']))
return ret['address']
def get_last_addr(self, account, wallet_data, display=True):
if display:
msg('\n Getting last address:')
acct_addrs = wallet_data.addrs_data[account]['addresses']
addr = acct_addrs[-1]['address']
if display:
msg(' ' + cyan(addr))
return (addr, len(acct_addrs) - 1)
def set_label(self, account, address_idx, label):
return self.c.call(
'label_address',
index = { 'major': account, 'minor': address_idx },
label = label
)
def make_transfer_tx(self, account, addr, amt):
res = self.c.call(
'transfer',
account_index = account,
destinations = [{
'amount': amt.to_unit('atomic'),
'address': addr
}],
priority = self.cfg.priority or None,
do_not_relay = True,
get_tx_hex = True,
get_tx_metadata = True
)
return self.new_tx_cls(
cfg = self.cfg,
op = self.parent.name,
network = self.proto.network,
seed_id = self.parent.kal.al_id.sid,
source = XMRWalletAddrSpec(self.parent.source.idx, self.parent.account, None),
dest = None,
dest_address = addr,
txid = res['tx_hash'],
amount = self.proto.coin_amt(res['amount'], from_unit='atomic'),
fee = self.proto.coin_amt(res['fee'], from_unit='atomic'),
blob = res['tx_blob'],
metadata = res['tx_metadata'],
unsigned_txset = res['unsigned_txset'] if self.cfg.watch_only else None,
)
def make_sweep_tx(self, account, dest_acct, dest_addr_idx, addr, addrs_data):
res = self.c.call(
'sweep_all',
address = addr,
account_index = account,
subaddr_indices = list(range(len(addrs_data[account]['addresses'])))
if self.parent.name == 'sweep_all' else [],
priority = self.cfg.priority or None,
do_not_relay = True,
get_tx_hex = True,
get_tx_metadata = True
)
if len(res['tx_hash_list']) > 1:
die(3, 'More than one TX required. Cannot perform this sweep')
return self.new_tx_cls(
cfg = self.cfg,
op = self.parent.name,
network = self.proto.network,
seed_id = self.parent.kal.al_id.sid,
source = XMRWalletAddrSpec(self.parent.source.idx, self.parent.account, None),
dest = XMRWalletAddrSpec(
(self.parent.dest or self.parent.source).idx,
dest_acct,
dest_addr_idx),
dest_address = addr,
txid = res['tx_hash_list'][0],
amount = self.proto.coin_amt(res['amount_list'][0], from_unit='atomic'),
fee = self.proto.coin_amt(res['fee_list'][0], from_unit='atomic'),
blob = res['tx_blob_list'][0],
metadata = res['tx_metadata_list'][0],
unsigned_txset = res['unsigned_txset'] if self.cfg.watch_only else None,
)
def relay_tx(self, tx_hex):
ret = self.c.call('relay_tx', hex=tx_hex)
try:
msg('\n Relayed {}'.format(CoinTxID(ret['tx_hash']).hl()))
except:
msg(f'\n Server returned: {ret!s}')

View file

@ -68,8 +68,15 @@ ignored-classes = [ # ignored for no-member, otherwise checked
"mmgen.autosign.Signable.base",
"mmgen.autosign.Autosign", # tx_dir, etc. created dynamically
"Sha2",
"mmgen.xmrwallet.MoneroMMGenTX.Base",
"mmgen.xmrwallet.MoneroWalletOutputsFile.Base",
"mmgen.xmrwallet.file.MoneroMMGenTX.Base",
"mmgen.xmrwallet.file.MoneroWalletOutputsFile.Base",
"mmgen.xmrwallet.ops.sweep.OpSweep",
"mmgen.xmrwallet.ops.wallet.OpWallet",
"mmgen.xmrwallet.ops.label.OpLabel",
"mmgen.xmrwallet.ops.new.OpNew",
"mmgen.xmrwallet.ops.txview.OpTxview",
"mmgen.xmrwallet.file.outputs.MoneroWalletOutputsFile.Base",
"mmgen.xmrwallet.file.tx.MoneroMMGenTX.Base",
"mmgen.proto.eth.tx.Completed",
"TxInfo",
"TwRPC",

View file

@ -91,6 +91,9 @@ packages =
mmgen.tw
mmgen.wallet
mmgen.wordlist
mmgen.xmrwallet
mmgen.xmrwallet.file
mmgen.xmrwallet.ops
scripts =
cmds/mmgen-addrgen

View file

@ -126,7 +126,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignThreaded):
self.spawn('',msg_only=True)
data = self.users['alice']
from mmgen.wallet import Wallet
from mmgen.xmrwallet import MoneroWalletOps,xmrwallet_uargs
from mmgen.xmrwallet import op
from mmgen.addrlist import KeyAddrList
silence()
kal = KeyAddrList(
@ -138,9 +138,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignThreaded):
key_address_validity_check = False )
kal.file.write(ask_overwrite=False)
fn = get_file_with_ext(data.udir,'akeys')
m = MoneroWalletOps.create(
self.cfg,
xmrwallet_uargs(fn, '1-2', None))
m = op('create', self.cfg, fn, '1-2')
async_run(m.main())
async_run(m.stop_wallet_daemon())
end_silence()

View file

@ -64,7 +64,7 @@ class unit_tests:
return True
def xmrwallet_uarg_info(self, name, ut, desc='dict xmrwallet.xmrwallet_uarg_info'): # WIP
from mmgen.xmrwallet import xmrwallet_uarg_info as uarg_info
from mmgen.xmrwallet import uarg_info
vs = namedtuple('vector_data', ['text', 'groups'])
fs = '{:16} {}'