modularize transaction classes

The monolithic tx.py module has been split into multiple modules, and a
clean separation of protocol-dependent and protocol-independent code has
been carried out.

- Protocol-independent base classes are located under `tx`.
- Protocol-dependent subclasses are under `base_proto/{name}/tx`.
- The code in `tx/__init__.py` loads the required module and returns an
  initialized instance of the requested class.
This commit is contained in:
The MMGen Project 2022-02-03 20:40:43 +00:00
commit 818488c559
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
57 changed files with 2924 additions and 2421 deletions

View file

@ -262,7 +262,7 @@ class AddrFile(MMGenObject):
Having caller supply protocol and checking address file protocol against it here
allows us to catch all mismatches in one place. This behavior differs from that of
transaction files, which determine the protocol independently, requiring the caller
to check for protocol mismatches (e.g. MMGenTX.check_correct_chain())
to check for protocol mismatches (e.g. mmgen.tx.completed.check_correct_chain())
"""
raise ValueError(
f'{p.desc} file is '

View file

View file

View file

@ -0,0 +1,325 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.base: Bitcoin base transaction class
"""
from collections import namedtuple
import mmgen.tx.base as TxBase
from ....opts import opt
from ....obj import MMGenObject,MMGenList,HexStr
from ....util import msg,dmsg,make_chksum_6
def addr2pubhash(proto,addr):
    """
    Return the hex encoding of the bytes obtained by parsing coin address `addr`

    `proto` must provide a `parse_addr()` method whose result has a `bytes` attribute.
    Raises AssertionError if `parse_addr()` returns a false value.
    """
    parsed = proto.parse_addr(addr)
    assert parsed,f'coin address {addr!r} could not be parsed'
    raw = parsed.bytes
    return raw.hex()
def addr2scriptPubKey(proto,addr):
    """
    Return the hex scriptPubKey for coin address `addr`

    The script template is selected by `addr.addr_fmt` ('p2pkh', 'p2sh' or 'bech32');
    any other value raises KeyError.  An unparseable address raises AssertionError
    (from addr2pubhash()).
    """
    # parse the address once instead of once per template
    pubhash = addr2pubhash(proto,addr)
    return {
        'p2pkh': '76a914' + pubhash + '88ac',
        'p2sh': 'a914' + pubhash + '87',
        'bech32': proto.witness_vernum_hex + '14' + pubhash
    }[addr.addr_fmt]
def scriptPubKey2addr(proto,s):
    """
    Convert hex scriptPubKey `s` to an address

    Returns a tuple (address, format), where format is 'p2pkh', 'p2sh' or 'bech32'.
    Raises NotImplementedError if `s` matches none of the three recognized templates.
    """
    hexlen = len(s)

    if hexlen == 50 and s[:6] == '76a914' and s[-4:] == '88ac':
        pubhash = bytes.fromhex(s[6:-4])
        return proto.pubhash2addr(pubhash,p2sh=False),'p2pkh'

    if hexlen == 46 and s[:4] == 'a914' and s[-2:] == '87':
        scripthash = bytes.fromhex(s[4:-2])
        return proto.pubhash2addr(scripthash,p2sh=True),'p2sh'

    if hexlen == 44 and s[:4] == proto.witness_vernum_hex + '14':
        witprog = bytes.fromhex(s[4:])
        return proto.pubhash2bech32addr(witprog),'bech32'

    raise NotImplementedError(f'Unknown scriptPubKey ({s})')
def DeserializeTX(proto,txhex):
    """
    Parse a serialized Bitcoin transaction
    For checking purposes, additionally reconstructs the serialized TX without signature

    Returns a 'deserialized_tx' namedtuple with fields: version, num_txins, txins,
    num_txouts, txouts, txid, witness_size, locktime, unsigned_hex.

    Raises IllegalWitnessFlagValue if the two bytes following the version begin with a
    zero byte but are not '0001', and TxHexParseError if the number of bytes remaining
    after the parsed data is not exactly 4 (the locktime).
    """
    # These names are not imported at module level in this file, so bring them into
    # scope here; otherwise the error paths below fail with NameError.
    from ....util import die
    from ....exception import IllegalWitnessFlagValue,TxHexParseError

    def bytes2int(bytes_le):
        # little-endian bytes -> int; values with the high bit set are rejected
        if bytes_le[-1] & 0x80: # sign bit is set
            die(3,"{}: Negative values not permitted in transaction!".format(bytes_le[::-1].hex()))
        return int(bytes_le[::-1].hex(),16)

    def bytes2coin_amt(bytes_le):
        return proto.coin_amt(bytes2int(bytes_le) * proto.coin_amt.satoshi)

    def bshift(n,skip=False,sub_null=False):
        # consume n bytes; copy them to raw_tx unless skip is set, or append a single
        # null byte in their place if sub_null is set
        ret = tx[var.idx:var.idx+n]
        var.idx += n
        if sub_null:
            var.raw_tx += b'\x00'
        elif not skip:
            var.raw_tx += ret
        return ret

    # https://bitcoin.org/en/developer-reference#compactsize-unsigned-integers
    # For example, the number 515 is encoded as 0xfd0302.
    def readVInt(skip=False):
        s = tx[var.idx]
        var.idx += 1
        if not skip:
            var.raw_tx.append(s)

        vbytes_len = 1 if s < 0xfd else 2 if s == 0xfd else 4 if s == 0xfe else 8

        if vbytes_len == 1:
            return s
        else:
            vbytes = tx[var.idx:var.idx+vbytes_len]
            var.idx += vbytes_len
            if not skip:
                var.raw_tx += vbytes
            return int(vbytes[::-1].hex(),16)

    def make_txid(tx_bytes):
        # double SHA256, byte-reversed, as hex
        from hashlib import sha256
        return sha256(sha256(tx_bytes).digest()).digest()[::-1].hex()

    # parser state: read offset into tx, and the reconstructed signature-less serialization
    class vardata:
        idx = 0
        raw_tx = bytearray()

    var = vardata()

    tx = bytes.fromhex(txhex)
    d = { 'version': bytes2int(bshift(4)) }

    # a zero byte after the version is taken as the witness marker
    has_witness = tx[var.idx] == 0
    if has_witness:
        u = bshift(2,skip=True).hex()
        if u != '0001':
            raise IllegalWitnessFlagValue(f'{u!r}: Illegal value for flag in transaction!')

    d['num_txins'] = readVInt()

    d['txins'] = MMGenList([{
        'txid': bshift(32)[::-1].hex(),
        'vout': bytes2int(bshift(4)),
        'scriptSig': bshift(readVInt(skip=True),sub_null=True).hex(),
        'nSeq': bshift(4)[::-1].hex()
    } for i in range(d['num_txins'])])

    d['num_txouts'] = readVInt()

    d['txouts'] = MMGenList([{
        'amount': bytes2coin_amt(bshift(8)),
        'scriptPubKey': bshift(readVInt()).hex()
    } for i in range(d['num_txouts'])])

    for o in d['txouts']:
        o['address'] = scriptPubKey2addr(proto,o['scriptPubKey'])[0]

    if has_witness:
        # https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki
        # A non-witness program (defined hereinafter) txin MUST be associated with an empty
        # witness field, represented by a 0x00.

        # txid is computed over version + (data after marker and flag, up to the witness) + locktime
        d['txid'] = make_txid(tx[:4] + tx[6:var.idx] + tx[-4:])
        d['witness_size'] = len(tx) - var.idx + 2 - 4 # add len(marker+flag), subtract len(locktime)

        for txin in d['txins']:
            if tx[var.idx] == 0:
                bshift(1,skip=True)
                continue
            # the item count (range argument) is read first, then each item's length and data
            txin['witness'] = [
                bshift(readVInt(skip=True),skip=True).hex() for item in range(readVInt(skip=True)) ]
    else:
        d['txid'] = make_txid(tx)
        d['witness_size'] = 0

    if len(tx) - var.idx != 4:
        raise TxHexParseError('TX hex has invalid length: {} extra bytes'.format(len(tx)-var.idx-4))

    d['locktime'] = bytes2int(bshift(4))
    d['unsigned_hex'] = var.raw_tx.hex()

    return namedtuple('deserialized_tx',list(d.keys()))(**d)
class Base(TxBase.Base):
    """
    Bitcoin base transaction class: BIP69 input/output sorting, size estimation,
    fee conversion and integrity checking of serialized transaction data
    """
    rel_fee_desc = 'satoshis per byte'
    rel_fee_disp = 'sat/byte'
    _deserialized = None # cache for the `deserialized` property

    class InputList(TxBase.Base.InputList):

        # Lexicographical Indexing of Transaction Inputs and Outputs
        # https://github.com/bitcoin/bips/blob/master/bip-0069.mediawiki
        def sort_bip69(self):
            """Sort inputs in place by txid bytes, then by vout (4 bytes, big-endian)"""
            def sort_func(a):
                return (
                    bytes.fromhex(a.txid)
                    + int.to_bytes(a.vout,4,'big') )
            self.sort(key=sort_func)

    class OutputList(TxBase.Base.OutputList):

        def sort_bip69(self):
            """Sort outputs in place by amount in satoshis (8 bytes, big-endian), then by scriptPubKey"""
            def sort_func(a):
                return (
                    int.to_bytes(a.amt.to_unit('satoshi'),8,'big')
                    + bytes.fromhex(addr2scriptPubKey(self.parent.proto,a.addr)) )
            self.sort(key=sort_func)

    def has_segwit_inputs(self):
        """Return True if any input has mmtype 'S' or 'B'"""
        return any(i.mmtype in ('S','B') for i in self.inputs)

    def has_segwit_outputs(self):
        """Return True if any output has mmtype 'S' or 'B'"""
        return any(o.mmtype in ('S','B') for o in self.outputs)

    # https://bitcoin.stackexchange.com/questions/1195/how-to-calculate-transaction-size-before-sending
    # 180: uncompressed, 148: compressed
    def estimate_size_old(self):
        """Return a rough size estimate, or None if the TX has no inputs or no outputs"""
        if not self.inputs or not self.outputs:
            return None
        return len(self.inputs)*180 + len(self.outputs)*34 + 10

    # https://bitcoincore.org/en/segwit_wallet_dev/
    # vsize: 3 times of the size with original serialization, plus the size with new
    # serialization, divide the result by 4 and round up to the next integer.
    # TODO: results differ slightly from actual transaction size
    def estimate_size(self):
        """
        Return the estimated vsize of the transaction, multiplied by opt.vsize_adj if that
        option is set, or None if the transaction has no inputs or no outputs
        """
        if not self.inputs or not self.outputs:
            return None

        sig_size = 72 # sig in DER format
        pubkey_size_uncompressed = 65
        pubkey_size_compressed = 33

        def get_inputs_size():
            # txid vout [scriptSig size (vInt)] scriptSig (<sig> <pubkey>) nSeq
            isize_common = 32 + 4 + 1 + 4 # txid vout [scriptSig size] nSeq = 41
            input_size = {
                'L': isize_common + sig_size + pubkey_size_uncompressed, # = 180
                'C': isize_common + sig_size + pubkey_size_compressed, # = 148
                'S': isize_common + 23, # = 64
                'B': isize_common + 0 # = 41
            }
            ret = sum(input_size[i.mmtype] for i in self.inputs if i.mmtype)

            # We have no way of knowing whether a non-MMGen P2PKH addr is compressed or uncompressed
            # until we see the key, so assume compressed for fee-estimation purposes. If fee estimate
            # is off by more than 5%, sign() aborts and user is instructed to use --vsize-adj option.
            return ret + sum(input_size['C'] for i in self.inputs if not i.mmtype)

        def get_outputs_size():
            # output bytes = amt: 8, byte_count: 1+, pk_script
            # pk_script bytes: p2pkh: 25, p2sh: 23, bech32: 22
            return sum({'p2pkh':34,'p2sh':32,'bech32':31}[o.addr.addr_fmt] for o in self.outputs)

        # https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki
        # The witness is a serialization of all witness data of the transaction. Each txin is
        # associated with a witness field. A witness field starts with a var_int to indicate the
        # number of stack items for the txin. It is followed by stack items, with each item starts
        # with a var_int to indicate the length. Witness data is NOT script.
        # A non-witness program txin MUST be associated with an empty witness field, represented
        # by a 0x00. If all txins are not witness program, a transaction's wtxid is equal to its txid.
        def get_witness_size():
            if not self.has_segwit_inputs():
                return 0
            wf_size = 1 + 1 + sig_size + 1 + pubkey_size_compressed # vInt vInt sig vInt pubkey = 108
            return sum((1,wf_size)[i.mmtype in ('S','B')] for i in self.inputs)

        isize = get_inputs_size()
        osize = get_outputs_size()
        wsize = get_witness_size()

        # TODO: compute real varInt sizes instead of assuming 1 byte
        # old serialization: [nVersion] [vInt][txins][vInt][txouts] [nLockTime]
        old_size = 4 + 1 + isize + 1 + osize + 4
        # marker = 0x00, flag = 0x01
        # new serialization: [nVersion][marker][flag][vInt][txins][vInt][txouts][witness][nLockTime]
        new_size = 4 + 1 + 1 + 1 + isize + 1 + osize + wsize + 4 \
            if wsize else old_size

        ret = (old_size * 3 + new_size) // 4

        dmsg('\nData from estimate_size():')
        dmsg(f' inputs size: {isize}, outputs size: {osize}, witness size: {wsize}')
        dmsg(f' size: {new_size}, vsize: {ret}, old_size: {old_size}')

        return int(ret * (opt.vsize_adj or 1))

    # convert absolute CoinAmt fee to sat/byte using estimated size
    def fee_abs2rel(self,abs_fee,to_unit='satoshi'):
        """Return `abs_fee` expressed in `to_unit` per byte of estimated size, truncated to int"""
        return int(
            abs_fee /
            getattr( self.proto.coin_amt, to_unit ) /
            self.estimate_size() )

    @property
    def deserialized(self):
        """The deserialized form of self.serialized, computed on first access and cached"""
        if not self._deserialized:
            self._deserialized = DeserializeTX(self.proto,self.serialized)
        return self._deserialized

    def update_serialized(self,data):
        """Replace the serialized TX data, invalidate the cached deserialization and re-check integrity"""
        self.serialized = HexStr(data)
        self._deserialized = None
        self.check_serialized_integrity()

    def check_serialized_integrity(self):
        """
        Check that a malicious, compromised or malfunctioning coin daemon hasn't produced bad
        serialized tx data.

        Does not check witness data.

        Perform this check every time a serialized tx is received from the coin daemon or read
        from a transaction file.

        Raises TxHexMismatch on any discrepancy.
        """

        def do_error(errmsg):
            from ....exception import TxHexMismatch
            raise TxHexMismatch(errmsg+'\n'+hdr)

        def check_equal(desc,hexio,mmio):
            if mmio != hexio:
                # pp_fmt is not imported at module level in this file; without this import
                # the mismatch report would be replaced by a NameError.
                # NOTE(review): assumes pp_fmt lives in mmgen.util - confirm
                from ....util import pp_fmt
                msg('\nMMGen {d}:\n{m}\nSerialized {d}:\n{h}'.format(
                    d = desc,
                    m = pp_fmt(mmio),
                    h = pp_fmt(hexio) ))
                do_error(
                    f'{desc.capitalize()} in serialized transaction data from coin daemon ' +
                    'do not match those in MMGen transaction!' )

        hdr = 'A malicious or malfunctioning coin daemon or other program may have altered your data!'

        dtx = self.deserialized

        if dtx.locktime != int(self.locktime or 0):
            do_error(
                f'Transaction hex nLockTime ({dtx.locktime}) ' +
                f'does not match MMGen transaction nLockTime ({self.locktime})' )

        # sequence numbers are compared in order; inputs and outputs are compared sorted
        check_equal(
            'sequence numbers',
            [int(i['nSeq'],16) for i in dtx.txins],
            [i.sequence or self.proto.max_int for i in self.inputs] )

        check_equal(
            'inputs',
            sorted((i['txid'],i['vout']) for i in dtx.txins),
            sorted((i.txid,i.vout) for i in self.inputs) )

        check_equal(
            'outputs',
            sorted((o['address'],self.proto.coin_amt(o['amount'])) for o in dtx.txouts),
            sorted((o.addr,o.amt) for o in self.outputs) )

        if str(self.txid) != make_chksum_6(bytes.fromhex(dtx.unsigned_hex)).upper():
            do_error(
                f'MMGen TxID ({self.txid}) does not match serialized transaction data!')

View file

@ -0,0 +1,51 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.bump: Bitcoin transaction bump class
"""
import mmgen.tx.bump as TxBase
from .new import New
from .completed import Completed
class Bump(Completed,New,TxBase.Bump):
    """Bitcoin fee-bumped transaction class"""
    desc = 'fee-bumped transaction'

    @property
    def min_fee(self):
        """Current fee (inputs minus outputs) plus the relay fee"""
        return self.sum_inputs() - self.sum_outputs() + self.relay_fee

    def bump_fee(self,idx,fee):
        """Set the amount of output `idx` so that the transaction fee becomes `fee`"""
        self.update_output_amt(
            idx,
            self.sum_inputs() - self.sum_outputs(exclude=idx) - fee
        )

    def convert_and_check_fee(self,tx_fee,desc):
        """
        Convert and check `tx_fee` via the parent class, then additionally require that
        the fee be at least `min_fee` and smaller than the amount of the output being
        bumped.  Returns the absolute fee, or False (after printing a message) on failure.
        """
        # `msg` is not imported at module level in this file
        from ....util import msg

        ret = super().convert_and_check_fee(tx_fee,desc)

        # The parent (see New.convert_and_check_fee) returns False for a rejected fee and
        # has already reported it; fall through no further, as False has no hl() method.
        if ret is False:
            return False

        if ret < self.min_fee:
            # hl() is for display only: fee_abs2rel() needs the numeric amount
            msg('{} {c}: {} fee too small. Minimum fee: {} {c} ({} {})'.format(
                ret.hl(),
                desc,
                self.min_fee.hl(),
                self.fee_abs2rel(self.min_fee),
                self.rel_fee_desc,
                c = self.coin ))
            return False

        output_amt = self.outputs[self.bump_output_idx].amt
        if ret >= output_amt:
            msg('{} {c}: {} fee too large. Maximum fee: <{} {c}'.format(
                ret.hl(),
                desc,
                output_amt.hl(),
                c = self.coin ))
            return False

        return ret

View file

@ -0,0 +1,86 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.completed: Bitcoin completed transaction class
"""
import mmgen.tx.completed as TxBase
from .base import Base,scriptPubKey2addr
from ....obj import HexStr
from ....util import msg
class Completed(Base,TxBase.Completed):
    """Bitcoin completed transaction class"""
    fn_fee_unit = 'satoshi'

    # check signature and witness data
    def check_sigs(self): # return True if sigs found, False otherwise; raise exception on error
        """
        Check the scriptSig and witness data of each deserialized input against the
        type of the corresponding MMGen input.  Raises AssertionError on inconsistency.
        """
        txins = self.deserialized.txins
        has_ss = any(ti['scriptSig'] for ti in txins)
        has_witness = any('witness' in ti and ti['witness'] for ti in txins)
        if not (has_ss or has_witness):
            return False
        fs = "Hex TX has {} scriptSig but input is of type '{}'!"
        for n,ti in enumerate(txins):
            # index rather than zip(): a missing MMGen input must raise, not be skipped
            mmti = self.inputs[n]
            if ti['scriptSig'] == '' or ( len(ti['scriptSig']) == 46 and # native P2WPKH or P2SH-P2WPKH
                    ti['scriptSig'][:6] == '16' + self.proto.witness_vernum_hex + '14' ):
                assert 'witness' in ti, 'missing witness'
                assert isinstance(ti['witness'],list) and len(ti['witness']) == 2, 'malformed witness'
                assert len(ti['witness'][1]) == 66, 'incorrect witness pubkey length'
                assert mmti.mmtype == ('S','B')[ti['scriptSig']==''], fs.format('witness-type',mmti.mmtype)
            else: # non-witness
                assert mmti.mmtype not in ('S','B'), fs.format('signature in',mmti.mmtype)
                assert 'witness' not in ti, 'non-witness input has witness'
                # sig_size 72 (DER format), pubkey_size 'compressed':33, 'uncompressed':65
                assert (200 < len(ti['scriptSig']) < 300), 'malformed scriptSig' # VERY rough check
        return True

    def check_pubkey_scripts(self):
        """Exit with an error if any input's address does not match its scriptPubKey"""
        # `die` is not imported at module level in this file
        from ....util import die
        for n,i in enumerate(self.inputs,1):
            addr,fmt = scriptPubKey2addr(self.proto,i.scriptPubKey)
            if i.addr != addr:
                if fmt != i.addr.addr_fmt:
                    m = 'Address format of scriptPubKey ({}) does not match that of address ({}) in input #{}'
                    msg(m.format(fmt,i.addr.addr_fmt,n))
                m = 'ERROR: Address and scriptPubKey of transaction input #{} do not match!'
                die(3,(m+'\n {:23}{}'*3).format(n, 'address:',i.addr,
                    'scriptPubKey:',i.scriptPubKey,
                    'scriptPubKey->address:',addr ))

#   def is_replaceable_from_rpc(self):
#       dec_tx = await self.rpc.call('decoderawtransaction',self.serialized)
#       return None < dec_tx['vin'][0]['sequence'] <= self.proto.max_int - 2

    def is_replaceable(self):
        """Return True if the first input's sequence number equals max_int - 2"""
        return self.inputs[0].sequence == self.proto.max_int - 2

    @property
    def send_amt(self):
        """Sum of outputs, excluding the change output unless there is only one output"""
        return self.sum_outputs(
            exclude = None if len(self.outputs) == 1 else self.get_chg_output_idx()
        )

    def check_txfile_hex_data(self):
        self.serialized = HexStr(self.serialized)

    def parse_txfile_serialized_data(self):
        pass

    @property
    def fee(self):
        """Sum of inputs minus sum of outputs"""
        return self.sum_inputs() - self.sum_outputs()

    @property
    def change(self):
        """Sum of outputs minus the send amount"""
        return self.sum_outputs() - self.send_amt

    def get_serialized_locktime(self):
        """Return the last four bytes of the serialized TX, read as a little-endian integer"""
        return int(bytes.fromhex(self.serialized[-8:])[::-1].hex(),16)

View file

@ -0,0 +1,133 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.info: Bitcoin transaction info class
"""
from ....tx.info import TxInfo
from ....util import fmt,die
from ....color import red,green,pink
from ....addr import MMGenID
class TxInfo(TxInfo):
    """Bitcoin transaction info class: formats transaction data for display"""
    sort_orders = ('addr','raw')
    txinfo_hdr_fs = 'TRANSACTION DATA\n\nID={i} ({a} {c}) UTC={t} RBF={r} Sig={s} Locktime={l}\n'
    txinfo_hdr_fs_short = 'TX {i} ({a} {c}) UTC={t} RBF={r} Sig={s} Locktime={l}\n'
    txinfo_ftr_fs = fmt("""
Input amount: {i} {d}
Spend amount: {s} {d}
Change: {C} {d}
Fee: {a} {c}{r}
""")

    def format_rel_fee(self,terse):
        """Return the relative fee and the fee as a percentage of the send amount, formatted for display"""
        tx = self.tx
        return ' ({} {}, {} of spend amount)'.format(
            pink(str(tx.fee_abs2rel(tx.fee))),
            tx.rel_fee_disp,
            pink('{:0.6f}%'.format( tx.fee / tx.send_amt * 100 ))
        )

    def format_abs_fee(self):
        return self.tx.proto.coin_amt(self.tx.fee).hl()

    def format_verbose_footer(self):
        """Return a line with estimated vsize and total size, plus base and witness sizes for signed TXs"""
        tx = self.tx
        tsize = len(tx.serialized) // 2 if tx.serialized else 'unknown'
        out = f'Transaction size: Vsize {tx.estimate_size()} (estimated), Total {tsize}'
        if tx.name in ('Signed','OnlineSigned'):
            wsize = tx.deserialized.witness_size
            out += f', Base {tsize-wsize}, Witness {wsize}'
        return out + '\n'

    def format_body(self,blockcount,nonmm_str,max_mmwid,enl,terse,sort):
        """
        Return the formatted list of inputs and outputs

        `sort` must be one of `sort_orders`; otherwise the program exits.  Confirmation
        data for inputs is included only if `blockcount` is true and `terse` is false.
        """
        tx = self.tx

        if sort not in self.sort_orders:
            die(1,'{!r}: invalid transaction view sort order. Valid options: {}'.format(
                sort,
                ','.join(self.sort_orders) ))

        def format_io(desc):
            io = getattr(tx,desc)
            is_input = desc == 'inputs'
            yield desc.capitalize() + ':\n' + enl
            confs_per_day = 60*60*24 // tx.proto.avg_bdi
            io_sorted = {
                'addr': lambda: sorted(
                    io, # prepend '+' (sorts before '0') to ensure non-MMGen addrs are displayed first
                    key = lambda o: (o.mmid.sort_key if o.mmid else f'+{o.addr}') + f'{o.amt:040.20f}' ),
                'raw': lambda: io
            }[sort]
            for n,e in enumerate(io_sorted()):
                if is_input and blockcount:
                    confs = e.confs + blockcount - tx.blockcount
                    days = int(confs // confs_per_day)
                if e.mmid:
                    mmid_fmt = e.mmid.fmt(
                        width=max_mmwid,
                        encl='()',
                        color=True,
                        append_chars=('',' (chg)')[bool(not is_input and e.is_chg and terse)],
                        append_color='green')
                else:
                    mmid_fmt = MMGenID.fmtc(nonmm_str,width=max_mmwid,color=True)
                if terse:
                    yield '{:3} {} {} {} {}\n'.format(
                        n+1,
                        e.addr.fmt(color=True,width=addr_w),
                        mmid_fmt,
                        e.amt.hl(),
                        tx.dcoin )
                else:
                    def gen():
                        if is_input:
                            yield (n+1, 'tx,vout:', f'{e.txid.hl()},{red(str(e.vout))}')
                            yield ('', 'address:', f'{e.addr.hl()} {mmid_fmt}')
                        else:
                            yield (n+1, 'address:', f'{e.addr.hl()} {mmid_fmt}')
                        if e.label:
                            yield ('', 'comment:', e.label.hl())
                        yield ('', 'amount:', f'{e.amt.hl()} {tx.dcoin}')
                        if is_input and blockcount:
                            yield ('', 'confirmations:', f'{confs} (around {days} days)')
                        if not is_input and e.is_chg:
                            yield ('', 'change:', green('True'))
                    yield '\n'.join('{:>3} {:<8} {}'.format(*d) for d in gen()) + '\n\n'

        # width of the longest address among all inputs and outputs (used by format_io())
        addr_w = max(len(e.addr) for f in (tx.inputs,tx.outputs) for e in f)

        return (
            'Displaying inputs and outputs in {} sort order'.format({'raw':'raw','addr':'address'}[sort])
            + ('\n\n','\n')[terse]
            + ''.join(format_io('inputs'))
            + ''.join(format_io('outputs')) )

    def strfmt_locktime(self,locktime=None,terse=False):
        """
        Return a display string for `locktime`, or for the transaction's own locktime if
        `locktime` is false: a UTC date for time-based values, the block height otherwise.
        Returns '(None)' if no locktime is available; exits on any other invalid value.
        """
        # Locktime itself is an unsigned 4-byte integer which can be parsed two ways:
        #
        # If less than 500 million, locktime is parsed as a block height. The transaction can be
        # added to any block which has this height or higher.
        # MMGen note: s/this height or higher/a higher block height/
        #
        # If greater than or equal to 500 million, locktime is parsed using the Unix epoch time
        # format (the number of seconds elapsed since 1970-01-01T00:00 UTC). The transaction can be
        # added to any block whose block time is greater than the locktime.
        num = locktime or self.tx.locktime
        if num is None:
            return '(None)'
        elif num >= 5 * 10**8: # 500 million, per the rule above
            import time
            return ' '.join(time.strftime('%c',time.gmtime(num)).split()[1:])
        elif num > 0:
            return '{}{}'.format(('block height ','')[terse],num)
        else:
            die(2,f'{num!r}: invalid nLockTime value!')

View file

@ -0,0 +1,137 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.new: Bitcoin new transaction class
"""
import mmgen.tx.new as TxBase
from .base import Base
from ....opts import opt
from ....obj import HexStr,MMGenTxID
from ....util import dmsg,vmsg,make_chksum_6
class New(Base,TxBase.New):
    """Bitcoin new transaction class: fee handling and creation of the serialized transaction"""
    usr_fee_prompt = 'Enter transaction fee: '
    fee_fail_fs = 'Network fee estimation for {c} confirmations failed ({t})'
    no_chg_msg = 'Warning: Change address will be deleted as transaction produces no change'
    msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)'

    @property
    def relay_fee(self):
        """Relay fee for this transaction: the daemon's per-kB relay fee scaled by estimated size"""
        kb_fee = self.proto.coin_amt(self.rpc.cached['networkinfo']['relayfee'])
        ret = kb_fee * self.estimate_size() / 1024
        vmsg('Relay fee: {} {c}/kB, for transaction: {} {c}'.format(kb_fee,ret,c=self.coin))
        return ret

    async def get_rel_fee_from_network(self):
        """
        Return a tuple (fee_per_kb, fe_type), using the 'estimatesmartfee' RPC call and
        falling back to 'estimatefee' if that fails.  With 'estimatesmartfee', fee_per_kb
        is -2 if the result contains no 'feerate' key.
        """
        try:
            ret = await self.rpc.call('estimatesmartfee',opt.tx_confs,opt.fee_estimate_mode.upper())
            fee_per_kb = ret['feerate'] if 'feerate' in ret else -2
            fe_type = 'estimatesmartfee'
        # NOTE(review): bare except kept as-is - the exception types raised by rpc.call()
        # are not visible from here, so narrowing it could break the fallback
        except:
            args = self.rpc.daemon.estimatefee_args(self.rpc)
            fee_per_kb = await self.rpc.call('estimatefee',*args)
            fe_type = 'estimatefee'

        return fee_per_kb,fe_type

    # given tx size, rel fee and units, return absolute fee
    def fee_rel2abs(self,tx_size,units,amt,unit):
        if tx_size:
            return self.proto.coin_amt(amt * tx_size * getattr(self.proto.coin_amt,units[unit]))
        else:
            return None

    # given network fee estimate in BTC/kB, return absolute fee using estimated tx size
    def fee_est2abs(self,fee_per_kb,fe_type=None):
        from decimal import Decimal
        tx_size = self.estimate_size()
        ret = self.proto.coin_amt(
            fee_per_kb * Decimal(opt.tx_fee_adj) * tx_size / 1024,
            from_decimal = True )
        if opt.verbose:
            # `msg` and `fmt` are not imported at module level in this file
            from ....util import msg,fmt
            msg(fmt(f"""
{fe_type.upper()} fee for {opt.tx_confs} confirmations: {fee_per_kb} {self.coin}/kB
TX size (estimated): {tx_size} bytes
Fee adjustment factor: {opt.tx_fee_adj:.2f}
Absolute fee (fee_per_kb * adj_factor * tx_size / 1024): {ret} {self.coin}
""").strip())
        return ret

    def convert_and_check_fee(self,tx_fee,desc):
        """
        Convert fee specification `tx_fee` to an absolute fee and check it against the
        maximum TX fee and the relay fee.  Returns the absolute fee, or False (after
        printing a message) if the fee is rejected.  Raises ValueError if the fee cannot
        be converted because the transaction size is unknown.
        """
        # `msg` is not imported at module level in this file
        from ....util import msg

        abs_fee = self.feespec2abs(tx_fee,self.estimate_size())
        if abs_fee is None:
            raise ValueError(f'{tx_fee}: cannot convert {self.rel_fee_desc} to {self.coin}'
                + ' because transaction size is unknown')
        # NOTE(review): '== False' kept rather than 'is False', since an amount that
        # compares equal to False (presumably zero) is currently rejected here - confirm
        if abs_fee == False:
            err = f'{tx_fee!r}: invalid TX fee (not a {self.coin} amount or {self.rel_fee_desc} specification)'
        elif abs_fee > self.proto.max_tx_fee:
            err = f'{abs_fee} {self.coin}: {desc} fee too large (maximum fee: {self.proto.max_tx_fee} {self.coin})'
        elif abs_fee < self.relay_fee:
            err = f'{abs_fee} {self.coin}: {desc} fee too small (less than relay fee of {self.relay_fee} {self.coin})'
        else:
            return abs_fee
        msg(err)
        return False

    async def get_cmdline_input_addrs(self):
        # Bitcoin full node, call doesn't go to the network, so just call listunspent with addrs=[]
        return []

    def update_change_output(self,funds_left):
        """Set the change output's amount to `funds_left`, or remove the change output if there is no change"""
        chg_idx = self.get_chg_output_idx()
        if funds_left == 0:
            # `msg` is not imported at module level in this file
            from ....util import msg
            msg(self.no_chg_msg)
            self.outputs.pop(chg_idx)
        else:
            self.update_output_amt(chg_idx,self.proto.coin_amt(funds_left))

    def check_fee(self):
        """Raise MaxFeeExceeded if the fee (inputs minus outputs) exceeds the protocol's maximum TX fee"""
        fee = self.sum_inputs() - self.sum_outputs()
        if fee > self.proto.max_tx_fee:
            # MaxFeeExceeded is not imported at module level in this file
            from ....exception import MaxFeeExceeded
            c = self.proto.coin
            raise MaxFeeExceeded(f'Transaction fee of {fee} {c} too high! (> {self.proto.max_tx_fee} {c})')

    def final_inputs_ok_msg(self,funds_left):
        return 'Transaction produces {} {} in change'.format(
            self.proto.coin_amt(funds_left).hl(),
            self.coin
        )

    async def create_serialized(self,locktime=None,bump=None):
        """
        Sort inputs and outputs, create the raw transaction via the 'createrawtransaction'
        RPC call, set the locktime if requested, then set the MMGen TxID and the
        serialized data.  For a bump, inputs are not re-sorted and locktime is not set.
        """
        if not bump:
            self.inputs.sort_bip69()
            # do this only after inputs are sorted
            if opt.rbf:
                self.inputs[0].sequence = self.proto.max_int - 2 # handles the nLockTime case too
            elif locktime:
                self.inputs[0].sequence = self.proto.max_int - 1

        self.outputs.sort_bip69()

        # only the first input may carry an explicit sequence number
        ret = await self.rpc.call(
            'createrawtransaction', [
                {'txid':e.txid,'vout':e.vout,'sequence':e.sequence} if n == 0 and e.sequence else
                {'txid':e.txid,'vout':e.vout}
                    for n,e in enumerate(self.inputs) ],
            {e.addr:e.amt for e in self.outputs} )

        if locktime and not bump:
            # `msg` is not imported at module level in this file
            from ....util import msg
            msg(f'Setting nLockTime to {self.strfmt_locktime(locktime)}!')
            assert isinstance(locktime,int), 'locktime value not an integer'
            self.locktime = locktime
            # replace the final 4 bytes (8 hex digits) with the little-endian locktime
            ret = ret[:-8] + bytes.fromhex('{:08x}'.format(locktime))[::-1].hex()

        # TxID is set only once!
        self.txid = MMGenTxID(make_chksum_6(bytes.fromhex(ret)).upper())

        self.update_serialized(ret)

View file

@ -0,0 +1,82 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.online: Bitcoin online signed transaction class
"""
import mmgen.tx.online as TxBase
from .signed import Signed
from ....globalvars import *
from ....util import msg,ymsg,rmsg
class OnlineSigned(Signed,TxBase.OnlineSigned):
    """Bitcoin online signed transaction class"""

    async def send(self,prompt_user=True,exit_on_fail=False):
        """
        Send the transaction via the 'sendrawtransaction' RPC call

        Returns True on success (or when g.bogus_send is set, in which case nothing is
        sent) and False on failure.  If `exit_on_fail` is true, exits with status 1 on
        failure instead of returning.
        """
        # `die` is not imported at module level in this file; `sys` is imported here
        # rather than relying on the wildcard import above to supply it
        import sys
        from ....util import die

        self.check_correct_chain()

        if not g.bogus_send:
            if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
                die(2,'Transaction has Segwit outputs, but this blockchain does not support Segwit'
                    + ' at the current height')

        if self.fee > self.proto.max_tx_fee:
            die(2,'Transaction fee ({}) greater than {} max_tx_fee ({} {})!'.format(
                self.fee,
                self.proto.name,
                self.proto.max_tx_fee,
                self.proto.coin ))

        await self.status.display()

        if prompt_user:
            self.confirm_send()

        if g.bogus_send:
            ret = None
        else:
            try:
                ret = await self.rpc.call('sendrawtransaction',self.serialized)
            except Exception as e:
                errmsg = str(e)
                ret = False

        # identity test: False is set only by the handler above, where errmsg is also set
        if ret is False: # TODO: test send errors
            if errmsg.count('Signature must use SIGHASH_FORKID'):
                m = ('The Aug. 1 2017 UAHF has activated on this chain.\n'
                    + 'Re-run the script with the --coin=bch option.' )
            elif errmsg.count('Illegal use of SIGHASH_FORKID'):
                m = ('The Aug. 1 2017 UAHF is not yet active on this chain.\n'
                    + 'Re-run the script without the --coin=bch option.' )
            elif errmsg.count('64: non-final'):
                m = "Transaction with nLockTime {!r} can't be included in this block!".format(
                    self.strfmt_locktime(self.get_serialized_locktime()) )
            else:
                m = errmsg
            ymsg(m)
            rmsg(f'Send of MMGen transaction {self.txid} failed')
            if exit_on_fail:
                sys.exit(1)
            return False
        else:
            if g.bogus_send:
                m = 'BOGUS transaction NOT sent: {}'
            else:
                m = 'Transaction sent: {}'
                assert ret == self.coin_txid, 'txid mismatch (after sending)'
            msg(m.format(self.coin_txid.hl()))
            self.add_timestamp()
            self.add_blockcount()
            self.desc = 'sent transaction'
            return True

    def print_contract_addr(self):
        pass

View file

@ -0,0 +1,33 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.signed: Bitcoin signed transaction class
"""
import mmgen.tx.signed as TxBase
from .completed import Completed
from ....util import fmt,vmsg
class Signed(Completed,TxBase.Signed):
    """Bitcoin signed transaction class"""

    def compare_size_and_estimated_size(self,tx_decoded):
        """
        Compare the estimated vsize with the true vsize from decoded transaction
        `tx_decoded` ('vsize' key, or 'size' if absent).  Raises BadTxSizeEstimate
        if the two differ by 5% or more.
        """
        estimated = self.estimate_size()
        true_vsize = tx_decoded['vsize'] if 'vsize' in tx_decoded else tx_decoded['size']
        vmsg(f'\nVsize: {true_vsize} (true) {estimated} (estimated)')

        ratio = float(estimated) / true_vsize
        within_tolerance = 0.95 < ratio < 1.05 # allow for 5% error
        if within_tolerance:
            return

        from ....exception import BadTxSizeEstimate
        errmsg = fmt(f"""
Estimated transaction vsize is {ratio:1.2f} times the true vsize
Your transaction fee estimates will be inaccurate
Please re-create and re-sign the transaction using the option --vsize-adj={1/ratio:1.2f}
""").strip()
        raise BadTxSizeEstimate(errmsg)

View file

@ -0,0 +1,102 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.status: Bitcoin transaction status class
"""
import time
import mmgen.tx.status as TxBase
from ....opts import opt
from ....util import msg,suf,die,secs_to_dhms
class Status(TxBase.Status):
    """Bitcoin transaction status class"""

    async def display(self,usr_req=False):
        """
        Query the coin daemon for the status of the transaction and report it

        Returns normally if the transaction is in the mempool or if none of the checks
        below match.  Exits with status 0 if the transaction is confirmed in the wallet
        or has been replaced, and with status 2 if it is found via 'getrawtransaction'
        but not in the tracking wallet.
        """
        # `rdie` is not imported at module level in this file
        # NOTE(review): assumes rdie lives in mmgen.util alongside die - confirm
        from ....util import rdie

        tx = self.tx

        # namespace for values passed from the helper coroutines to the code below
        class r(object):
            pass

        # NOTE(review): the bare excepts in the helpers below are kept as-is - the
        # exception types raised by rpc.call() are not visible from here

        async def is_in_wallet():
            try:
                ret = await tx.rpc.call('gettransaction',tx.coin_txid)
            except:
                return False
            if ret.get('confirmations',0) > 0:
                r.confs = ret['confirmations']
                return True
            else:
                return False

        async def is_in_utxos():
            try:
                return 'txid' in await tx.rpc.call('getrawtransaction',tx.coin_txid,True)
            except:
                return False

        async def is_in_mempool():
            try:
                return 'height' in await tx.rpc.call('getmempoolentry',tx.coin_txid)
            except:
                return False

        async def is_replaced():
            if await is_in_mempool():
                return False
            try:
                ret = await tx.rpc.call('gettransaction',tx.coin_txid)
            except:
                return False
            else:
                if 'bip125-replaceable' in ret and ret.get('confirmations',1) <= 0:
                    r.replacing_confs = -ret['confirmations']
                    r.replacing_txs = ret['walletconflicts']
                    return True
                else:
                    return False

        if await is_in_mempool():
            if usr_req:
                d = await tx.rpc.call('gettransaction',tx.coin_txid)
                rep = ('' if d.get('bip125-replaceable') == 'yes' else 'NOT ') + 'replaceable'
                t = d['timereceived']
                if opt.quiet:
                    msg('Transaction is in mempool')
                else:
                    msg(f'TX status: in mempool, {rep}')
                    msg('Sent {} ({} ago)'.format(
                        time.strftime('%c',time.gmtime(t)),
                        secs_to_dhms(int(time.time()-t))) )
            else:
                msg('Warning: transaction is in mempool!')
        elif await is_in_wallet():
            die(0,f'Transaction has {r.confs} confirmation{suf(r.confs)}')
        elif await is_in_utxos():
            rdie(2,'ERROR: transaction is in the blockchain (but not in the tracking wallet)!')
        elif await is_replaced():
            msg('Transaction has been replaced')
            msg('Replacement transaction ' + (
                    f'has {r.replacing_confs} confirmation{suf(r.replacing_confs)}'
                if r.replacing_confs else
                    'is in mempool' ) )
            if not opt.quiet:
                msg('Replacing transactions:')
                d = []
                for txid in r.replacing_txs:
                    try:
                        d.append(await tx.rpc.call('getmempoolentry',txid))
                    except:
                        d.append({})
                for txid,mp_entry in zip(r.replacing_txs,d):
                    msg(f' {txid}' + (' in mempool' if 'height' in mp_entry else '') )
            die(0,'')

View file

@ -0,0 +1,84 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.tx.unsigned: Bitcoin unsigned transaction class
"""
import mmgen.tx.unsigned as TxBase
from .completed import Completed
from ....globalvars import *
from ....obj import HexStr,CoinTxID,MMGenDict
from ....util import msg,msg_r,ymsg,qmsg,suf
class Unsigned(Completed,TxBase.Unsigned):
    """Bitcoin unsigned transaction class"""
    desc = 'unsigned transaction'

    async def sign(self,tx_num_str,keys): # return signed object or False; don't exit or raise exception
        """
        Sign the transaction with `keys` via the coin daemon's 'signrawtransaction' or
        'signrawtransactionwithkey' RPC call (the latter if the daemon has the
        'sign_with_key' capability).  `tx_num_str` is used in the progress message only.
        """
        # These names are not imported at module level in this file; `sys` is imported
        # here rather than relying on the wildcard import above to supply it
        import sys
        from ....exception import TransactionChainMismatch,BadMMGenTxID

        try:
            self.check_correct_chain()
        except TransactionChainMismatch:
            return False

        if (self.has_segwit_inputs() or self.has_segwit_outputs()) and not self.proto.cap('segwit'):
            ymsg(f"TX has Segwit inputs or outputs, but {self.coin} doesn't support Segwit!")
            return False

        self.check_pubkey_scripts()

        qmsg(f'Passing {len(keys)} key{suf(keys)} to {self.rpc.daemon.exec_fn}')

        if self.has_segwit_inputs():
            from ....addr import KeyGenerator,AddrGenerator
            kg = KeyGenerator(self.proto,'std')
            ag = AddrGenerator(self.proto,'segwit')
            keydict = MMGenDict([(d.addr,d.sec) for d in keys])

        sig_data = []
        for d in self.inputs:
            e = {k:getattr(d,k) for k in ('txid','vout','scriptPubKey','amt')}
            e['amount'] = e.pop('amt') # the amount is passed to the daemon under the key 'amount'
            if d.mmtype == 'S':
                e['redeemScript'] = ag.to_segwit_redeem_script(kg.gen_data(keydict[d.addr]))
            sig_data.append(e)

        msg_r(f'Signing transaction{tx_num_str}...')
        wifs = [d.sec.wif for d in keys]

        try:
            # the two calls take the signature data and keys in opposite order
            args = (
                ('signrawtransaction', self.serialized,sig_data,wifs,self.proto.sighash_type),
                ('signrawtransactionwithkey',self.serialized,wifs,sig_data,self.proto.sighash_type)
            )['sign_with_key' in self.rpc.caps]
            ret = await self.rpc.call(*args)
        except Exception as e:
            ymsg(self.rpc.daemon.sigfail_errmsg(e))
            return False

        from ....tx import SignedTX

        try:
            self.update_serialized(ret['hex'])
            new = await SignedTX(data=self.__dict__)
            tx_decoded = await self.rpc.call( 'decoderawtransaction', ret['hex'] )
            new.compare_size_and_estimated_size(tx_decoded)
            new.coin_txid = CoinTxID(self.deserialized.txid)
            if not new.coin_txid == tx_decoded['txid']:
                raise BadMMGenTxID('txid mismatch (after signing)')
            msg('OK')
            return new
        except Exception as e:
            # an exception raised without arguments has empty args; don't fail on it here
            ymsg(f'\n{e.args[0] if e.args else e}')
            if g.traceback:
                import traceback
                ymsg( '\n' + ''.join(traceback.format_exception(*sys.exc_info())) )
            return False

View file

@ -1,606 +0,0 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
altcoins.base_proto.ethereum.tx: Ethereum transaction classes
"""
import json
from collections import namedtuple
from decimal import Decimal
from ...globalvars import g
from ...color import red,yellow,blue,pink
from ...opts import opt
from ...util import msg,msg_r,ymsg,dmsg,fmt,line_input,is_int,is_hex_str,make_chksum_6,die,suf,capfirst,pp_fmt
from ...exception import TransactionChainMismatch
from ...obj import Int,Str,HexStr,CoinTxID,MMGenTxID,ETHNonce
from ...addr import MMGenID,CoinAddr,TokenAddr,is_mmgen_id,is_coin_addr
from ...amt import ETHAmt
from ...tx import MMGenTX
from ...twctl import TrackingWallet
from .contract import Token
class EthereumMMGenTX:
class Base(MMGenTX.Base):
    """Attributes and fee helpers shared by all Ethereum transaction classes."""
    rel_fee_desc = 'gas price'
    rel_fee_disp = 'gas price in Gwei'
    txobj = None # ""
    tx_gas = ETHAmt(21000,'wei')    # an approximate number, used for fee estimation purposes
    start_gas = ETHAmt(21000,'wei') # the actual startgas amt used in the transaction
                                    # for simple sends with no data, tx_gas = start_gas = 21000
    contract_desc = 'contract'
    usr_contract_data = HexStr('')
    disable_fee_check = False

    def fee_abs2rel(self,abs_fee,to_unit='Gwei'):
        """Given an absolute fee, return the gas price (fee divided by tx_gas) in `to_unit`."""
        gas_price = ETHAmt(int(abs_fee.toWei() // self.tx_gas.toWei()),'wei')
        dmsg(f'fee_abs2rel() ==> {gas_price} ETH')
        if to_unit == 'eth':
            return gas_price
        return gas_price.to_unit(to_unit,show_decimal=True)

    def get_hex_locktime(self):
        return None # TODO

    def fee_gasPrice2abs(self,rel_fee):
        """Given a gas price in wei (int), return the absolute fee using tx_gas (not in MMGenTX)."""
        assert isinstance(rel_fee,int), f'{rel_fee!r}: incorrect type for fee estimate (not an integer)'
        total_wei = rel_fee * self.tx_gas.toWei()
        return ETHAmt(total_wei,'wei')

    def is_replaceable(self):
        return True

    async def get_receipt(self,txid,silent=False):
        """
        Fetch receipt and transaction data for `txid` (hex string without '0x' prefix).

        Returns None if the node has no receipt for the transaction, otherwise an
        'exec_status' namedtuple.
        """
        prefixed_txid = '0x' + txid
        receipt = await self.rpc.call('eth_getTransactionReceipt',prefixed_txid) # -> null if pending
        if not receipt:
            return None
        tx_data = await self.rpc.call('eth_getTransactionByHash',prefixed_txid)
        exec_status = namedtuple(
            'exec_status',
            ['status','gas_sent','gas_used','gas_price','contract_addr','tx','rx'] )
        return exec_status(
            status        = Int(receipt['status'],16), # zero is failure, non-zero success
            gas_sent      = Int(tx_data['gas'],16),
            gas_used      = Int(receipt['gasUsed'],16),
            gas_price     = ETHAmt(int(tx_data['gasPrice'],16),from_unit='wei'),
            contract_addr = (
                self.proto.coin_addr(receipt['contractAddress'][2:])
                    if receipt['contractAddress'] else None ),
            tx            = tx_data,
            rx            = receipt,
        )
class New(Base,MMGenTX.New):
    """New Ethereum transaction; the transaction data is kept in `txobj` and dumped as JSON."""
    hexdata_type = 'hex'
    desc = 'transaction'
    fee_fail_fs = 'Network fee estimation failed'
    no_chg_msg = 'Warning: Transaction leaves account with zero balance'
    usr_fee_prompt = 'Enter transaction fee or gas price: '

    def __init__(self,*args,**kwargs):
        """Initialize the parent, then apply the tx_gas and contract_data options, if set."""
        MMGenTX.New.__init__(self,*args,**kwargs)

        if opt.tx_gas:
            gas = ETHAmt(int(opt.tx_gas),'wei')
            self.tx_gas = gas
            self.start_gas = gas

        if opt.contract_data:
            m = "'--contract-data' option may not be used with token transaction"
            assert 'Token' not in type(self).__name__, m
            with open(opt.contract_data) as fp:
                raw_data = fp.read()
            self.usr_contract_data = HexStr(raw_data.strip())
            self.disable_fee_check = True

    async def get_nonce(self):
        """Return the sender's transaction count (including pending) as an ETHNonce."""
        count_hex = await self.rpc.call('eth_getTransactionCount','0x'+self.inputs[0].addr,'pending')
        return ETHNonce(int(count_hex,16))

    async def make_txobj(self): # called by create_raw()
        """Populate self.txobj from the inputs, outputs, fee and contract data."""
        self.txobj = {
            'from':     self.inputs[0].addr,
            'to':       self.outputs[0].addr if self.outputs else Str(''),
            'amt':      self.outputs[0].amt if self.outputs else ETHAmt('0'),
            'gasPrice': self.fee_abs2rel(self.usr_fee,to_unit='eth'),
            'startGas': self.start_gas,
            'nonce':    await self.get_nonce(),
            'chainId':  self.rpc.chainID,
            'data':     self.usr_contract_data,
        }

    # Instead of serializing tx data as with BTC, just create a JSON dump.
    # This complicates things but means we avoid using the rlp library to deserialize the data,
    # thus removing an attack vector
    async def create_raw(self):
        """Build txobj, store its JSON dump in self.hex and update the MMGen TxID."""
        assert len(self.inputs) == 1,'Transaction has more than one input!'
        o_num = len(self.outputs)
        o_ok = 0 if self.usr_contract_data else 1 # contract data => no outputs allowed
        assert o_num == o_ok, f'Transaction has {o_num} output{suf(o_num)} (should have {o_ok})'
        await self.make_txobj()
        serializable = {}
        for key,val in self.txobj.items():
            if key != 'token_to':
                serializable[key] = str(val)
        self.hex = json.dumps(serializable)
        self.update_txid()

    def update_txid(self):
        """Set self.txid to the uppercased 6-character checksum of the JSON data."""
        assert not is_hex_str(self.hex),'update_txid() must be called only when self.hex is not hex data'
        chksum = make_chksum_6(self.hex)
        self.txid = MMGenTxID(chksum.upper())

    def del_output(self,idx):
        pass

    def process_cmd_args(self,cmd_args,ad_f,ad_w):
        """Process the single output argument; none is allowed for a non-token contract-data TX."""
        lc = len(cmd_args)
        is_token_tx = 'Token' in type(self).__name__
        if lc == 0 and self.usr_contract_data and not is_token_tx:
            return
        if lc != 1:
            die(1,f'{lc} output{suf(lc)} specified, but Ethereum transactions must have exactly one')
        for arg in cmd_args:
            self.process_cmd_arg(arg,ad_f,ad_w)

    def select_unspent(self,unspent):
        """Prompt until a valid 1-based account number is entered; return it in a list."""
        while True:
            reply = line_input('Enter an account to spend from: ').strip()
            if not reply:
                continue
            if not is_int(reply):
                msg('Account number must be an integer')
                continue
            acct_num = int(reply)
            if acct_num < 1:
                msg('Account number must be >= 1')
            elif acct_num > len(unspent):
                msg(f'Account number must be <= {len(unspent)}')
            else:
                return [acct_num]

    # coin-specific fee routines:
    @property
    def relay_fee(self):
        return ETHAmt('0') # TODO

    # get rel_fee (gas price) from network, return in native wei
    async def get_rel_fee_from_network(self):
        gas_price = Int(await self.rpc.call('eth_gasPrice'),16)
        return gas_price,'eth_gasPrice' # ==> rel_fee,fe_type

    def check_fee(self):
        """Assert that the user fee does not exceed max_tx_fee, unless the check is disabled."""
        if self.disable_fee_check:
            return
        assert self.usr_fee <= self.proto.max_tx_fee

    # given rel fee and units, return absolute fee using tx_gas
    def fee_rel2abs(self,tx_size,units,amt,unit):
        gas_price_wei = ETHAmt(amt,units[unit]).toWei()
        return ETHAmt(gas_price_wei * self.tx_gas.toWei(),from_unit='wei')

    # given fee estimate (gas price) in wei, return absolute fee, adjusting by opt.tx_fee_adj
    def fee_est2abs(self,rel_fee,fe_type=None):
        adjusted_fee = self.fee_gasPrice2abs(rel_fee) * opt.tx_fee_adj
        if opt.verbose:
            msg(f'Estimated fee: {adjusted_fee} ETH')
        return adjusted_fee

    def convert_and_check_fee(self,tx_fee,desc='Missing description'):
        """Convert a fee spec to an absolute fee; return False if invalid or too large."""
        abs_fee = self.feespec2abs(tx_fee,None)
        if abs_fee == False:
            return False
        if not self.disable_fee_check and (abs_fee > self.proto.max_tx_fee):
            msg('{} {c}: {} fee too large (maximum fee: {} {c})'.format(
                abs_fee.hl(),
                desc,
                self.proto.max_tx_fee.hl(),
                c = self.proto.coin ))
            return False
        return abs_fee

    def update_change_output(self,funds_left):
        """If the (single) output is a change output, set its amount to the remaining funds."""
        if not self.outputs:
            return
        if self.outputs[0].is_chg:
            self.update_output_amt(0,ETHAmt(funds_left))

    async def get_cmdline_input_addrs(self):
        """Resolve the comma-separated `inputs` option (MMGen IDs or coin addresses) to addresses."""
        addrs = []
        if not opt.inputs:
            return addrs
        r = (await TrackingWallet(self.proto)).data_root # must create new instance here
        m = 'Address {!r} not in tracking wallet'
        for spec in opt.inputs.split(','):
            if is_mmgen_id(self.proto,spec):
                for addr in r:
                    if r[addr]['mmid'] == spec:
                        addrs.append(addr)
                        break
                else: # no wallet entry carries this MMGen ID
                    raise UserAddressNotInWallet(m.format(spec))
            elif is_coin_addr(self.proto,spec):
                if spec not in r:
                    raise UserAddressNotInWallet(m.format(spec))
                addrs.append(spec)
            else:
                die(1,f'{spec!r}: not an MMGen ID or coin address')
        return addrs

    def final_inputs_ok_msg(self,funds_left):
        """Return the message describing the balance left in the sender's account."""
        is_chg_tx = self.outputs and self.outputs[0].is_chg
        chg = '0' if is_chg_tx else funds_left
        return 'Transaction leaves {} {} in the sender’s account'.format(
            ETHAmt(chg).hl(),
            self.proto.coin
        )
class Completed(Base,MMGenTX.Completed):
    """Completed Ethereum transaction: amount properties and view formatting."""
    fn_fee_unit = 'Mwei'
    txview_hdr_fs = 'TRANSACTION DATA\n\nID={i} ({a} {c}) UTC={t} Sig={s} Locktime={l}\n'
    txview_hdr_fs_short = 'TX {i} ({a} {c}) UTC={t} Sig={s} Locktime={l}\n'
    # literal lines are kept unindented so the text handed to fmt() is unchanged
    txview_ftr_fs = fmt("""
Total in account: {i} {d}
Total to spend: {o} {d}
Remaining balance: {C} {d}
TX fee: {a} {c}{r}
""")
    fmt_keys = ('from','to','amt','nonce')

    @property
    def send_amt(self):
        """Amount of the first output, or a zero coin amount if there are no outputs."""
        if self.outputs:
            return self.outputs[0].amt
        return self.proto.coin_amt('0')

    @property
    def fee(self):
        """Absolute fee computed from the gas price stored in txobj."""
        gas_price_wei = self.txobj['gasPrice'].toWei()
        return self.fee_gasPrice2abs(gas_price_wei)

    @property
    def change(self):
        """Input total minus send amount and fee."""
        return self.sum_inputs() - self.send_amt - self.fee

    def check_txfile_hex_data(self):
        pass

    def format_view_body(self,blockcount,nonmm_str,max_mmwid,enl,terse,sort):
        """Return the formatted transaction body (addresses, amount, gas, nonce, data)."""
        mmid_disp = {}
        for k in ('inputs','outputs'):
            io_list = getattr(self,k)
            if len(io_list):
                mmid = io_list[0].mmid
                mmid_disp[k] = ' ' + (mmid.hl() if mmid else MMGenID.hlc(nonmm_str))
        # literal lines are kept unindented so the resulting string is unchanged
        fs = """From: {}{f_mmid}
To: {}{t_mmid}
Amount: {} {c}
Gas price: {g} Gwei
Start gas: {G} Kwei
Nonce: {}
Data: {d}
\n""".replace('\t','')
        t = self.txobj
        td = t['data']
        from ...color import yellow
        # empty field values are displayed as 'None'
        positional = [(t[k] if t[k] != '' else Str('None')).hl() for k in self.fmt_keys]
        have_outputs = len(self.outputs)
        return fs.format(
            *positional,
            d = '{}... ({} bytes)'.format(td[:40],len(td)//2) if len(td) else Str('None'),
            c = self.proto.dcoin if have_outputs else '',
            g = yellow(str(t['gasPrice'].to_unit('Gwei',show_decimal=True))),
            G = yellow(str(t['startGas'].to_unit('Kwei'))),
            t_mmid = mmid_disp['outputs'] if have_outputs else '',
            f_mmid = mmid_disp['inputs'] )

    def format_view_abs_fee(self):
        """Return the highlighted fee, marked ' (max)' when the transaction carries data."""
        suffix = ' (max)' if self.txobj['data'] else ''
        return self.fee.hl() + suffix

    def format_view_rel_fee(self,terse):
        """Return the fee as a percentage of the send amount."""
        percentage = self.fee / self.send_amt * 100
        return ' ({} of spend amount)'.format(
            pink('{:0.6f}%'.format(percentage))
        )

    def format_view_verbose_footer(self):
        """Return the parsed contract data, or an empty string if the TX has no data."""
        if not self.txobj['data']:
            return ''
        from .contract import parse_abi
        return '\nParsed contract data: ' + pp_fmt(parse_abi(self.txobj['data']))

    def check_sigs(self,deserial_tx=None): # TODO
        """Return True if self.hex is a hex string, False otherwise."""
        return True if is_hex_str(self.hex) else False

    def check_pubkey_scripts(self):
        pass
class Unsigned(Completed,MMGenTX.Unsigned):
    """Unsigned Ethereum transaction: parses the JSON TX data and signs it."""
    hexdata_type = 'json'
    desc = 'unsigned transaction'

    def parse_txfile_hex_data(self):
        """
        Decode the JSON in self.hex into typed values, storing them in self.txobj.

        Returns the raw decoded dict.
        """
        d = json.loads(self.hex)
        o = {
            'from':     CoinAddr(self.proto,d['from']),
            # NB: for token, 'to' is sendto address
            'to':       CoinAddr(self.proto,d['to']) if d['to'] else Str(''),
            'amt':      ETHAmt(d['amt']),
            'gasPrice': ETHAmt(d['gasPrice']),
            'startGas': ETHAmt(d['startGas']),
            'nonce':    ETHNonce(d['nonce']),
            # values were stringified on creation, so a missing chain ID reads as 'None'
            'chainId':  None if d['chainId'] == 'None' else Int(d['chainId']),
            'data':     HexStr(d['data']) }
        self.tx_gas = o['startGas'] # approximate, but better than nothing
        self.txobj = o
        return d # 'token_addr','decimals' required by Token subclass

    async def do_sign(self,wif,tx_num_str):
        """
        Sign the transaction with `wif`, storing the RLP-encoded result in self.hex
        and the transaction hash in self.coin_txid.

        Raises AssertionError if the recovered sender, the token address or the
        signature check does not match.
        """
        o = self.txobj
        o_conv = {
            'to':       bytes.fromhex(o['to']),
            'startgas': o['startGas'].toWei(),
            'gasprice': o['gasPrice'].toWei(),
            'value':    o['amt'].toWei() if o['amt'] else 0,
            'nonce':    o['nonce'],
            'data':     bytes.fromhex(o['data']) }

        from .pyethereum.transactions import Transaction
        etx = Transaction(**o_conv).sign(wif,o['chainId'])
        assert etx.sender.hex() == o['from'],(
            'Sender address recovered from signature does not match true sender')

        from . import rlp
        self.hex = rlp.encode(etx).hex()
        self.coin_txid = CoinTxID(etx.hash.hex())

        if o['data']:
            if o['to']:
                # pass the protocol, as every other TokenAddr() call in this module does
                assert self.txobj['token_addr'] == TokenAddr(self.proto,etx.creates.hex()),'Token address mismatch'
            else: # token- or contract-creating transaction
                self.txobj['token_addr'] = TokenAddr(self.proto,etx.creates.hex())

        assert self.check_sigs(),'Signature check failed'

    async def sign(self,tx_num_str,keys): # return TX object or False; don't exit or raise exception
        """
        Sign the transaction with the first key in `keys`.

        Returns the signed transaction object on success, False on any failure.
        """
        try:
            self.check_correct_chain()
        except TransactionChainMismatch:
            return False

        msg_r(f'Signing transaction{tx_num_str}...')

        try:
            await self.do_sign(keys[0].sec.wif,tx_num_str)
            msg('OK')
            return MMGenTX.Signed(data=self.__dict__)
        except Exception as e:
            # the f-prefix is required, otherwise the literal text '{e!s}' is printed
            msg(f"{e!s}: transaction signing failed!")
            if g.traceback:
                import sys,traceback # sys is not imported at module level
                ymsg('\n'+''.join(traceback.format_exception(*sys.exc_info())))
            return False
class Signed(Completed,MMGenTX.Signed):
    """Signed Ethereum transaction: parsing, status queries and sending."""

    desc = 'signed transaction'

    def parse_txfile_hex_data(self):
        """
        Decode the RLP-encoded transaction in self.hex into typed values, storing
        them in self.txobj.

        Returns the decoded dict, with the '0x' prefix removed from the 'sender',
        'to' and 'data' values.  Raises AssertionError if the transaction hash
        differs from self.coin_txid.
        """
        from .pyethereum.transactions import Transaction
        from . import rlp
        etx = rlp.decode(bytes.fromhex(self.hex),Transaction)
        d = etx.to_dict() # ==> hex values have '0x' prefix, 0 is '0x'
        for k in ('sender','to','data'):
            if k in d:
                d[k] = d[k].replace('0x','',1)
        o = {
            'from':     CoinAddr(self.proto,d['sender']),
            # NB: for token, 'to' is token address
            'to':       CoinAddr(self.proto,d['to']) if d['to'] else Str(''),
            'amt':      ETHAmt(d['value'],'wei'),
            'gasPrice': ETHAmt(d['gasprice'],'wei'),
            'startGas': ETHAmt(d['startgas'],'wei'),
            'nonce':    ETHNonce(d['nonce']),
            'data':     HexStr(d['data']) }
        if o['data'] and not o['to']: # token- or contract-creating transaction
            # NB: could be a non-token contract address:
            o['token_addr'] = TokenAddr(self.proto,etx.creates.hex())
            self.disable_fee_check = True
        txid = CoinTxID(etx.hash.hex())
        assert txid == self.coin_txid,"txid in tx.hex doesn't match value in MMGen transaction file"
        self.tx_gas = o['startGas'] # approximate, but better than nothing
        self.txobj = o
        return d # 'token_addr','decimals' required by Token subclass

    async def get_status(self,status=False):
        """
        Report whether the transaction is in the mempool and, when `status` is
        true, whether it is confirmed.

        With `status` true this function does not return unless the transaction
        is in the mempool: it calls die() in both remaining cases.
        """

        class r(object): # namespace shared with the nested coroutines
            pass

        async def is_in_mempool():
            if not 'full_node' in self.rpc.caps:
                return False
            if self.rpc.daemon.id in ('parity','openethereum'):
                pool = [x['hash'] for x in await self.rpc.call('parity_pendingTransactions')]
            elif self.rpc.daemon.id in ('geth','erigon'):
                res = await self.rpc.call('txpool_content')
                pool = list(res['pending']) + list(res['queued'])
            return '0x'+self.coin_txid in pool

        async def is_in_wallet():
            d = await self.rpc.call('eth_getTransactionReceipt','0x'+self.coin_txid)
            if d and 'blockNumber' in d and d['blockNumber'] is not None:
                r.confs = 1 + int(await self.rpc.call('eth_blockNumber'),16) - int(d['blockNumber'],16)
                r.exec_status = int(d['status'],16)
                return True
            return False

        if await is_in_mempool():
            msg('Transaction is in mempool' if status else 'Warning: transaction is in mempool!')
            return

        if status:
            if await is_in_wallet():
                if self.txobj['data']:
                    cd = capfirst(self.contract_desc)
                    if r.exec_status == 0:
                        msg(f'{cd} failed to execute!')
                    else:
                        msg(f'{cd} successfully executed with status {r.exec_status}')
                die(0,f'Transaction has {r.confs} confirmation{suf(r.confs)}')
            die(1,'Transaction is neither in mempool nor blockchain!')

    async def send(self,prompt_user=True,exit_on_fail=False):
        """
        Send the transaction to the network (or simulate it if g.bogus_send is set).

        Returns True on success.  If the RPC call returns a false value, returns
        False, or exits with status 1 when `exit_on_fail` is set.  Exceptions
        raised by the RPC call propagate to the caller.
        """
        self.check_correct_chain()

        if not self.disable_fee_check and (self.fee > self.proto.max_tx_fee):
            die(2,'Transaction fee ({}) greater than {} max_tx_fee ({} {})!'.format(
                self.fee,
                self.proto.name,
                self.proto.max_tx_fee,
                self.proto.coin ))

        await self.get_status()

        if prompt_user:
            self.confirm_send()

        if g.bogus_send:
            ret = None
        else:
            # The original wrapped this call in 'except: raise' followed by an
            # unreachable assignment; RPC errors propagate exactly as before.
            ret = await self.rpc.call('eth_sendRawTransaction','0x'+self.hex)

        if ret == False:
            msg(red(f'Send of MMGen transaction {self.txid} failed'))
            if exit_on_fail:
                import sys # sys is not imported at module level
                sys.exit(1)
            return False
        else:
            if g.bogus_send:
                m = 'BOGUS transaction NOT sent: {}'
            else:
                m = 'Transaction sent: {}'
                assert ret == '0x'+self.coin_txid,'txid mismatch (after sending)'
                if self.proto.network == 'regtest' and g.daemon_id == 'erigon': # ERIGON
                    import asyncio
                    await asyncio.sleep(5)
            self.desc = 'sent transaction'
            msg(m.format(self.coin_txid.hl()))
            self.add_timestamp()
            self.add_blockcount()
            return True

    def print_contract_addr(self):
        """Print the contract address if txobj has a 'token_addr' entry."""
        if 'token_addr' in self.txobj:
            msg('Contract address: {}'.format(self.txobj['token_addr'].hl()))
class Bump(MMGenTX.Bump,Completed,New):
    """Fee-bumped Ethereum transaction; reuses the nonce of the original transaction."""

    @property
    def min_fee(self):
        """Current fee multiplied by 1.101."""
        bumped = self.fee * Decimal('1.101')
        return ETHAmt(bumped)

    def bump_fee(self,idx,fee):
        """Set the gas price in txobj to the one corresponding to absolute fee `fee`."""
        new_gas_price = self.fee_abs2rel(fee,to_unit='eth')
        self.txobj['gasPrice'] = new_gas_price

    async def get_nonce(self):
        """Return the nonce already stored in txobj."""
        nonce = self.txobj['nonce']
        return nonce
class EthereumTokenMMGenTX:
class Base(EthereumMMGenTX.Base):
    """Token transaction base: overrides the gas amounts and the contract description."""
    contract_desc = 'token contract'
    start_gas = ETHAmt(60000,'wei')
    tx_gas = ETHAmt(52000,'wei')
class New(Base,EthereumMMGenTX.New):
    """New token transaction."""
    desc = 'transaction'
    fee_is_approximate = True

    async def make_txobj(self): # called by create_raw()
        """Extend the parent's txobj with the token address, decimals, recipient and call data."""
        await super().make_txobj()
        token = Token(self.proto,self.tw.token,self.tw.decimals)
        txobj = self.txobj
        txobj['token_addr'] = token.addr
        txobj['decimals'] = token.decimals
        txobj['token_to'] = txobj['to']
        txobj['data'] = token.create_data(txobj['token_to'],txobj['amt'])

    def update_change_output(self,funds_left):
        """If the output is a change output, set its amount to the input's amount."""
        first_output = self.outputs[0]
        if not first_output.is_chg:
            return
        self.update_output_amt(0,self.inputs[0].amt)

    # token transaction, so check both eth and token balances
    # TODO: add test with insufficient funds
    async def precheck_sufficient_funds(self,inputs_sum,sel_unspent,outputs_sum):
        """Return False if the account holds no ether; otherwise defer to the parent's check."""
        eth_bal = await self.tw.get_eth_balance(sel_unspent[0].addr)
        if eth_bal != 0:
            return await super().precheck_sufficient_funds(inputs_sum,sel_unspent,outputs_sum)
        # we don't know the fee yet
        msg('This account has no ether to pay for the transaction fee!')
        return False

    async def get_funds_left(self,fee,outputs_sum):
        """Return the sender's ether balance minus `fee`."""
        eth_bal = await self.tw.get_eth_balance(self.inputs[0].addr)
        return eth_bal - fee

    def final_inputs_ok_msg(self,funds_left):
        """Return the message describing the ether and token balances left in the sender's account."""
        if self.outputs[0].is_chg:
            token_bal = ETHAmt('0')
        else:
            token_bal = self.inputs[0].amt - self.outputs[0].amt
        return "Transaction leaves ≈{} {} and {} {} in the sender's account".format(
            funds_left.hl(),
            self.proto.coin,
            token_bal.hl(),
            self.proto.dcoin
        )
class Completed(Base,EthereumMMGenTX.Completed):
    """Completed token transaction."""
    fmt_keys = ('from','token_to','amt','nonce')

    @property
    def change(self):
        """Input total minus send amount (the fee is not subtracted)."""
        total_in = self.sum_inputs()
        return total_in - self.send_amt

    def format_view_rel_fee(self,terse):
        return ''

    def format_view_body(self,*args,**kwargs):
        """Return the parent's body, preceded by a line showing the token address and symbol."""
        parent_body = super().format_view_body(*args,**kwargs)
        token_addr = self.txobj['token_addr'].hl()
        symbol = blue('(' + self.proto.dcoin + ')')
        return 'Token: {d} {c}\n{r}'.format(d=token_addr,c=symbol,r=parent_body)
class Unsigned(Completed,EthereumMMGenTX.Unsigned):
    """Unsigned token transaction."""
    desc = 'unsigned transaction'

    def parse_txfile_hex_data(self):
        """Run the parent's parser, then add the token-specific entries to txobj."""
        raw = EthereumMMGenTX.Unsigned.parse_txfile_hex_data(self)
        txobj = self.txobj
        txobj['token_addr'] = TokenAddr(self.proto,raw['token_addr'])
        txobj['decimals'] = Int(raw['decimals'])
        token = Token(self.proto,txobj['token_addr'],txobj['decimals'])
        txobj['data'] = token.create_data(txobj['to'],txobj['amt'])
        txobj['token_to'] = token.transferdata2sendaddr(txobj['data'])

    async def do_sign(self,wif,tx_num_str):
        """Sign via the Token object, storing the result in self.hex and self.coin_txid."""
        txobj = self.txobj
        token = Token(self.proto,txobj['token_addr'],txobj['decimals'])
        tx_in = token.make_tx_in(
            txobj['from'],
            txobj['to'],
            txobj['amt'],
            self.start_gas,
            txobj['gasPrice'],
            nonce = txobj['nonce'] )
        signed = await token.txsign(tx_in,wif,txobj['from'],chain_id=txobj['chainId'])
        (self.hex,self.coin_txid) = signed
        assert self.check_sigs(),'Signature check failed'
class Signed(Completed,EthereumMMGenTX.Signed):
    """Signed token transaction."""
    desc = 'signed transaction'

    def parse_txfile_hex_data(self):
        """Run the parent's parser, then derive the token entries of txobj from the call data."""
        d = EthereumMMGenTX.Signed.parse_txfile_hex_data(self)
        txobj = self.txobj
        # for a token transaction, 'to' is the token contract address
        assert self.tw.token == txobj['to']
        txobj['token_addr'] = TokenAddr(self.proto,txobj['to'])
        txobj['decimals'] = self.tw.decimals
        token = Token(self.proto,txobj['token_addr'],txobj['decimals'])
        call_data = txobj['data']
        txobj['amt'] = token.transferdata2amt(call_data)
        txobj['token_to'] = token.transferdata2sendaddr(call_data)
class Bump(EthereumMMGenTX.Bump,Completed,New):
    """Fee-bumped token transaction; all behavior comes from the base classes."""

View file

View file

@ -0,0 +1,69 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.base: Ethereum base transaction class
"""
from collections import namedtuple
import mmgen.tx.base as TxBase
from ....amt import ETHAmt
from ....obj import HexStr,Int
from ....util import dmsg
class Base(TxBase.Base):
    """Attributes and fee helpers shared by all Ethereum transaction classes."""

    rel_fee_desc = 'gas price'
    rel_fee_disp = 'gas price in Gwei'
    txobj = None # ""
    tx_gas = ETHAmt(21000,'wei')    # an approximate number, used for fee estimation purposes
    start_gas = ETHAmt(21000,'wei') # the actual startgas amt used in the transaction
                                    # for simple sends with no data, tx_gas = start_gas = 21000
    contract_desc = 'contract'
    usr_contract_data = HexStr('')
    disable_fee_check = False

    def fee_abs2rel(self,abs_fee,to_unit='Gwei'):
        """Given an absolute fee, return the gas price (fee divided by tx_gas) in `to_unit`."""
        gas_price = ETHAmt(int(abs_fee.toWei() // self.tx_gas.toWei()),'wei')
        dmsg(f'fee_abs2rel() ==> {gas_price} ETH')
        if to_unit == 'eth':
            return gas_price
        return gas_price.to_unit(to_unit,show_decimal=True)

    def fee_gasPrice2abs(self,rel_fee):
        """Given a gas price in wei (int), return the absolute fee using tx_gas (Ethereum-only method)."""
        assert isinstance(rel_fee,int), f'{rel_fee!r}: incorrect type for fee estimate (not an integer)'
        total_wei = rel_fee * self.tx_gas.toWei()
        return ETHAmt(total_wei,'wei')

    def is_replaceable(self):
        return True

    async def get_receipt(self,txid,silent=False):
        """
        Fetch receipt and transaction data for `txid` (hex string without '0x' prefix).

        Returns None if the node has no receipt for the transaction, otherwise an
        'exec_status' namedtuple.
        """
        prefixed_txid = '0x' + txid
        receipt = await self.rpc.call('eth_getTransactionReceipt',prefixed_txid) # -> null if pending
        if not receipt:
            return None
        tx_data = await self.rpc.call('eth_getTransactionByHash',prefixed_txid)
        exec_status = namedtuple(
            'exec_status',
            ['status','gas_sent','gas_used','gas_price','contract_addr','tx','rx'] )
        return exec_status(
            status        = Int(receipt['status'],16), # zero is failure, non-zero success
            gas_sent      = Int(tx_data['gas'],16),
            gas_used      = Int(receipt['gasUsed'],16),
            gas_price     = ETHAmt(int(tx_data['gasPrice'],16),from_unit='wei'),
            contract_addr = (
                self.proto.coin_addr(receipt['contractAddress'][2:])
                    if receipt['contractAddress'] else None ),
            tx            = tx_data,
            rx            = receipt,
        )

    def check_serialized_integrity(self): # TODO
        return True
class TokenBase(Base):
    """Token transaction base: overrides the gas amounts and the contract description."""
    contract_desc = 'token contract'
    start_gas = ETHAmt(60000,'wei')
    tx_gas = ETHAmt(52000,'wei')

View file

@ -0,0 +1,35 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.bump: Ethereum transaction bump class
"""
import mmgen.tx.bump as TxBase
from .completed import Completed,TokenCompleted
from .new import New,TokenNew
from ....amt import ETHAmt
from decimal import Decimal
class Bump(Completed,New,TxBase.Bump):
    """Fee-bumped Ethereum transaction; reuses the nonce of the original transaction."""
    desc = 'fee-bumped transaction'

    @property
    def min_fee(self):
        """Current fee multiplied by 1.101."""
        bumped = self.fee * Decimal('1.101')
        return ETHAmt(bumped)

    def bump_fee(self,idx,fee):
        """Set the gas price in txobj to the one corresponding to absolute fee `fee`."""
        new_gas_price = self.fee_abs2rel(fee,to_unit='eth')
        self.txobj['gasPrice'] = new_gas_price

    async def get_nonce(self):
        """Return the nonce already stored in txobj."""
        nonce = self.txobj['nonce']
        return nonce
class TokenBump(TokenCompleted,TokenNew,Bump):
    """Fee-bumped token transaction; all behavior comes from the base classes."""
    desc = 'fee-bumped transaction'

View file

@ -0,0 +1,55 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.completed: Ethereum completed transaction class
"""
import mmgen.tx.completed as TxBase
from .base import Base,TokenBase
class Completed(Base,TxBase.Completed):
    """Completed Ethereum transaction: amount properties and Ethereum-specific checks."""
    fn_fee_unit = 'Mwei'

    @property
    def send_amt(self):
        """Amount of the first output, or a zero coin amount if there are no outputs."""
        if self.outputs:
            return self.outputs[0].amt
        return self.proto.coin_amt('0')

    @property
    def fee(self):
        """Absolute fee computed from the gas price stored in txobj."""
        gas_price_wei = self.txobj['gasPrice'].toWei()
        return self.fee_gasPrice2abs(gas_price_wei)

    @property
    def change(self):
        """Input total minus send amount and fee."""
        return self.sum_inputs() - self.send_amt - self.fee

    def check_txfile_hex_data(self):
        pass

    def check_sigs(self): # TODO
        """Return True if the serialized data is a hex string, False otherwise."""
        from ....util import is_hex_str
        return True if is_hex_str(self.serialized) else False

    def check_pubkey_scripts(self):
        pass

    def strfmt_locktime(self,locktime=None,terse=False):
        pass

    def get_serialized_locktime(self):
        return None # TODO
class TokenCompleted(TokenBase,Completed):
    """Completed token transaction."""

    @property
    def change(self):
        """Input total minus send amount (the fee is not subtracted)."""
        total_in = self.sum_inputs()
        return total_in - self.send_amt

View file

@ -0,0 +1,83 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.info: Ethereum transaction info class
"""
from ....tx.info import TxInfo
from ....util import fmt,pp_fmt
from ....color import pink,yellow,blue
from ....addr import MMGenID
from ....obj import Str
class TxInfo(TxInfo):
    """Ethereum transaction info: format strings and formatters for displaying self.tx."""
    txinfo_hdr_fs = 'TRANSACTION DATA\n\nID={i} ({a} {c}) UTC={t} Sig={s} Locktime={l}\n'
    txinfo_hdr_fs_short = 'TX {i} ({a} {c}) UTC={t} Sig={s} Locktime={l}\n'
    # literal lines are kept unindented so the text handed to fmt() is unchanged
    txinfo_ftr_fs = fmt("""
Total in account: {i} {d}
Total to spend: {o} {d}
Remaining balance: {C} {d}
TX fee: {a} {c}{r}
""")
    fmt_keys = ('from','to','amt','nonce')

    def format_body(self,blockcount,nonmm_str,max_mmwid,enl,terse,sort):
        """Return the formatted transaction body (addresses, amount, gas, nonce, data)."""
        tx = self.tx
        mmid_disp = {}
        for k in ('inputs','outputs'):
            io_list = getattr(tx,k)
            if len(io_list):
                mmid = io_list[0].mmid
                mmid_disp[k] = ' ' + (mmid.hl() if mmid else MMGenID.hlc(nonmm_str))
        # literal lines are kept unindented so the resulting string is unchanged
        fs = """From: {}{f_mmid}
To: {}{t_mmid}
Amount: {} {c}
Gas price: {g} Gwei
Start gas: {G} Kwei
Nonce: {}
Data: {d}
\n""".replace('\t','')
        t = tx.txobj
        td = t['data']
        # empty field values are displayed as 'None'
        positional = [(t[k] if t[k] != '' else Str('None')).hl() for k in self.fmt_keys]
        have_outputs = len(tx.outputs)
        return fs.format(
            *positional,
            d = '{}... ({} bytes)'.format(td[:40],len(td)//2) if len(td) else Str('None'),
            c = tx.proto.dcoin if have_outputs else '',
            g = yellow(str(t['gasPrice'].to_unit('Gwei',show_decimal=True))),
            G = yellow(str(t['startGas'].to_unit('Kwei'))),
            t_mmid = mmid_disp['outputs'] if have_outputs else '',
            f_mmid = mmid_disp['inputs'] )

    def format_abs_fee(self):
        """Return the highlighted fee, marked ' (max)' when the transaction carries data."""
        suffix = ' (max)' if self.tx.txobj['data'] else ''
        return self.tx.fee.hl() + suffix

    def format_rel_fee(self,terse):
        """Return the fee as a percentage of the send amount."""
        percentage = self.tx.fee / self.tx.send_amt * 100
        return ' ({} of spend amount)'.format(
            pink('{:0.6f}%'.format(percentage))
        )

    def format_verbose_footer(self):
        """Return the parsed contract data, or an empty string if the TX has no data."""
        if not self.tx.txobj['data']:
            return ''
        from ..contract import parse_abi
        return '\nParsed contract data: ' + pp_fmt(parse_abi(self.tx.txobj['data']))
class TokenTxInfo(TxInfo):
    """Token transaction info."""
    fmt_keys = ('from','token_to','amt','nonce')

    def format_rel_fee(self,terse):
        return ''

    def format_body(self,*args,**kwargs):
        """Return the parent's body, preceded by a line showing the token address and symbol."""
        parent_body = super().format_body(*args,**kwargs)
        token_addr = self.tx.txobj['token_addr'].hl()
        symbol = blue('(' + self.tx.proto.dcoin + ')')
        return 'Token: {d} {c}\n{r}'.format(d=token_addr,c=symbol,r=parent_body)

View file

@ -0,0 +1,197 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.new: Ethereum new transaction class
"""
import json
import mmgen.tx.new as TxBase
from .base import Base,TokenBase
from ....opts import opt
from ....obj import Int,ETHNonce,MMGenTxID,Str
from ....amt import ETHAmt
from ....util import msg,line_input,is_int,is_hex_str,make_chksum_6
from ....twctl import TrackingWallet
from ....addr import is_mmgen_id,is_coin_addr
from ..contract import Token
class New(Base,TxBase.New):
desc = 'transaction'
fee_fail_fs = 'Network fee estimation failed'
no_chg_msg = 'Warning: Transaction leaves account with zero balance'
usr_fee_prompt = 'Enter transaction fee or gas price: '
hexdata_type = 'hex'
async def get_nonce(self):
return ETHNonce(int(await self.rpc.call('eth_getTransactionCount','0x'+self.inputs[0].addr,'pending'),16))
async def make_txobj(self): # called by create_serialized()
self.txobj = {
'from': self.inputs[0].addr,
'to': self.outputs[0].addr if self.outputs else Str(''),
'amt': self.outputs[0].amt if self.outputs else ETHAmt('0'),
'gasPrice': self.fee_abs2rel(self.usr_fee,to_unit='eth'),
'startGas': self.start_gas,
'nonce': await self.get_nonce(),
'chainId': self.rpc.chainID,
'data': self.usr_contract_data,
}
# Instead of serializing tx data as with BTC, just create a JSON dump.
# This complicates things but means we avoid using the rlp library to deserialize the data,
# thus removing an attack vector
async def create_serialized(self,locktime=None,bump=None):
assert len(self.inputs) == 1,'Transaction has more than one input!'
o_num = len(self.outputs)
o_ok = 0 if self.usr_contract_data else 1
assert o_num == o_ok, f'Transaction has {o_num} output{suf(o_num)} (should have {o_ok})'
await self.make_txobj()
odict = { k: str(v) for k,v in self.txobj.items() if k != 'token_to' }
self.serialized = json.dumps(odict)
self.update_txid()
def update_txid(self):
assert not is_hex_str(self.serialized), (
'update_txid() must be called only when self.serialized is not hex data' )
self.txid = MMGenTxID(make_chksum_6(self.serialized).upper())
def process_cmd_args(self,cmd_args,ad_f,ad_w):
lc = len(cmd_args)
if lc == 0 and self.usr_contract_data and not 'Token' in type(self).__name__:
return
if lc != 1:
die(1,f'{lc} output{suf(lc)} specified, but Ethereum transactions must have exactly one')
for a in cmd_args:
self.process_cmd_arg(a,ad_f,ad_w)
def select_unspent(self,unspent):
while True:
reply = line_input('Enter an account to spend from: ').strip()
if reply:
if not is_int(reply):
msg('Account number must be an integer')
elif int(reply) < 1:
msg('Account number must be >= 1')
elif int(reply) > len(unspent):
msg(f'Account number must be <= {len(unspent)}')
else:
return [int(reply)]
# get rel_fee (gas price) from network, return in native wei
async def get_rel_fee_from_network(self):
return Int(await self.rpc.call('eth_gasPrice'),16),'eth_gasPrice' # ==> rel_fee,fe_type
def check_fee(self):
if not self.disable_fee_check:
assert self.usr_fee <= self.proto.max_tx_fee
# given rel fee and units, return absolute fee using self.tx_gas
def fee_rel2abs(self,tx_size,units,amt,unit):
return ETHAmt(
ETHAmt(amt,units[unit]).toWei() * self.tx_gas.toWei(),
from_unit='wei'
)
# given fee estimate (gas price) in wei, return absolute fee, adjusting by opt.tx_fee_adj
def fee_est2abs(self,rel_fee,fe_type=None):
ret = self.fee_gasPrice2abs(rel_fee) * opt.tx_fee_adj
if opt.verbose:
msg(f'Estimated fee: {ret} ETH')
return ret
def convert_and_check_fee(self,tx_fee,desc):
abs_fee = self.feespec2abs(tx_fee,None)
if abs_fee == False:
return False
elif not self.disable_fee_check and (abs_fee > self.proto.max_tx_fee):
msg('{} {c}: {} fee too large (maximum fee: {} {c})'.format(
abs_fee.hl(),
desc,
self.proto.max_tx_fee.hl(),
c = self.proto.coin ))
return False
else:
return abs_fee
def update_change_output(self,funds_left):
if self.outputs and self.outputs[0].is_chg:
self.update_output_amt(0,ETHAmt(funds_left))
async def get_cmdline_input_addrs(self):
    """
    Resolve the comma-separated entries of opt.inputs to tracking-wallet addresses.

    Each entry may be an MMGen ID, which is looked up by the 'mmid' field of
    the wallet entries, or a coin address, which must be a key of the
    wallet's data root.  Raises UserAddressNotInWallet for entries not
    found in the wallet.  Returns a list of addresses, empty when
    opt.inputs is unset.
    """
    selected = []

    if not opt.inputs:
        return selected

    data_root = (await TrackingWallet(self.proto)).data_root # must create new instance here
    errmsg = 'Address {!r} not in tracking wallet'
    no_match = object()

    for addr in opt.inputs.split(','):
        if is_mmgen_id(self.proto,addr):
            # lazy search: stops at the first wallet entry with a matching MMGen ID
            waddr = next(
                (key for key in data_root if data_root[key]['mmid'] == addr),
                no_match )
            if waddr is no_match:
                raise UserAddressNotInWallet(errmsg.format(addr))
            selected.append(waddr)
        elif is_coin_addr(self.proto,addr):
            if addr not in data_root:
                raise UserAddressNotInWallet(errmsg.format(addr))
            selected.append(addr)
        else:
            die(1,f'{addr!r}: not an MMGen ID or coin address')

    return selected
def final_inputs_ok_msg(self,funds_left):
    """
    Return a message stating the balance the transaction leaves in the sender's account.

    The reported balance is zero when the first output is a change output,
    otherwise funds_left.
    """
    has_chg_output = bool(self.outputs and self.outputs[0].is_chg)
    remaining = '0' if has_chg_output else funds_left
    return 'Transaction leaves {} {} in the sender’s account'.format(
        ETHAmt(remaining).hl(),
        self.proto.coin
    )
class TokenNew(TokenBase,New):
    """New (not yet serialized) Ethereum token transaction."""

    desc = 'transaction'
    fee_is_approximate = True

    async def make_txobj(self): # called by create_serialized()
        """Extend the parent's transaction object with the token-specific fields."""
        await super().make_txobj()
        token = Token(self.proto,self.tw.token,self.tw.decimals)
        txobj = self.txobj
        txobj['token_addr'] = token.addr
        txobj['decimals'] = token.decimals
        txobj['token_to'] = txobj['to']
        txobj['data'] = token.create_data(txobj['token_to'],txobj['amt'])

    def update_change_output(self,funds_left):
        """Set a change output's amount to the amount of the first input."""
        first_output = self.outputs[0]
        if first_output.is_chg:
            self.update_output_amt(0,self.inputs[0].amt)

    # token transaction, so check both eth and token balances
    # TODO: add test with insufficient funds
    async def precheck_sufficient_funds(self,inputs_sum,sel_unspent,outputs_sum):
        """
        Return False if the sending account holds no ether, otherwise defer
        to the parent's check.
        """
        eth_bal = await self.tw.get_eth_balance(sel_unspent[0].addr)
        if eth_bal != 0:
            return await super().precheck_sufficient_funds(inputs_sum,sel_unspent,outputs_sum)
        # we don't know the fee yet
        msg('This account has no ether to pay for the transaction fee!')
        return False

    async def get_funds_left(self,fee,outputs_sum):
        """Return the sender's ether balance minus the fee."""
        eth_bal = await self.tw.get_eth_balance(self.inputs[0].addr)
        return eth_bal - fee

    def final_inputs_ok_msg(self,funds_left):
        """Return a message stating the ether and token balances left in the sender's account."""
        if self.outputs[0].is_chg:
            token_bal = ETHAmt('0')
        else:
            token_bal = self.inputs[0].amt - self.outputs[0].amt
        return "Transaction leaves ≈{} {} and {} {} in the sender's account".format(
            funds_left.hl(),
            self.proto.coin,
            token_bal.hl(),
            self.proto.dcoin
        )

View file

@ -0,0 +1,83 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.online: Ethereum online signed transaction class
"""
from ....globalvars import *
import mmgen.tx.online as TxBase
from .signed import Signed,TokenSigned
from .. import erigon_sleep
from ....util import msg,rmsg
class OnlineSigned(Signed,TxBase.OnlineSigned):
    """Signed Ethereum transaction with methods that require a connection to the coin daemon."""

    async def send(self,prompt_user=True,exit_on_fail=False):
        """
        Broadcast the transaction via the 'eth_sendRawTransaction' RPC call.

        Before sending, the chain is checked, the fee is compared with the
        protocol's maximum (unless fee checking is disabled) and the
        transaction's status is displayed.  With prompt_user set, the user
        is asked to confirm.  With g.bogus_send set, nothing is sent.

        Returns True on success.  If the RPC call returns a value equal to
        False, a message is printed and the method returns False, or exits
        when exit_on_fail is set.  Exceptions raised by the RPC call
        propagate to the caller.
        """
        self.check_correct_chain()

        if not self.disable_fee_check and (self.fee > self.proto.max_tx_fee):
            die(2,'Transaction fee ({}) greater than {} max_tx_fee ({} {})!'.format(
                self.fee,
                self.proto.name,
                self.proto.max_tx_fee,
                self.proto.coin ))

        await self.status.display()

        if prompt_user:
            self.confirm_send()

        if g.bogus_send:
            ret = None
        else:
            # RPC errors are allowed to propagate: the former bare 'except: raise'
            # wrapper was a no-op and the assignment following it was unreachable
            ret = await self.rpc.call('eth_sendRawTransaction','0x'+self.serialized)

        if ret == False:
            rmsg(f'Send of MMGen transaction {self.txid} failed')
            if exit_on_fail:
                sys.exit(1)
            return False
        else:
            if g.bogus_send:
                m = 'BOGUS transaction NOT sent: {}'
            else:
                m = 'Transaction sent: {}'
                # the daemon must report the same txid that was computed at signing time
                assert ret == '0x'+self.coin_txid,'txid mismatch (after sending)'
                await erigon_sleep(self)
            self.desc = 'sent transaction'
            msg(m.format(self.coin_txid.hl()))
            self.add_timestamp()
            self.add_blockcount()
            return True

    def print_contract_addr(self):
        """Print the contract address if the transaction object has a 'token_addr' entry."""
        if 'token_addr' in self.txobj:
            msg('Contract address: {}'.format( self.txobj['token_addr'].hl() ))
class TokenOnlineSigned(TokenSigned,OnlineSigned):
    """Online signed Ethereum token transaction."""

    def parse_txfile_serialized_data(self):
        """
        Parse the serialized data with the OnlineSigned parser, then derive
        the token fields of the transaction object using the tracking wallet.
        """
        from ....addr import TokenAddr
        from ..contract import Token

        # call OnlineSigned's parser explicitly, as the original code does
        OnlineSigned.parse_txfile_serialized_data(self)

        txobj = self.txobj
        assert self.tw.token == txobj['to']

        txobj['token_addr'] = TokenAddr(self.proto,txobj['to'])
        txobj['decimals'] = self.tw.decimals

        token = Token(self.proto,txobj['token_addr'],txobj['decimals'])
        txobj['amt'] = token.transferdata2amt(txobj['data'])
        txobj['token_to'] = token.transferdata2sendaddr(txobj['data'])

View file

@ -0,0 +1,58 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.signed: Ethereum signed transaction class
"""
import mmgen.tx.signed as TxBase
from .completed import Completed,TokenCompleted
from ..contract import Token
from ....obj import Str,CoinTxID,ETHNonce,HexStr
from ....addr import CoinAddr,TokenAddr
from ....amt import ETHAmt
class Signed(Completed,TxBase.Signed):
    """Signed Ethereum transaction."""

    desc = 'signed transaction'

    def parse_txfile_serialized_data(self):
        """
        Decode the RLP-serialized transaction and populate self.txobj from it.

        Sets self.tx_gas from the decoded start gas and, for a transaction
        with data but no destination, records the created contract address
        and disables the fee check.  Asserts that the decoded transaction's
        hash matches self.coin_txid.  Returns the decoded dict.
        """
        from ..pyethereum.transactions import Transaction
        from .. import rlp

        etx = rlp.decode(bytes.fromhex(self.serialized),Transaction)
        d = etx.to_dict() # ==> hex values have '0x' prefix, 0 is '0x'

        # strip the leading '0x' from the hex fields that are present
        for key in ('sender','to','data'):
            if key in d:
                d[key] = d[key].replace('0x','',1)

        # NB: for token, 'to' is token address
        to_addr = CoinAddr(self.proto,d['to']) if d['to'] else Str('')

        txobj = {
            'from':     CoinAddr(self.proto,d['sender']),
            'to':       to_addr,
            'amt':      ETHAmt(d['value'],'wei'),
            'gasPrice': ETHAmt(d['gasprice'],'wei'),
            'startGas': ETHAmt(d['startgas'],'wei'),
            'nonce':    ETHNonce(d['nonce']),
            'data':     HexStr(d['data']) }

        is_creation = txobj['data'] and not txobj['to'] # token- or contract-creating transaction
        if is_creation:
            # NB: could be a non-token contract address:
            txobj['token_addr'] = TokenAddr(self.proto,etx.creates.hex())
            self.disable_fee_check = True

        txid = CoinTxID(etx.hash.hex())
        assert txid == self.coin_txid,"txid in tx.serialized doesn't match value in MMGen transaction file"

        self.tx_gas = txobj['startGas'] # approximate, but better than nothing
        self.txobj = txobj
        return d # 'token_addr','decimals' required by Token subclass
class TokenSigned(TokenCompleted,Signed):
    """Signed Ethereum token transaction."""

    desc = 'signed transaction'

    def parse_txfile_serialized_data(self):
        """Always raise NotImplementedError: parsing requires a tracking wallet."""
        errmsg = 'Signed transaction files cannot be parsed offline, because tracking wallet is required!'
        raise NotImplementedError(errmsg)

View file

@ -0,0 +1,63 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.status: Ethereum transaction status class
"""
import mmgen.tx.status as TxBase
from ....util import msg,die,suf,capfirst
class Status(TxBase.Status):
    """Status reporting for an Ethereum transaction."""

    async def display(self,usr_req=False):
        """
        Report whether the transaction is in the mempool or confirmed.

        If the transaction is found in the mempool, a message is printed and
        the method returns.  Otherwise, and only when usr_req is set, the
        transaction receipt is consulted: the process exits with status 0
        after reporting the confirmation count (and the contract execution
        status for transactions with data), or with status 1 if there is no
        receipt with a block number.
        """
        tx = self.tx
        rpc = tx.rpc
        prefixed_txid = '0x'+tx.coin_txid

        async def is_in_mempool():
            if 'full_node' not in rpc.caps:
                return False
            daemon_id = rpc.daemon.id
            if daemon_id in ('parity','openethereum'):
                pool = [x['hash'] for x in await rpc.call('parity_pendingTransactions')]
            elif daemon_id in ('geth','erigon'):
                res = await rpc.call('txpool_content')
                pool = list(res['pending']) + list(res['queued'])
            return prefixed_txid in pool

        async def is_in_wallet():
            d = await rpc.call('eth_getTransactionReceipt',prefixed_txid)
            if not (d and 'blockNumber' in d and d['blockNumber'] is not None):
                return None
            from collections import namedtuple
            receipt_info = namedtuple('receipt_info',['confs','exec_status'])
            cur_block = int(await rpc.call('eth_blockNumber'),16)
            return receipt_info(
                confs       = 1 + cur_block - int(d['blockNumber'],16),
                exec_status = int(d['status'],16)
            )

        if await is_in_mempool():
            if usr_req:
                msg('Transaction is in mempool')
            else:
                msg('Warning: transaction is in mempool!')
            return

        if not usr_req:
            return

        ret = await is_in_wallet()
        if ret:
            if tx.txobj['data']:
                cd = capfirst(tx.contract_desc)
                if ret.exec_status == 0:
                    msg(f'{cd} failed to execute!')
                else:
                    msg(f'{cd} successfully executed with status {ret.exec_status}')
            die(0,f'Transaction has {ret.confs} confirmation{suf(ret.confs)}')
        die(1,'Transaction is neither in mempool nor blockchain!')
class TokenStatus(Status):
    """Status reporting for an Ethereum token transaction; adds nothing to Status."""

View file

@ -0,0 +1,104 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.tx.unsigned: Ethereum unsigned transaction class
"""
import json
import mmgen.tx.unsigned as TxBase
from .completed import Completed,TokenCompleted
from ..contract import Token
from ....util import msg,msg_r,ymsg
from ....obj import Str,CoinTxID,ETHNonce,Int,HexStr
from ....addr import CoinAddr,TokenAddr
from ....amt import ETHAmt
class Unsigned(Completed,TxBase.Unsigned):
    """Unsigned Ethereum transaction, serialized as JSON."""

    desc = 'unsigned transaction'
    hexdata_type = 'json'

    def parse_txfile_serialized_data(self):
        """
        Load the JSON-serialized transaction and populate self.txobj from it.

        Also sets self.tx_gas from the 'startGas' field.  Returns the raw
        decoded dict.
        """
        d = json.loads(self.serialized)
        o = {
            'from':     CoinAddr(self.proto,d['from']),
            # NB: for token, 'to' is sendto address
            'to':       CoinAddr(self.proto,d['to']) if d['to'] else Str(''),
            'amt':      ETHAmt(d['amt']),
            'gasPrice': ETHAmt(d['gasPrice']),
            'startGas': ETHAmt(d['startGas']),
            'nonce':    ETHNonce(d['nonce']),
            # the chain ID is stored in the file as the string 'None' when unset
            'chainId':  None if d['chainId'] == 'None' else Int(d['chainId']),
            'data':     HexStr(d['data']) }
        self.tx_gas = o['startGas'] # approximate, but better than nothing
        self.txobj = o
        return d # 'token_addr','decimals' required by Token subclass

    async def do_sign(self,wif,tx_num_str):
        """
        Sign the transaction with the key wif.

        Sets self.serialized to the hex RLP encoding of the signed
        transaction and self.coin_txid to its hash.  Asserts that the sender
        recovered from the signature equals the 'from' address.
        tx_num_str is not used here.
        """
        o = self.txobj
        o_conv = {
            'to':       bytes.fromhex(o['to']),
            'startgas': o['startGas'].toWei(),
            'gasprice': o['gasPrice'].toWei(),
            'value':    o['amt'].toWei() if o['amt'] else 0,
            'nonce':    o['nonce'],
            'data':     bytes.fromhex(o['data']) }

        from ..pyethereum.transactions import Transaction
        etx = Transaction(**o_conv).sign(wif,o['chainId'])
        assert etx.sender.hex() == o['from'],(
            'Sender address recovered from signature does not match true sender')

        from .. import rlp
        self.serialized = rlp.encode(etx).hex()
        self.coin_txid = CoinTxID(etx.hash.hex())

        if o['data']:
            if o['to']:
                # TokenAddr takes the protocol as its first argument, as at every other call site
                # NOTE(review): presumably etx.creates is set on this path; confirm against pyethereum
                assert self.txobj['token_addr'] == TokenAddr(self.proto,etx.creates.hex()),'Token address mismatch'
            else: # token- or contract-creating transaction
                self.txobj['token_addr'] = TokenAddr(self.proto,etx.creates.hex())

    async def sign(self,tx_num_str,keys): # return TX object or False; don't exit or raise exception
        """
        Sign the transaction with the first key in keys.

        Returns a signed transaction object created from this object's
        attributes, or False on a chain mismatch or any signing error.
        """
        try:
            self.check_correct_chain()
        except TransactionChainMismatch:
            return False

        msg_r(f'Signing transaction{tx_num_str}...')

        try:
            await self.do_sign(keys[0].sec.wif,tx_num_str)
            msg('OK')
            from ....tx import SignedTX
            return await SignedTX(data=self.__dict__)
        except Exception as e:
            msg(f'{e}: transaction signing failed!')
            return False
class TokenUnsigned(TokenCompleted,Unsigned):
    """Unsigned Ethereum token transaction."""

    desc = 'unsigned transaction'

    def parse_txfile_serialized_data(self):
        """
        Parse the serialized data with the Unsigned parser, then add the
        token fields and the token transfer data to the transaction object.
        """
        raw = Unsigned.parse_txfile_serialized_data(self)
        txobj = self.txobj
        txobj['token_addr'] = TokenAddr(self.proto,raw['token_addr'])
        txobj['decimals'] = Int(raw['decimals'])
        token = Token(self.proto,txobj['token_addr'],txobj['decimals'])
        txobj['data'] = token.create_data(txobj['to'],txobj['amt'])
        txobj['token_to'] = token.transferdata2sendaddr(txobj['data'])

    async def do_sign(self,wif,tx_num_str):
        """
        Sign the token transfer with the key wif, setting self.serialized
        and self.coin_txid.  tx_num_str is not used here.
        """
        txobj = self.txobj
        token = Token(self.proto,txobj['token_addr'],txobj['decimals'])
        tx_in = token.make_tx_in(
            txobj['from'],
            txobj['to'],
            txobj['amt'],
            self.start_gas,
            txobj['gasPrice'],
            nonce = txobj['nonce'] )
        signed = await token.txsign(tx_in,wif,txobj['from'],chain_id=txobj['chainId'])
        (self.serialized,self.coin_txid) = signed

View file

@ -595,6 +595,12 @@ class bitcoin_core_daemon(CoinDaemon):
# RPC args: addr,label,rescan[=true],p2sh[=none]
return ('importaddress',coinaddr,lbl,False)
def estimatefee_args(self,rpc):
return (opt.tx_confs,)
def sigfail_errmsg(self,e):
return e.args[0]
class bitcoin_cash_node_daemon(bitcoin_core_daemon):
daemon_data = _dd('Bitcoin Cash Node', 24000000, '24.0.0')
exec_fn = 'bitcoind-bchn'
@ -612,6 +618,15 @@ class bitcoin_cash_node_daemon(bitcoin_core_daemon):
# Broken behavior: new label is set OK, but old label gets attached to another address
return ('importaddress',coinaddr,lbl,False)
def estimatefee_args(self,rpc):
return () if rpc.daemon_version >= 190100 else (opt.tx_confs,)
def sigfail_errmsg(self,e):
return (
'This is not the BCH chain.\nRe-run the script without the --coin=bch option.'
if 'Invalid sighash param' in e.args[0] else
e.args[0] )
class litecoin_core_daemon(bitcoin_core_daemon):
daemon_data = _dd('Litecoin Core', 180100, '0.18.1')
exec_fn = 'litecoind'

View file

@ -1 +1 @@
13.1.dev012
13.1.dev013

View file

@ -104,14 +104,14 @@ def help_notes_func(proto,po,k):
return fmt_list(CoinDaemon.get_network_ids(),fmt='bare')
def rel_fee_desc():
from .tx import MMGenTX
return MMGenTX.Base().rel_fee_desc
from .tx import BaseTX
return BaseTX(proto=proto).rel_fee_desc
def fee_spec_letters():
return fee_spec_letters()
def fee():
from .tx import MMGenTX
from .tx import BaseTX
return """
FEE SPECIFICATION: Transaction fees, both on the command line and at the
interactive prompt, may be specified as either absolute {c} amounts, using
@ -119,7 +119,7 @@ a plain decimal number, or as {r}, using an integer followed by
'{l}', for {u}.
""".format(
c = proto.coin,
r = MMGenTX.Base().rel_fee_desc,
r = BaseTX(proto=proto).rel_fee_desc,
l = fee_spec_letters(use_quotes=True),
u = fee_spec_names() )

View file

@ -137,7 +137,7 @@ if opt.mnemonic_fmt:
fmt_list(mn_fmts,fmt='no_spc') ))
from .wallet import Wallet
from .tx import MMGenTX
from .tx import UnsignedTX
from .txsign import txsign
from .protocol import init_proto
from .rpc import rpc_init
@ -197,12 +197,12 @@ def do_umount():
async def sign_tx_file(txfile):
try:
tx1 = MMGenTX.Unsigned(filename=txfile)
tx1 = UnsignedTX(filename=txfile)
if tx1.proto.sign_mode == 'daemon':
tx1.rpc = await rpc_init(tx1.proto)
tx2 = await txsign(tx1,wfs,None,None)
if tx2:
tx2.write_to_file(ask_write=False)
tx2.file.write(ask_write=False)
return tx2
else:
return False
@ -260,7 +260,7 @@ def print_summary(signed_txs):
bmsg('\nAutosign summary:\n')
def gen():
for tx in signed_txs:
yield tx.format_view(terse=True)
yield tx.info.format(terse=True)
msg_r(''.join(gen()))
return

View file

@ -145,4 +145,4 @@ async def main():
for tx,desc in ((tx1,'Long chain (timelocked)'),(tx2,'Short chain')):
tx.desc = desc + ' transaction'
tx.write_to_file(ask_write=False,ask_overwrite=not opt.yes,ask_write_default_yes=False)
tx.file.write(ask_write=False,ask_overwrite=not opt.yes,ask_write_default_yes=False)

View file

@ -115,28 +115,22 @@ do_license_msg()
silent = opt.yes and opt.tx_fee != None and opt.output_to_reduce != None
ext = get_extension(tx_file)
ext_data = {
MMGenTX.Unsigned.ext: 'Unsigned',
MMGenTX.Signed.ext: 'Signed',
}
if ext not in ext_data:
die(1,f'{ext!r}: unrecognized file extension')
from .tx import CompletedTX,BumpTX,UnsignedTX,OnlineSignedTX
async def main():
orig_tx = getattr(MMGenTX,ext_data[ext])(filename=tx_file)
orig_tx = await CompletedTX(filename=tx_file)
if not silent:
msg(green('ORIGINAL TRANSACTION'))
msg(orig_tx.format_view(terse=True))
msg(orig_tx.info.format(terse=True))
kal = get_keyaddrlist(orig_tx.proto,opt)
kl = get_keylist(orig_tx.proto,opt)
sign_and_send = bool(seed_files or kl or kal)
from .twctl import TrackingWallet
tx = MMGenTX.Bump(
tx = await BumpTX(
data = orig_tx.__dict__,
send = sign_and_send,
tw = await TrackingWallet(orig_tx.proto) if orig_tx.proto.tokensym else None )
@ -159,13 +153,10 @@ async def main():
assert tx.fee <= tx.proto.max_tx_fee
if tx.proto.base_proto == 'Bitcoin':
tx.outputs.sort_bip69() # output amts have changed, so re-sort
if not opt.yes:
tx.add_comment() # edits an existing comment
await tx.create_raw() # creates tx.hex, tx.txid
await tx.create_serialized(bump=True)
tx.add_timestamp()
tx.add_blockcount()
@ -174,18 +165,19 @@ async def main():
if not silent:
msg(green('\nREPLACEMENT TRANSACTION:'))
msg_r(tx.format_view(terse=True))
msg_r(tx.info.format(terse=True))
if sign_and_send:
tx2 = MMGenTX.Unsigned(data=tx.__dict__)
tx2 = UnsignedTX(data=tx.__dict__)
tx3 = await txsign(tx2,seed_files,kl,kal)
if tx3:
tx3.write_to_file(ask_write=False)
await tx3.send(exit_on_fail=True)
tx3.write_to_file(ask_write=False)
tx4 = await OnlineSignedTX(data=tx3.__dict__)
tx4.file.write(ask_write=False)
await tx4.send(exit_on_fail=True)
tx4.file.write(ask_write=False)
else:
die(2,'Transaction could not be signed')
else:
tx.write_to_file(ask_write=not opt.yes,ask_write_default_yes=False,ask_overwrite=not opt.yes)
tx.file.write(ask_write=not opt.yes,ask_write_default_yes=False,ask_overwrite=not opt.yes)
run_session(main())

View file

@ -81,11 +81,9 @@ async def main():
from .protocol import init_proto_from_opts
proto = init_proto_from_opts(need_amt=True)
from .tx import MMGenTX
from .tx import NewTX
from .twctl import TrackingWallet
tx1 = MMGenTX.New(
proto = proto,
tw = await TrackingWallet(proto) if proto.tokensym else None )
tx1 = await NewTX(proto=proto)
from .rpc import rpc_init
tx1.rpc = await rpc_init(proto)
@ -95,7 +93,7 @@ async def main():
locktime = int(opt.locktime or 0),
do_info = opt.info )
tx2.write_to_file(
tx2.file.write(
ask_write = not opt.yes,
ask_overwrite = not opt.yes,
ask_write_default_yes = False )

View file

@ -116,7 +116,7 @@ FMT CODES:
cmd_args = opts.init(opts_data)
from .tx import *
from .tx import NewTX,OnlineSignedTX
from .txsign import *
seed_files = get_seed_files(opt,cmd_args)
@ -127,9 +127,7 @@ async def main():
from .protocol import init_proto_from_opts
proto = init_proto_from_opts(need_amt=True)
tx1 = MMGenTX.New(
proto = proto,
tw = await TrackingWallet(proto) if proto.tokensym else None )
tx1 = await NewTX(proto=proto)
from .rpc import rpc_init
tx1.rpc = await rpc_init(proto)
@ -145,10 +143,11 @@ async def main():
tx3 = await txsign(tx2,seed_files,kl,kal)
if tx3:
tx3.write_to_file(ask_write=False)
await tx3.send(exit_on_fail=True)
tx3.write_to_file(ask_overwrite=False,ask_write=False)
tx3.print_contract_addr()
tx4 = await OnlineSignedTX(data=tx3.__dict__)
tx4.file.write(ask_write=False)
await tx4.send(exit_on_fail=True)
tx4.file.write(ask_overwrite=False,ask_write=False)
tx4.print_contract_addr()
else:
die(2,'Transaction could not be signed')

View file

@ -52,12 +52,11 @@ if not opt.status:
async def main():
from .tx import MMGenTX
from .tx import OnlineSignedTX
tx = MMGenTX.Signed(
tx = await OnlineSignedTX(
filename = infile,
quiet_open = True,
tw = await MMGenTX.Signed.get_tracking_wallet(infile) )
quiet_open = True )
from .rpc import rpc_init
tx.rpc = await rpc_init(tx.proto)
@ -67,16 +66,16 @@ async def main():
if opt.status:
if tx.coin_txid:
qmsg(f'{tx.proto.coin} txid: {tx.coin_txid.hl()}')
await tx.get_status(status=True)
await tx.status.display(usr_req=True)
sys.exit(0)
if not opt.yes:
tx.view_with_prompt('View transaction details?')
tx.info.view_with_prompt('View transaction details?')
if tx.add_comment(): # edits an existing comment, returns true if changed
tx.write_to_file(ask_write_default_yes=True)
tx.file.write(ask_write_default_yes=True)
await tx.send(exit_on_fail=True)
tx.write_to_file(ask_overwrite=False,ask_write=False)
tx.file.write(ask_overwrite=False,ask_write=False)
tx.print_contract_addr()
run_session(main())

View file

@ -123,7 +123,8 @@ async def main():
tx_num_disp = f' #{tx_num}'
msg(f'\nTransaction{tx_num_disp} of {len(tx_files)}:')
tx1 = MMGenTX.Unsigned(filename=tx_file)
from .tx import UnsignedTX
tx1 = UnsignedTX(filename=tx_file)
vmsg(f'Successfully opened transaction file {tx_file!r}')
@ -140,7 +141,7 @@ async def main():
continue
if not opt.yes:
tx1.view_with_prompt(f'View data for transaction{tx_num_disp}?')
tx1.info.view_with_prompt(f'View data for transaction{tx_num_disp}?')
kal = get_keyaddrlist(tx1.proto,opt)
kl = get_keylist(tx1.proto,opt)
@ -149,7 +150,7 @@ async def main():
if tx2:
if not opt.yes:
tx2.add_comment() # edits an existing comment
tx2.write_to_file(ask_write=not opt.yes,ask_write_default_yes=True,add_desc=tx_num_disp)
tx2.file.write(ask_write=not opt.yes,ask_write_default_yes=True,add_desc=tx_num_disp)
else:
ymsg('Transaction could not be signed')
bad_tx_count += 1

View file

@ -159,15 +159,15 @@ class tool_cmd(tool_cmd_base):
def addr2pubhash(self,addr:'sstr'):
"convert coin address to public key hash"
from ..tx import addr2pubhash
from ..base_proto.bitcoin.tx.base import addr2pubhash
return addr2pubhash( self.proto, CoinAddr(self.proto,addr) )
def addr2scriptpubkey(self,addr:'sstr'):
"convert coin address to scriptPubKey"
from ..tx import addr2scriptPubKey
from ..base_proto.bitcoin.tx.base import addr2scriptPubKey
return addr2scriptPubKey( self.proto, CoinAddr(self.proto,addr) )
def scriptpubkey2addr(self,hexstr:'sstr'):
"convert scriptPubKey to coin address"
from ..tx import scriptPubKey2addr
from ..base_proto.bitcoin.tx.base import scriptPubKey2addr
return scriptPubKey2addr( self.proto, hexstr )[0]

View file

@ -90,20 +90,13 @@ class tool_cmd(tool_cmd_base):
file_sort = kwargs.get('filesort') or 'mtime'
from ..filename import MMGenFileList
from ..tx import MMGenTX
flist = MMGenFileList( infiles, ftype=MMGenTX )
from ..tx import completed,CompletedTX
flist = MMGenFileList( infiles, ftype=completed.Completed )
flist.sort_by_age( key=file_sort ) # in-place sort
async def process_file(fn):
if fn.endswith(MMGenTX.Signed.ext):
tx = MMGenTX.Signed(
filename = fn,
quiet_open = True,
tw = await MMGenTX.Signed.get_tracking_wallet(fn) )
else:
tx = MMGenTX.Unsigned(
filename = fn,
quiet_open = True )
return tx.format_view( terse=terse, sort=tx_sort )
async def process_file(f):
return (await CompletedTX(
filename = f.name,
quiet_open = True)).info.format( terse=terse, sort=tx_sort )
return (''*77+'\n').join([await process_file(fn) for fn in flist.names()]).rstrip()
return (''*77+'\n').join([await process_file(f) for f in flist]).rstrip()

File diff suppressed because it is too large Load diff

98
mmgen/tx/__init__.py Executable file
View file

@ -0,0 +1,98 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.__init__: transaction class initializer
"""
from ..objmethods import MMGenObject
def _base_proto_subclass(clsname,modname,proto):
    """
    Import the module containing the requested transaction class and return the class.

    With a protocol, the module is 'mmgen.base_proto.<base_proto>.tx.<modname>'
    and the class name gets a 'Token' prefix when the protocol has a token
    symbol.  Without a protocol, the class is looked up in 'mmgen.tx.base'.
    """
    import importlib

    if not proto:
        return getattr( importlib.import_module('mmgen.tx.base'), clsname )

    prefix = 'Token' if proto.tokensym else ''
    full_modname = 'mmgen.base_proto.{}.tx.{}'.format( proto.base_proto.lower(), modname )
    return getattr( importlib.import_module(full_modname), prefix + clsname )
def _get_cls_info(clsname,modname,args,kwargs):
    """
    Determine the protocol, class name and module name for a transaction class.

    The protocol is taken from the 'proto' keyword, from the 'proto' entry
    of the 'data' keyword, or from the transaction file named by the
    'filename' keyword, in that order of precedence.  Only 'Base' may be
    requested with none of these.  For 'Completed', the actual class and
    module are chosen from the extension of the file.

    Returns a (proto, clsname, modname, kwargs) tuple, with kwargs['proto']
    set to the protocol.
    """
    assert args == (), f'{clsname}.chk1: only keyword args allowed in {clsname} initializer'

    if 'proto' in kwargs:
        proto = kwargs['proto']
    elif 'data' in kwargs:
        proto = kwargs['data']['proto']
    elif 'filename' in kwargs:
        from ..txfile import MMGenTxFile
        proto = MMGenTxFile.get_proto( kwargs['filename'], quiet_open=True )
    elif clsname == 'Base':
        proto = None
    else:
        raise ValueError(
            f"{clsname} must be instantiated with 'proto','data' or 'filename' keyword" )

    if clsname == 'Completed':
        from ..util import get_extension,fmt_list
        from .unsigned import Unsigned
        from .signed import Signed

        file_ext = get_extension(kwargs['filename'])
        signed_info = ('OnlineSigned','online') if proto.tokensym else ('Signed','signed')
        cls_data = {
            Unsigned.ext: ('Unsigned','unsigned'),
            Signed.ext:   signed_info,
        }

        if file_ext not in cls_data:
            die(1,f'{file_ext!r}: unrecognized file extension for CompletedTX (not in {fmt_list(cls_data)})')

        clsname,modname = cls_data[file_ext]

    kwargs['proto'] = proto

    return ( proto, clsname, modname, kwargs )
def _get_obj( _clsname, _modname, *args, **kwargs ):
    """
    determine cls/mod/proto and pass them to _base_proto_subclass() to get a transaction instance
    """
    cls_info = _get_cls_info(_clsname,_modname,args,kwargs)
    proto,clsname,modname,kwargs = cls_info
    tx_cls = _base_proto_subclass( clsname, modname, proto )
    return tx_cls(*args,**kwargs)
async def _get_obj_async( _clsname, _modname, *args, **kwargs ):
    """
    Asynchronous counterpart of _get_obj(): for token protocols, a tracking
    wallet is created and passed as the 'tw' keyword to the 'New' and
    'OnlineSigned' classes.
    """
    cls_info = _get_cls_info(_clsname,_modname,args,kwargs)
    proto,clsname,modname,kwargs = cls_info

    # NB: tracking wallet needed to retrieve the 'symbol' and 'decimals' parameters of token addr
    # (see twctl:import_token()).
    # No tracking wallet required for the Unsigned and Signed(data=unsigned.__dict__) classes used
    # during signing.
    needs_tw = proto and proto.tokensym and clsname in ('New','OnlineSigned')
    if needs_tw:
        from ..twctl import TrackingWallet
        kwargs['tw'] = await TrackingWallet(proto)

    tx_cls = _base_proto_subclass( clsname, modname, proto )
    return tx_cls(*args,**kwargs)
def _get(clsname,modname):
    """Return a factory that calls _get_obj() for the given class and module names."""
    def factory(*args,**kwargs):
        return _get_obj(clsname,modname,*args,**kwargs)
    return factory
def _get_async(clsname,modname):
    """Return a factory that calls _get_obj_async() for the given class and module names."""
    def factory(*args,**kwargs):
        # returns the coroutine object; the caller is expected to await it
        return _get_obj_async(clsname,modname,*args,**kwargs)
    return factory
# Public transaction factories.  Each takes keyword arguments only and returns
# an instance of the protocol-specific class named by the first argument,
# loaded from the module named by the second.

# Synchronous factories: called directly
BaseTX = _get('Base', 'base')
UnsignedTX = _get('Unsigned', 'unsigned')

# Asynchronous factories: the call returns a coroutine that must be awaited
NewTX = _get_async('New', 'new')
CompletedTX = _get_async('Completed', 'completed')
SignedTX = _get_async('Signed', 'signed')
OnlineSignedTX = _get_async('OnlineSigned', 'online')
BumpTX = _get_async('Bump', 'bump')

184
mmgen/tx/base.py Executable file
View file

@ -0,0 +1,184 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.base: base transaction class
"""
from ..globalvars import *
from ..objmethods import MMGenObject
from ..obj import ImmutableAttr,ListItemAttr,MMGenListItem,MMGenTxLabel,TwComment,CoinTxID,HexStr
from ..addr import MMGenID,CoinAddr
from ..util import msg,ymsg,fmt,remove_dups,keypress_confirm,make_timestamp,line_input
from ..opts import opt
class MMGenTxIO(MMGenListItem):
    """Attributes common to transaction inputs and outputs."""

    vout     = ListItemAttr(int,typeconv=False)
    amt      = ImmutableAttr(None)
    label    = ListItemAttr(TwComment,reassign_ok=True)
    mmid     = ListItemAttr(MMGenID,include_proto=True)
    addr     = ImmutableAttr(CoinAddr,include_proto=True)
    confs    = ListItemAttr(int) # confs of type long exist in the wild, so convert
    txid     = ListItemAttr(CoinTxID)
    have_wif = ListItemAttr(bool,typeconv=False,delete_ok=True)

    invalid_attrs = {'proto','tw_copy_attrs'}

    def __init__(self,proto,**kwargs):
        # write to the instance dict directly, bypassing normal attribute assignment
        self.__dict__['proto'] = proto
        MMGenListItem.__init__(self,**kwargs)

    @property
    def mmtype(self):
        """
        Attempt to determine the MMGenAddrType of the input or output.  For
        non-MMGen addresses, infer the type from the address format,
        returning None for P2PKH, which could be either 'L' or 'C'.
        """
        if self.mmid:
            return str(self.mmid.mmtype)
        if self.addr.addr_fmt == 'bech32':
            return 'B'
        if self.addr.addr_fmt == 'p2sh':
            return 'S'
        return None

    class conv_funcs:
        def amt(self,value):
            # convert the value to the protocol's coin amount type
            return self.proto.coin_amt(value)
class MMGenTxIOList(list,MMGenObject):
    """List of transaction inputs or outputs, with a reference to the owning object."""

    def __init__(self,parent,data=None):
        """
        Store parent and initialize the list from data.  data, when true,
        must be a list; any false value yields an empty list.
        """
        self.parent = parent
        if data:
            assert isinstance(data,list), 'MMGenTxIOList_check1'
        list.__init__(self,data if data else list())
class Base(MMGenObject):
    """Protocol-independent base class for transaction objects."""

    desc       = 'transaction'
    label      = None
    txid       = None
    coin_txid  = None
    timestamp  = None
    blockcount = None
    coin       = None
    dcoin      = None
    locktime   = None
    chain      = None
    signed     = False
    non_mmgen_inputs_msg = f"""
This transaction includes inputs with non-{g.proj_name} addresses. When
signing the transaction, private keys for the addresses listed below must
be supplied using the --keys-from-file option. The key file must contain
one key per line. Please note that this transaction cannot be autosigned,
as autosigning does not support the use of key files.
Non-{g.proj_name} addresses found in inputs:
{{}}
"""

    class Input(MMGenTxIO):
        scriptPubKey = ListItemAttr(HexStr)
        sequence     = ListItemAttr(int,typeconv=False)
        tw_copy_attrs = { 'scriptPubKey','vout','amt','label','mmid','addr','confs','txid' }

    class Output(MMGenTxIO):
        is_chg = ListItemAttr(bool,typeconv=False)

    class InputList(MMGenTxIOList):
        desc = 'transaction inputs'

    class OutputList(MMGenTxIOList):
        desc = 'transaction outputs'

    def __init__(self,*args,**kwargs):
        """Create empty input and output lists; read 'proto' and 'tw' from the keyword args."""
        self.inputs  = self.InputList(self)
        self.outputs = self.OutputList(self)
        self.name    = type(self).__name__
        self.proto   = kwargs.get('proto')
        self.tw      = kwargs.get('tw')

    @property
    def coin(self):
        return self.proto.coin

    @property
    def dcoin(self):
        return self.proto.dcoin

    def check_correct_chain(self):
        """
        Raise TransactionChainMismatch if the transaction's chain differs
        from the coin daemon's.  Does nothing when no 'rpc' attribute is set.
        """
        if hasattr(self,'rpc') and self.chain != self.rpc.chain:
            raise TransactionChainMismatch(
                f'Transaction is for {self.chain}, but coin daemon chain is {self.rpc.chain}!')

    def sum_inputs(self):
        """Return the sum of all input amounts."""
        return sum(e.amt for e in self.inputs)

    def sum_outputs(self,exclude=None):
        """
        Return the sum of the output amounts as a coin amount, leaving out
        the output at index exclude if one is given.
        """
        if exclude == None:
            olist = self.outputs
        else:
            olist = self.outputs[:exclude] + self.outputs[exclude+1:]
        total = sum(e.amt for e in olist) if olist else '0'
        return self.proto.coin_amt(total)

    def get_chg_output_idx(self):
        """Return the index of the first change output, or None if there is none."""
        chg_flags = [o.is_chg for o in self.outputs]
        if True in chg_flags:
            return chg_flags.index(True)
        return None

    def add_timestamp(self):
        self.timestamp = make_timestamp()

    def add_blockcount(self):
        self.blockcount = self.rpc.blockcount

    # returns true if comment added or changed
    def add_comment(self,infile=None):
        """
        Set the transaction comment from the file infile, or interactively.

        In the interactive case, returns True if the comment was changed
        and False if it was left unchanged or the user declined to edit it.
        """
        if infile:
            from ..fileutil import get_data_from_file
            self.label = MMGenTxLabel(get_data_from_file(infile,'transaction comment'))
            return None

        # get comment from user, or edit existing comment
        prompt = 'Edit transaction comment?' if self.label else 'Add a comment to transaction?'
        if not keypress_confirm(prompt,default_yes=False):
            return False

        new_label = MMGenTxLabel(line_input('Comment: ',insert_txt=self.label))
        if not new_label:
            ymsg('Warning: comment is empty')
        old_label = self.label
        self.label = new_label
        return (True,False)[old_label == self.label]

    def get_non_mmaddrs(self,desc):
        """
        Return the de-duplicated addresses without an MMGen ID in the
        attribute named by desc.
        """
        addrs = (i.addr for i in getattr(self,desc) if not i.mmid)
        return remove_dups(
            addrs,
            edesc = 'non-MMGen address',
            quiet = True )

    def check_non_mmgen_inputs(self,caller,non_mmaddrs=None):
        """
        Handle inputs with non-MMGen addresses.

        For the callers 'txdo' and 'txsign', raise UserOptError unless
        opt.keys_from_file is set.  For other callers, print a warning and
        ask the user for confirmation unless opt.yes is set.
        """
        non_mmaddrs = non_mmaddrs or self.get_non_mmaddrs('inputs')
        if not non_mmaddrs:
            return

        indent = ' '
        fs = fmt(self.non_mmgen_inputs_msg,strip_char='\t',indent=indent).strip()
        m = fs.format('\n '.join(non_mmaddrs))

        if caller in ('txdo','txsign'):
            if not opt.keys_from_file:
                raise UserOptError(f'\n{indent}ERROR: {m}\n')
            return

        msg(f'\n{indent}WARNING: {m}\n')
        if not (opt.yes or keypress_confirm('Continue?',default_yes=True)):
            die(1,'Exiting at user request')

84
mmgen/tx/bump.py Executable file
View file

@ -0,0 +1,84 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.bump: transaction bump class
"""
from .new import New
from .completed import Completed
from ..opts import opt
from ..util import line_input,is_int,keypress_confirm
class Bump(Completed,New):
    """Replacement transaction paying a higher fee than the original it is created from."""

    desc = 'fee-bumped transaction'
    ext  = 'rawtx'
    bump_output_idx = None

    def __init__(self,send,*args,**kwargs):
        """
        Initialize from an existing transaction.

        Exits if the transaction is not replaceable or, when send is set,
        if the original transaction has no coin txid.  The coin txid is
        then reset to the empty string.
        """
        super().__init__(*args,**kwargs)

        if not self.is_replaceable():
            die(1,f'Transaction {self.txid} is not replaceable')

        # If sending, require original tx to be sent
        if send and not self.coin_txid:
            # f-string prefix was missing, so the txid was never interpolated
            die(1,f'Transaction {self.txid!r} was not broadcast to the network')

        self.coin_txid = ''

    def check_sufficient_funds_for_bump(self):
        """Exit unless at least one output amount is greater than or equal to the minimum fee."""
        if not any(o.amt >= self.min_fee for o in self.outputs):
            die(1,
                'Transaction cannot be bumped.\n' +
                f'All outputs contain less than the minimum fee ({self.min_fee} {self.coin})')

    def choose_output(self):
        """
        Select the output from which the fee increase will be deducted.

        With a single output, that output is chosen if it covers the minimum
        fee; otherwise the process exits.  With several outputs, the choice
        comes from opt.output_to_reduce on the first pass, then from the
        user, until a valid, sufficiently funded and confirmed output is
        selected.  The chosen zero-based index is stored in
        self.bump_output_idx and returned.
        """
        chg_idx = self.get_chg_output_idx()
        init_reply = opt.output_to_reduce

        def check_sufficient_funds(o_amt):
            if o_amt < self.min_fee:
                msg(f'Minimum fee ({self.min_fee} {self.coin}) is greater than output amount ({o_amt} {self.coin})')
                return False
            return True

        if len(self.outputs) == 1:
            if check_sufficient_funds(self.outputs[0].amt):
                self.bump_output_idx = 0
                return 0
            else:
                die(1,'Insufficient funds to bump transaction')

        while True:
            if init_reply is None:
                m = 'Choose an output to deduct the fee from (Hit ENTER for the change output): '
                reply = line_input(m) or 'c'
            else:
                # the command-line value is used once only; later passes prompt the user
                reply,init_reply = init_reply,None
            if chg_idx is None and not is_int(reply):
                msg('Output must be an integer')
            elif chg_idx is not None and not is_int(reply) and reply != 'c':
                msg("Output must be an integer, or 'c' for the change output")
            else:
                # user-visible output numbers are one-based
                idx = chg_idx if reply == 'c' else (int(reply) - 1)
                if idx < 0 or idx >= len(self.outputs):
                    msg(f'Output must be in the range 1-{len(self.outputs)}')
                else:
                    o_amt = self.outputs[idx].amt
                    cm = ' (change output)' if chg_idx == idx else ''
                    prompt = f'Fee will be deducted from output {idx+1}{cm} ({o_amt} {self.coin})'
                    if check_sufficient_funds(o_amt):
                        if opt.yes or keypress_confirm(prompt+'. OK?',default_yes=True):
                            if opt.yes:
                                msg(prompt)
                            self.bump_output_idx = idx
                            return idx

53
mmgen/tx/completed.py Executable file
View file

@ -0,0 +1,53 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.completed: completed transaction class
"""
from .base import Base
class Completed(Base):
	"""
	signed or unsigned transaction with associated file

	An instance is initialized either from a transaction file (`filename`) or from
	the attribute dictionary of an existing transaction object (`data`).  Exactly
	one of the two must be supplied.
	"""
	filename_api = True

	def __init__(self,filename=None,data=None,quiet_open=False,*args,**kwargs):
		"""
		filename:   transaction file to parse
		data:       attribute dict to adopt as this instance's __dict__ (its 'tw' entry
		            is replaced with this instance's `tw`)
		quiet_open: passed on to the transaction file parser

		Remaining arguments are passed on to the base class constructor.  When
		loading from a file, the serialized data and chain are checked, and the
		program exits if the signature state of the transaction does not match the
		class's `signed` attribute.
		"""
		assert (filename or data) and not (filename and data), 'CompletedTX_chk1'

		super().__init__(*args,**kwargs)

		if data:
			data['tw'] = self.tw
			self.__dict__ = data
			self.name = type(self).__name__
		else:
			from ..txfile import MMGenTxFile
			MMGenTxFile(self).parse(filename,quiet_open=quiet_open)

			self.check_serialized_integrity()

			# repeat with sign and send, because coin daemon could be restarted
			self.check_correct_chain()

			if self.check_sigs() != self.signed:
				from ..util import die
				# trailing space after 'not' keeps the message from reading 'notsigned'
				die(1,'Transaction is {}signed!'.format('not ' if self.signed else ''))

	@property
	def info(self):
		"""
		Transaction info object for this transaction, as created by tx.info.init_info()
		"""
		from .info import init_info
		return init_info(self)

	@property
	def file(self):
		"""
		MMGenTxFile object for this transaction
		"""
		from ..txfile import MMGenTxFile
		return MMGenTxFile(self)

121
mmgen/tx/info.py Executable file
View file

@ -0,0 +1,121 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.info: transaction info class
"""
from ..globalvars import *
from ..color import red,green,orange
from ..opts import opt
from ..util import msg,msg_r
import importlib
class TxInfo:
	"""
	Formatter and viewer for the human-readable description of a transaction

	The format strings (txinfo_hdr_fs, txinfo_hdr_fs_short, txinfo_ftr_fs) and the
	methods format_body(), format_abs_fee(), format_rel_fee(),
	format_verbose_footer() and strfmt_locktime() are not defined here and must be
	supplied by a subclass (see init_info()).
	"""

	def __init__(self,tx):
		self.tx = tx

	def format(self,terse=False,sort='addr'):
		"""
		Return the formatted transaction description as a string

		terse: use the short header format and omit the blank separator lines
		sort:  passed on to format_body()
		"""
		tx = self.tx

		if tx.proto.base_proto == 'Ethereum':
			blockcount = None
		else:
			# best effort: the block count is optional, so any failure to obtain it is
			# ignored - but KeyboardInterrupt and SystemExit are allowed to propagate
			try:
				blockcount = tx.rpc.blockcount
			except Exception:
				blockcount = None

		nonmm_str = f'(non-{g.proj_name} address)'

		def get_max_mmwid(io):
			if io == tx.inputs:
				sel_f = lambda o: len(o.mmid) + 2 # len('()')
			else:
				sel_f = lambda o: len(o.mmid) + (2,8)[bool(o.is_chg)] # + len(' (chg)')
			return max(max([sel_f(o) for o in io if o.mmid] or [0]),len(nonmm_str))

		max_mmwid = max(get_max_mmwid(tx.inputs),get_max_mmwid(tx.outputs))

		def gen_view():
			hdr_fs = self.txinfo_hdr_fs_short if terse else self.txinfo_hdr_fs
			yield hdr_fs.format(
				i = tx.txid.hl(),
				a = tx.send_amt.hl(),
				c = tx.dcoin,
				t = tx.timestamp,
				r = green('True') if tx.is_replaceable() else red('False'),
				s = green('True') if tx.signed else red('False'),
				l = (
					orange(self.strfmt_locktime(terse=True)) if tx.locktime else
					green('None') ))

			if tx.chain != 'mainnet': # if mainnet has a coin-specific name, display it
				yield green(f'Chain: {tx.chain.upper()}') + '\n'

			if tx.coin_txid:
				yield f'{tx.coin} TxID: {tx.coin_txid.hl()}\n'

			enl = ('\n','')[bool(terse)]
			yield enl

			if tx.label:
				yield f'Comment: {tx.label.hl()}\n{enl}'

			yield self.format_body(blockcount,nonmm_str,max_mmwid,enl,terse=terse,sort=sort)

			yield self.txinfo_ftr_fs.format(
				i = tx.sum_inputs().hl(),
				o = tx.sum_outputs().hl(),
				C = tx.change.hl(),
				s = tx.send_amt.hl(),
				a = self.format_abs_fee(),
				r = self.format_rel_fee(terse),
				d = tx.dcoin,
				c = tx.coin )

			if opt.verbose:
				yield self.format_verbose_footer()

		return ''.join(gen_view()) # TX label might contain non-ascii chars

	def view_with_prompt(self,prompt,pause=True):
		"""
		Ask the user whether and how to view the transaction, and display it accordingly

		Replies: y = normal view, v = view in pager, t = terse view, n or ENTER = no
		view.  The prompt is repeated after an invalid reply.
		"""
		prompt += ' (y)es, (N)o, pager (v)iew, (t)erse view: '
		from ..term import get_char
		while True:
			reply = get_char( prompt, immed_chars='YyNnVvTt' ).strip('\n\r')
			msg('')
			if reply == '' or reply in 'Nn':
				break
			elif reply in 'YyVvTt':
				self.view(
					pager = reply in 'Vv',
					pause = pause,
					terse = reply in 'Tt' )
				break
			else:
				msg('Invalid reply')

	def view(self,pager=False,pause=True,terse=False):
		"""
		Display the formatted transaction

		pager: display via do_pager() instead of printing directly
		pause: when printing directly, wait for a keypress afterwards
		terse: passed on to format()
		"""
		o = self.format(terse=terse)
		if pager:
			# do_pager is not among this module's imports, so bring it into scope here
			from ..util import do_pager
			do_pager(o)
		else:
			msg_r(o)
			from ..term import get_char
			if pause:
				get_char('Press any key to continue: ')
				msg('')
def init_info(tx):
	"""
	Return a transaction info object for `tx`

	The class is loaded from the module mmgen.base_proto.<name>.tx.info, where
	<name> is the lowercased base protocol of the transaction: 'TokenTxInfo' if the
	protocol has a token symbol, 'TxInfo' otherwise.
	"""
	mod = importlib.import_module(f'mmgen.base_proto.{tx.proto.base_proto.lower()}.tx.info')
	prefix = 'Token' if tx.proto.tokensym else ''
	info_cls = getattr(mod,prefix + 'TxInfo')
	return info_cls(tx)

390
mmgen/tx/new.py Executable file
View file

@ -0,0 +1,390 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.new: new transaction class
"""
from ..globalvars import *
from ..opts import opt
from .base import Base
from ..color import pink
from ..obj import get_obj,HexStr
from ..util import msg,qmsg,fmt,suf,remove_dups,get_extension,keypress_confirm,do_license_msg,line_input
from ..addr import is_mmgen_id,CoinAddr,is_coin_addr
def mmaddr2coinaddr(mmaddr,ad_w,ad_f,proto):
	"""
	Convert an MMGen address to a coin address

	mmaddr: MMGen address (assumed to have been checked by the caller)
	ad_w:   address data of the tracking wallet, searched first
	ad_f:   address data from address files, searched if the address is not in the
	        tracking wallet (may be empty)
	proto:  protocol object passed on to CoinAddr

	If the address is found only in an address file, a warning is displayed and the
	user asked to confirm (skipped when opt.yes is set).  If it is found nowhere,
	the program exits with an error message.
	"""
	def wmsg(k):
		messages = {
			'addr_in_addrfile_only': f"""
Warning: output address {mmaddr} is not in the tracking wallet, which
means its balance will not be tracked. You're strongly advised to import
the address into your tracking wallet before broadcasting this transaction.
""",
			'addr_not_found': f"""
No data for {g.proj_name} address {mmaddr} could be found in either the
tracking wallet or the supplied address file. Please import this address
into your tracking wallet, or supply an address file on the command line.
""",
			'addr_not_found_no_addrfile': f"""
No data for {g.proj_name} address {mmaddr} could be found in the tracking
wallet. Please import this address into your tracking wallet or supply an
address file for it on the command line.
"""
		}
		return '\n' + fmt(messages[k],indent=' ')

	# assume mmaddr has already been checked
	coin_addr = ad_w.mmaddr2coinaddr(mmaddr)

	if not coin_addr:
		# ydie is not among this module's imports, so bring it into scope here, on the
		# only path that needs it
		from ..util import ydie
		if ad_f:
			coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
			if coin_addr:
				msg(wmsg('addr_in_addrfile_only'))
				if not (opt.yes or keypress_confirm('Continue anyway?')):
					sys.exit(1)
			else:
				ydie(2,wmsg('addr_not_found'))
		else:
			ydie(2,wmsg('addr_not_found_no_addrfile'))

	return CoinAddr(proto,coin_addr)
class New(Base):
	"""
	New transaction, built interactively from command-line arguments and the unspent
	outputs of the tracking wallet

	create() is the entry point: it drives output parsing, input selection and fee
	selection, and returns the resulting unsigned transaction.  Several methods and
	attributes used here (e.g. fee_rel2abs(), fee_abs2rel(), convert_and_check_fee(),
	get_rel_fee_from_network(), update_change_output(), usr_fee_prompt) are not
	defined in this class and must be supplied elsewhere in the class hierarchy.
	"""

	fee_is_approximate = False
	msg_low_coin = 'Selected outputs insufficient to fund this transaction ({} {} needed)'
	msg_no_change_output = """
ERROR: No change address specified. If you wish to create a transaction with
only one output, specify a single output address with no {} amount
"""

	def __init__(self,*args,**kwargs):
		"""
		All arguments are passed on to the base class constructor.  For Ethereum-based
		protocols, the tx_gas and contract_data options are processed here.
		"""
		super().__init__(*args,**kwargs)

		if self.proto.base_proto == 'Ethereum':
			if opt.tx_gas:
				from ..amt import ETHAmt
				self.tx_gas = self.start_gas = ETHAmt(int(opt.tx_gas),'wei')
			if opt.contract_data:
				m = "'--contract-data' option may not be used with token transaction"
				assert 'Token' not in type(self).__name__, m
				with open(opt.contract_data) as fp:
					self.usr_contract_data = HexStr(fp.read().strip())
				self.disable_fee_check = True

	def update_output_amt(self,idx,amt):
		"""
		Replace output number `idx` with a copy of itself having amount `amt`
		"""
		o = self.outputs[idx]._asdict()
		o['amt'] = amt
		self.outputs[idx] = self.Output(self.proto,**o)

	def add_mmaddrs_to_outputs(self,ad_w,ad_f):
		"""
		Set the MMGen ID, and the label if there is one, of each output whose address is
		found in the tracking wallet data `ad_w` or the address file data `ad_f`

		Entries from `ad_f` take precedence over those from `ad_w`.
		"""
		a = [e.addr for e in self.outputs]
		d = ad_w.make_reverse_dict(a)
		if ad_f:
			d.update(ad_f.make_reverse_dict(a))
		for e in self.outputs:
			if e.addr and e.addr in d:
				e.mmid,f = d[e.addr]
				if f:
					e.label = f

	def check_dup_addrs(self,io_str):
		"""
		Exit if the list named by `io_str` ('inputs' or 'outputs') contains a duplicate address
		"""
		assert io_str in ('inputs','outputs')
		addrs = [e.addr for e in getattr(self,io_str)]
		if len(addrs) != len(set(addrs)):
			from ..util import die
			die(2,f'{addrs}: duplicate address in transaction {io_str}')

	# given tx size and absolute fee or fee spec, return absolute fee
	# relative fee is N+<first letter of unit name>
	def feespec2abs(self,tx_fee,tx_size):
		"""
		Return the absolute fee for `tx_fee`, or False if `tx_fee` cannot be interpreted

		tx_fee:  absolute fee, or relative fee spec (integer plus first letter of unit name)
		tx_size: transaction size, used to convert a relative fee
		"""
		fee = get_obj(self.proto.coin_amt,num=tx_fee,silent=True)
		if fee:
			return fee
		import re
		units = {u[0]:u for u in self.proto.coin_amt.units}
		m = re.match(r'([1-9][0-9]*)({})'.format('|'.join(units)),tx_fee)
		if m:
			amt,unit = m.groups()
			return self.fee_rel2abs(tx_size,units,int(amt),unit)
		return False

	def get_usr_fee_interactive(self,tx_fee=None,desc='Starting'):
		"""
		Return an absolute fee confirmed by the user

		tx_fee: initial fee or fee spec to propose, if any
		desc:   description of the initial fee, displayed in the prompt

		The user is prompted for a fee until one is accepted by
		convert_and_check_fee() and confirmed (confirmation is skipped when opt.yes
		is set).
		"""
		abs_fee = None
		while True:
			if tx_fee:
				abs_fee = self.convert_and_check_fee(tx_fee,desc)
			if abs_fee:
				prompt = '{} TX fee{}: {}{} {} ({} {})\n'.format(
					desc,
					(f' (after {opt.tx_fee_adj:.2f}X adjustment)'
						if opt.tx_fee_adj != 1 and desc.startswith('Network-estimated')
							else ''),
					# NOTE(review): both alternatives were empty strings in the reviewed
					# text, making the selector a no-op; the approximation sign is assumed
					# to have been lost in transit - confirm against the repository
					('','≈')[self.fee_is_approximate],
					abs_fee.hl(),
					self.coin,
					pink(str(self.fee_abs2rel(abs_fee))),
					self.rel_fee_disp)
				if opt.yes or keypress_confirm(prompt+'OK?',default_yes=True):
					if opt.yes:
						msg(prompt)
					return abs_fee
			tx_fee = line_input(self.usr_fee_prompt)
			desc = 'User-selected'

	# we don't know fee yet, so perform preliminary check with fee == 0
	async def precheck_sufficient_funds(self,inputs_sum,sel_unspent,outputs_sum):
		"""
		Return True if the selected inputs cover the outputs; otherwise display a
		message and return False

		`sel_unspent` is not used by this implementation.
		"""
		if self.twuo.total < outputs_sum:
			msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum,self.dcoin))
			return False
		if inputs_sum < outputs_sum:
			msg(self.msg_low_coin.format(outputs_sum-inputs_sum,self.dcoin))
			return False
		return True

	async def get_fee_from_user(self,have_estimate_fail=[]):
		"""
		Determine the starting fee (from opt.tx_fee, or else a network estimate) and
		return the fee confirmed by the user via get_usr_fee_interactive()

		The mutable default of `have_estimate_fail` is deliberate: the list persists
		across calls and serves as a flag, so that the fee estimation failure message
		is displayed only once.
		"""
		if opt.tx_fee:
			desc = 'User-selected'
			start_fee = opt.tx_fee
		else:
			desc = 'Network-estimated ({}, {} conf{})'.format(
				opt.fee_estimate_mode.upper(),
				pink(str(opt.tx_confs)),
				suf(opt.tx_confs) )
			fee_per_kb,fe_type = await self.get_rel_fee_from_network()

			if fee_per_kb < 0:
				if not have_estimate_fail:
					msg(self.fee_fail_fs.format(c=opt.tx_confs,t=fe_type))
					have_estimate_fail.append(True)
				start_fee = None
			else:
				start_fee = self.fee_est2abs(fee_per_kb,fe_type)

		return self.get_usr_fee_interactive(start_fee,desc=desc)

	def add_output(self,coinaddr,amt,is_chg=None):
		"""
		Append an output with the given address, amount and change flag
		"""
		self.outputs.append(self.Output(self.proto,addr=coinaddr,amt=amt,is_chg=is_chg))

	def process_cmd_arg(self,arg,ad_f,ad_w):
		"""
		Parse one command-line output argument and add the corresponding output

		arg: 'ADDR,AMT' for an ordinary output, or 'ADDR' alone for the change output,
		     where ADDR is an MMGen ID or coin address

		Exits on an invalid argument or a second change address.
		"""
		from ..util import die

		def add_output_chk(addr,amt,err_desc):
			if not amt and self.get_chg_output_idx() is not None:
				die(2,'ERROR: More than one change address listed on command line')
			if is_mmgen_id(self.proto,addr) or is_coin_addr(self.proto,addr):
				coin_addr = ( mmaddr2coinaddr(addr,ad_w,ad_f,self.proto) if is_mmgen_id(self.proto,addr)
								else CoinAddr(self.proto,addr) )
				self.add_output(coin_addr,self.proto.coin_amt(amt or '0'),is_chg=not amt)
			else:
				# build the message in a single step: a str.format() pass over text that
				# already contains user input fails on any '{' or '}' in the argument
				arg_disp = f'{addr},{amt}' if amt else addr
				die(2,f'{addr}: invalid {err_desc} {arg_disp!r}')

		if ',' in arg:
			addr,amt = arg.split(',',1)
			add_output_chk(addr,amt,'coin argument in command-line argument')
		else:
			add_output_chk(arg,None,'command-line argument')

	def process_cmd_args(self,cmd_args,ad_f,ad_w):
		"""
		Parse all command-line output arguments and check the resulting outputs

		Exits if there are no outputs, if no change output was specified, or if a
		Segwit output was requested on a chain where Segwit is not active.
		"""
		from ..util import die

		for a in cmd_args:
			self.process_cmd_arg(a,ad_f,ad_w)

		# checked first: with no outputs there is presumably no change output either,
		# so the change output check below would preempt this message
		if not self.outputs:
			die(2,'At least one output must be specified on the command line')

		if self.get_chg_output_idx() is None:
			die(2,(
				fmt( self.msg_no_change_output.format(self.dcoin) ).strip()
					if len(self.outputs) == 1 else
				'ERROR: No change output specified' ))

		if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
			from ..util import rdie
			rdie(2,f'{g.proj_name} Segwit address requested on the command line, '
					+ 'but Segwit is not active on this chain')

	async def get_outputs_from_cmdline(self,cmd_args):
		"""
		Create the transaction outputs from the command-line arguments

		Arguments with the address file extension are loaded as address files; the
		remaining arguments are processed as output specifications.  Duplicate
		arguments are removed via remove_dups().
		"""
		from ..addrdata import AddrData,TwAddrData
		from ..addrlist import AddrList
		from ..addrfile import AddrFile

		addrfiles = remove_dups(
			tuple(a for a in cmd_args if get_extension(a) == AddrFile.ext),
			desc = 'command line',
			edesc = 'argument',
		)
		cmd_args = remove_dups(
			tuple(a for a in cmd_args if a not in addrfiles),
			desc = 'command line',
			edesc = 'argument',
		)

		ad_f = AddrData(self.proto)
		from ..fileutil import check_infile
		for a in addrfiles:
			check_infile(a)
			ad_f.add(AddrList(self.proto,a))

		ad_w = await TwAddrData(self.proto,wallet=self.tw)

		self.process_cmd_args(cmd_args,ad_f,ad_w)

		self.add_mmaddrs_to_outputs(ad_w,ad_f)
		self.check_dup_addrs('outputs')

	# inputs methods
	def select_unspent(self,unspent):
		"""
		Prompt the user for the numbers of the unspent outputs to spend and return them

		The prompt is repeated until a valid selection within the range of `unspent`
		is entered.
		"""
		prompt = 'Enter a range or space-separated list of outputs to spend: '
		while True:
			reply = line_input(prompt).strip()
			if reply:
				from ..addrlist import AddrIdxList
				selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) )
				if selected:
					if selected[-1] <= len(unspent):
						return selected
					msg(f'Unspent output number must be <= {len(unspent)}')

	def select_unspent_cmdline(self,unspent):
		"""
		Return the set of 1-based numbers of the unspent outputs matching the
		comma-separated MMGen IDs and coin addresses in opt.inputs

		Exits if an item is neither an MMGen ID nor a coin address, or matches no
		unspent output.
		"""
		from ..util import die

		def idx2num(idx):
			uo = unspent[idx]
			mmid_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else ''
			msg(f'Adding input: {idx + 1} {uo.addr}{mmid_disp}')
			return idx + 1

		def get_uo_nums():
			for addr in opt.inputs.split(','):
				if is_mmgen_id(self.proto,addr):
					attr = 'twmmid'
				elif is_coin_addr(self.proto,addr):
					attr = 'addr'
				else:
					die(1,f'{addr!r}: not an MMGen ID or {self.coin} address')

				found = False
				for idx,uo in enumerate(unspent):
					if getattr(uo,attr) == addr:
						yield idx2num(idx)
						found = True

				if not found:
					die(1,f'{addr!r}: address not found in tracking wallet')

		return set(get_uo_nums()) # silently discard duplicates

	def copy_inputs_from_tw(self,tw_unspent_data):
		"""
		Create the transaction inputs from the selected tracking wallet unspent outputs

		Only the attributes listed in Input.tw_copy_attrs are copied.
		"""
		def gen_inputs():
			for d in tw_unspent_data:
				i = self.Input(
					self.proto,
					**{attr:getattr(d,attr) for attr in d.__dict__ if attr in self.Input.tw_copy_attrs} )
				if d.twmmid.type == 'mmgen':
					i.mmid = d.twmmid # twmmid -> mmid
				yield i

		self.inputs = type(self.inputs)(self,list(gen_inputs()))

	def warn_insufficient_funds(self,funds_left):
		"""
		Display the amount by which the selected inputs fall short (`funds_left` is negative)
		"""
		msg(self.msg_low_coin.format(self.proto.coin_amt(-funds_left).hl(),self.coin))

	async def get_funds_left(self,fee,outputs_sum):
		"""
		Return the sum of the inputs minus the sum of the outputs and the fee
		"""
		return self.sum_inputs() - outputs_sum - fee

	async def get_inputs_from_user(self,outputs_sum):
		"""
		Select inputs and fee, repeating until they cover `outputs_sum` and the user
		confirms, and return the funds left over

		Inputs are taken from opt.inputs if set, otherwise selected interactively.
		Sets self.inputs and self.usr_fee.
		"""
		while True:
			us_f = self.select_unspent_cmdline if opt.inputs else self.select_unspent
			sel_nums = us_f(self.twuo.unspent)

			msg('Selected output{}: {}'.format(
				suf(sel_nums),
				' '.join(str(n) for n in sel_nums) ))
			sel_unspent = self.twuo.MMGenTwOutputList([self.twuo.unspent[i-1] for i in sel_nums])

			inputs_sum = sum(s.amt for s in sel_unspent)
			if not await self.precheck_sufficient_funds(inputs_sum,sel_unspent,outputs_sum):
				continue

			self.copy_inputs_from_tw(sel_unspent) # makes self.inputs

			self.usr_fee = await self.get_fee_from_user()

			funds_left = await self.get_funds_left(self.usr_fee,outputs_sum)

			if funds_left >= 0:
				p = self.final_inputs_ok_msg(funds_left)
				if opt.yes or keypress_confirm(p+'. OK?',default_yes=True):
					if opt.yes:
						msg(p)
					return funds_left
			else:
				self.warn_insufficient_funds(funds_left)

	async def create(self,cmd_args,locktime=None,do_info=False,caller='txcreate'):
		"""
		Create the transaction and return it as an unsigned transaction object

		cmd_args: command-line arguments specifying outputs and address files
		locktime: locktime passed on to create_serialized() (int or None)
		do_info:  display the unspent outputs and exit without creating a transaction
		caller:   name of the calling command, passed on to check_non_mmgen_inputs()
		"""
		assert isinstance( locktime, (int,type(None)) ), 'locktime must be of type int'

		from ..twuo import TwUnspentOutputs

		if opt.comment_file:
			self.add_comment(opt.comment_file)

		twuo_addrs = await self.get_cmdline_input_addrs()

		self.twuo = await TwUnspentOutputs(self.proto,minconf=opt.minconf,addrs=twuo_addrs)
		await self.twuo.get_unspent_data()

		if not do_info:
			await self.get_outputs_from_cmdline(cmd_args)

		do_license_msg()

		if not opt.inputs:
			await self.twuo.view_and_sort(self)

		self.twuo.display_total()

		if do_info:
			del self.twuo.wallet
			sys.exit(0)

		outputs_sum = self.sum_outputs()

		msg('Total amount to spend: {}'.format(
			f'{outputs_sum.hl()} {self.dcoin}' if outputs_sum else 'Unknown'
		))

		funds_left = await self.get_inputs_from_user(outputs_sum)

		self.check_non_mmgen_inputs(caller)

		self.update_change_output(funds_left)

		if not opt.yes:
			self.add_comment() # edits an existing comment

		await self.create_serialized(locktime=locktime) # creates self.txid too

		self.add_timestamp()
		self.add_blockcount()
		self.chain = self.proto.chain_name
		self.check_fee()

		qmsg('Transaction successfully created')

		# the new object adopts this instance's attribute dict
		from . import UnsignedTX
		new = UnsignedTX(data=self.__dict__)

		if not opt.yes:
			new.info.view_with_prompt('View transaction details?')

		del new.twuo.wallet
		return new

31
mmgen/tx/online.py Executable file
View file

@ -0,0 +1,31 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.online: online signed transaction class
"""
from .signed import Signed
from ..util import msg,confirm_or_raise
from ..opts import opt
class OnlineSigned(Signed):
	"""
	Signed transaction with access to its status object and a send confirmation prompt
	"""

	@property
	def status(self):
		"""
		Status object for this transaction, an instance of the 'Status' class
		selected by _base_proto_subclass() for the transaction's protocol
		"""
		from . import _base_proto_subclass
		status_cls = _base_proto_subclass('Status','status',self.proto)
		return status_cls(self)

	def confirm_send(self):
		"""
		Require the user to confirm broadcasting of the transaction

		A shorter confirmation phrase is requested when opt.quiet or opt.yes is set,
		and the warning is omitted when opt.quiet is set.
		"""
		if opt.quiet:
			warning = ''
		else:
			warning = "Once this transaction is sent, there's no taking it back!"

		action = f'broadcast this transaction to the {self.proto.coin} {self.proto.network.upper()} network'

		if opt.quiet or opt.yes:
			expect = 'YES'
		else:
			expect = 'YES, I REALLY WANT TO DO THIS'

		confirm_or_raise( warning, action, expect )
		msg('Sending transaction')

20
mmgen/tx/signed.py Executable file
View file

@ -0,0 +1,20 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.signed: signed transaction class
"""
from .completed import Completed
class Signed(Completed):
	"""
	Completed transaction in signed state, stored with the 'sigtx' extension
	"""
	signed = True
	ext = 'sigtx'
	desc = 'signed transaction'

18
mmgen/tx/status.py Executable file
View file

@ -0,0 +1,18 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.status: transaction status class
"""
class Status:
	"""
	Base class for transaction status objects; holds a reference to the
	transaction it reports on in the `tx` attribute
	"""

	def __init__(self,parent_tx):
		# the transaction this status object belongs to
		self.tx = parent_tx

30
mmgen/tx/unsigned.py Executable file
View file

@ -0,0 +1,30 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tx.unsigned: unsigned transaction class
"""
from .completed import Completed
from ..util import remove_dups
class Unsigned(Completed):
	"""
	Completed transaction in unsigned state, stored with the 'rawtx' extension
	"""
	desc = 'unsigned transaction'
	ext = 'rawtx'

	def delete_attrs(self,desc,attr):
		"""
		Delete attribute `attr` from every entry of the list `desc` that has it
		"""
		for entry in getattr(self,desc):
			if not hasattr(entry,attr):
				continue
			delattr(entry,attr)

	def get_sids(self,desc):
		"""
		Return the Seed IDs of the entries in the list `desc` that have an MMGen ID,
		passed through remove_dups() with quiet=True
		"""
		sids = (entry.mmid.sid for entry in getattr(self,desc) if entry.mmid)
		return remove_dups( sids, quiet=True )

View file

@ -21,10 +21,9 @@ txfile.py: Transaction file operations for the MMGen suite
"""
from .common import *
from .obj import HexStr,MMGenTxID,CoinTxID,MMGenTxLabel
from .tx import MMGenTxOutput,MMGenTxOutputList,MMGenTxInput,MMGenTxInputList
from .obj import MMGenObject,HexStr,MMGenTxID,CoinTxID,MMGenTxLabel
class MMGenTxFile:
class MMGenTxFile(MMGenObject):
def __init__(self,tx):
self.tx = tx
@ -50,8 +49,8 @@ class MMGenTxFile:
for e in d:
e['amt'] = tx.proto.coin_amt(e['amt'])
io,io_list = {
'inputs': (MMGenTxInput,MMGenTxInputList),
'outputs': (MMGenTxOutput,MMGenTxOutputList),
'inputs': ( tx.Input, tx.InputList ),
'outputs': ( tx.Output, tx.OutputList ),
}[desc]
return io_list( parent=tx, data=[io(tx.proto,**e) for e in d] )
@ -124,7 +123,7 @@ class MMGenTxFile:
desc = 'transaction file hex data'
tx.check_txfile_hex_data()
desc = 'Ethereum RLP or JSON data'
tx.parse_txfile_hex_data()
tx.parse_txfile_serialized_data()
desc = 'inputs data'
tx.inputs = eval_io_data(inputs_data,'inputs')
desc = 'outputs data'
@ -144,8 +143,8 @@ class MMGenTxFile:
yield f'[{tx.send_amt!s}'
if tx.is_replaceable():
yield ',{}'.format(tx.fee_abs2rel(tx.fee,to_unit=tx.fn_fee_unit))
if tx.get_hex_locktime():
yield ',tl={}'.format(tx.get_hex_locktime())
if tx.get_serialized_locktime():
yield ',tl={}'.format(tx.get_serialized_locktime())
yield ']'
if g.debug_utf8:
yield ''
@ -171,7 +170,7 @@ class MMGenTxFile:
tx.blockcount,
(f' LT={tx.locktime}' if tx.locktime else ''),
),
tx.hex,
tx.serialized,
ascii([amt_to_str(e._asdict()) for e in tx.inputs]),
ascii([amt_to_str(e._asdict()) for e in tx.outputs])
]
@ -220,7 +219,7 @@ class MMGenTxFile:
@classmethod
def get_proto(cls,filename,quiet_open=False):
from .tx import MMGenTX
tmp_tx = MMGenTX.Base()
from .tx import BaseTX
tmp_tx = BaseTX()
cls(tmp_tx).parse(filename,metadata_only=True,quiet_open=quiet_open)
return tmp_tx.proto

View file

@ -25,7 +25,6 @@ from .obj import MMGenList
from .addr import MMGenAddrType
from .addrlist import AddrIdxList,KeyAddrList
from .wallet import Wallet,WalletUnenc,WalletEnc,MMGenWallet
from .tx import MMGenTX
saved_seeds = {}
@ -110,7 +109,8 @@ def _pop_and_return(args,cmplist): # strips found args
return list(reversed([args.pop(args.index(a)) for a in reversed(args) if get_extension(a) in cmplist]))
def get_tx_files(opt,args):
ret = _pop_and_return(args,[MMGenTX.Unsigned.ext])
from .tx.unsigned import Unsigned
ret = _pop_and_return(args,[Unsigned.ext])
if not ret:
die(1,'You must specify a raw transaction file!')
return ret

View file

@ -24,10 +24,11 @@ opts_data = {
cmd_args = opts.init(opts_data)
from mmgen.tx import *
import asyncio
from mmgen.tx import CompletedTX
if len(cmd_args) != 1:
opts.usage()
tx = MMGenTX(cmd_args[0],quiet_open=True)
tx.write_to_file(ask_tty=False,ask_overwrite=not opt.quiet,ask_write=not opt.quiet)
tx = asyncio.run(CompletedTX(cmd_args[0],quiet_open=True))
tx.file.write(ask_tty=False,ask_overwrite=not opt.quiet,ask_write=not opt.quiet)

View file

@ -42,6 +42,8 @@ install_requires =
packages =
mmgen
mmgen.base_proto
mmgen.base_proto.bitcoin
mmgen.base_proto.bitcoin.tx
mmgen.base_proto.ethereum
mmgen.base_proto.ethereum.pyethereum
mmgen.base_proto.ethereum.rlp
@ -50,6 +52,7 @@ packages =
mmgen.proto
mmgen.share
mmgen.tool
mmgen.tx
scripts =
cmds/mmgen-addrgen

View file

@ -132,12 +132,12 @@ def tt_urand():
line_input('Press ENTER to continue: ')
def tt_txview():
cmsg('Testing tx.view_with_prompt() (try each viewing option)')
from mmgen.tx import MMGenTX
cmsg('Testing tx.info.view_with_prompt() (try each viewing option)')
from mmgen.tx import UnsignedTX
fn = 'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'
tx = MMGenTX.Unsigned(filename=fn,quiet_open=True)
tx = UnsignedTX(filename=fn,quiet_open=True)
while True:
tx.view_with_prompt('View data for transaction?',pause=False)
tx.info.view_with_prompt('View data for transaction?',pause=False)
set_vt100()
if not keypress_confirm('Continue testing transaction view?',default_yes=True):
break

View file

@ -34,6 +34,7 @@ from test.objattrtest_py_d.oat_common import *
from mmgen.common import *
from mmgen.addrlist import *
from mmgen.passwdlist import *
from mmgen.tx.base import Base
opts_data = {
'sets': [

View file

@ -132,7 +132,7 @@ tests = {
},
),
# tx.py
'MMGenTxInput': atd({
'Base.Input': atd({
'vout': (0b001, int),
'amt': (0b001, BTCAmt),
'label': (0b101, TwComment),
@ -147,7 +147,7 @@ tests = {
(proto,),
{ 'amt':BTCAmt('0.01'), 'addr':sample_objs['CoinAddr'] },
),
'MMGenTxOutput': atd({
'Base.Output': atd({
'vout': (0b001, int),
'amt': (0b001, BTCAmt),
'label': (0b101, TwComment),

View file

@ -38,14 +38,18 @@ def overlay_setup(repo_root):
for d in (
'mmgen',
'mmgen.base_proto',
'mmgen.base_proto.bitcoin',
'mmgen.base_proto.bitcoin.tx',
'mmgen.base_proto.ethereum',
'mmgen.base_proto.ethereum.pyethereum',
'mmgen.base_proto.ethereum.rlp',
'mmgen.base_proto.ethereum.rlp.sedes',
'mmgen.base_proto.ethereum.tx',
'mmgen.data',
'mmgen.proto',
'mmgen.share',
'mmgen.tool' ):
'mmgen.tool',
'mmgen.tx' ):
process_srcdir(d)
return overlay_dir

View file

@ -771,8 +771,8 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
return self.token_compile(token_data)
async def get_tx_receipt(self,txid):
from mmgen.tx import MMGenTX
tx = MMGenTX.New(proto=self.proto)
from mmgen.tx import NewTX
tx = await NewTX(proto=self.proto)
from mmgen.rpc import rpc_init
tx.rpc = await rpc_init(self.proto)
res = await tx.get_receipt(txid)

View file

@ -5,7 +5,7 @@ test.unit_tests_d.ut_tx: TX unit test for the MMGen suite
import re
from mmgen.common import *
from mmgen.tx import MMGenTX
from mmgen.tx import NewTX,UnsignedTX
from mmgen.txfile import MMGenTxFile
from mmgen.rpc import rpc_init
from mmgen.daemon import CoinDaemon
@ -20,7 +20,7 @@ class unit_tests:
async def do():
proto = init_proto('btc',need_amt=True)
tx = MMGenTX.New(proto=proto)
tx = await NewTX(proto=proto)
tx.rpc = await rpc_init(proto=proto)
run_session(do())
@ -41,7 +41,7 @@ class unit_tests:
for fn in fns:
vmsg(f' parsing: {fn}')
fpath = os.path.join('test','ref',fn)
tx = MMGenTX.Unsigned(filename=fpath,quiet_open=True)
tx = UnsignedTX(filename=fpath,quiet_open=True)
f = MMGenTxFile(tx)
fn_gen = f.make_filename()

View file

@ -8,7 +8,8 @@ import os,json
from mmgen.common import *
from ..include.common import *
from mmgen.protocol import init_proto
from mmgen.tx import MMGenTX,DeserializedTX
from mmgen.tx import UnsignedTX
from mmgen.base_proto.bitcoin.tx.base import DeserializeTX
from mmgen.rpc import rpc_init
from mmgen.daemon import CoinDaemon
@ -35,7 +36,7 @@ class unit_test(object):
if has_nonstandard_outputs(d['vout']): return False
dt = DeserializedTX(tx_proto,tx_hex)
dt = DeserializeTX(tx_proto,tx_hex)
if opt.verbose:
Msg('\n====================================================')
@ -46,12 +47,12 @@ class unit_test(object):
Pmsg(dt)
# metadata
assert dt['txid'] == d['txid'],'TXID does not match'
assert dt['lock_time'] == d['locktime'],'Locktime does not match'
assert dt['version'] == d['version'],'Version does not match'
assert dt.txid == d['txid'],'TXID does not match'
assert dt.locktime == d['locktime'],'Locktime does not match'
assert dt.version == d['version'],'Version does not match'
# inputs
a,b = d['vin'],dt['txins']
a,b = d['vin'],dt.txins
for i in range(len(a)):
assert a[i]['txid'] == b[i]['txid'],f'TxID of input {i} does not match'
assert a[i]['vout'] == b[i]['vout'],f'vout of input {i} does not match'
@ -62,7 +63,7 @@ class unit_test(object):
f'witness of input {i} does not match')
# outputs
a,b = d['vout'],dt['txouts']
a,b = d['vout'],dt.txouts
for i in range(len(a)):
if 'addresses' in a[i]['scriptPubKey']:
A = a[i]['scriptPubKey']['addresses'][0]
@ -121,10 +122,10 @@ class unit_test(object):
)
print_info('test/ref/*rawtx','MMGen reference')
for n,(coin,testnet,fn) in enumerate(fns):
tx = MMGenTX.Unsigned(filename=fn)
tx = UnsignedTX(filename=fn)
await test_tx(
tx_proto = tx.proto,
tx_hex = tx.hex,
tx_hex = tx.serialized,
desc = fn,
n = n+1 )
Msg('OK')