bitcoin.py -> protocol.py: CoinProtocol classes and methods

scripts/test-release.sh: BCH tests added
This commit is contained in:
philemon 2017-10-03 22:26:24 +03:00
commit d3f0f26668
Signed by untrusted user who does not match committer: mmgen
GPG key ID: 62DBE9E5212F05BE
19 changed files with 491 additions and 399 deletions

View file

@ -35,8 +35,8 @@
# Uncomment to override 'rpcpassword' in bitcoin.conf
# rpc_password mypassword
# Uncomment to set the Bitcoin datadir
# bitcoin_data_dir /path/to/datadir
# Uncomment to set the coin daemon datadir
# daemon_data_dir /path/to/datadir
# Set the default hash preset:
# hash_preset 3

View file

@ -36,29 +36,28 @@ class AddrGenerator(MMGenObject):
assert atype in d
return super(cls,cls).__new__(d[atype])
class AddrGeneratorP2PKH(MMGenObject):
class AddrGeneratorP2PKH(AddrGenerator):
desc = 'p2pkh'
def to_addr(self,pubhex):
from mmgen.protocol import hash160
assert type(pubhex) == PubKey
from mmgen.bitcoin import hexaddr2addr,hash160
return BTCAddr(hexaddr2addr(hash160(pubhex)))
return CoinAddr(g.proto.hexaddr2addr(hash160(pubhex)))
def to_segwit_redeem_script(self,pubhex):
raise NotImplemented
raise NotImplementedError
class AddrGeneratorSegwit(MMGenObject):
class AddrGeneratorSegwit(AddrGenerator):
desc = 'segwit'
def to_addr(self,pubhex):
assert pubhex.compressed
from mmgen.bitcoin import pubhex2segwitaddr
return BTCAddr(pubhex2segwitaddr(pubhex))
return CoinAddr(g.proto.pubhex2segwitaddr(pubhex))
def to_segwit_redeem_script(self,pubhex):
assert pubhex.compressed
from mmgen.bitcoin import pubhex2redeem_script
return HexStr(pubhex2redeem_script(pubhex))
return HexStr(g.proto.pubhex2redeem_script(pubhex))
class KeyGenerator(MMGenObject):
def __new__(cls,generator=None,silent=False):
if cls.test_for_secp256k1(silent=silent) and generator != 1:
if not opt.key_generator or opt.key_generator == 2 or generator == 2:
@ -76,12 +75,41 @@ class KeyGenerator(MMGenObject):
except:
return False
import ecdsa
class KeyGeneratorPython(KeyGenerator):
# From electrum:
# secp256k1, http://www.oid-info.com/get/1.3.132.0.10
_p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2FL
_r = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141L
_b = 0x0000000000000000000000000000000000000000000000000000000000000007L
_a = 0x0000000000000000000000000000000000000000000000000000000000000000L
_Gx = 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798L
_Gy = 0x483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8L
_curve_secp256k1 = ecdsa.ellipticcurve.CurveFp(_p,_a,_b)
_generator_secp256k1 = ecdsa.ellipticcurve.Point(_curve_secp256k1,_Gx,_Gy,_r)
_oid_secp256k1 = (1,3,132,0,10)
_secp256k1 = ecdsa.curves.Curve('secp256k1',_curve_secp256k1,_generator_secp256k1,_oid_secp256k1)
# devdoc/guide_wallets.md:
# Uncompressed public keys start with 0x04; compressed public keys begin with
# 0x02 or 0x03 depending on whether the Y coordinate is even or odd,
# respectively.
def privnum2pubhex(self,numpriv,compressed=False):
pko = ecdsa.SigningKey.from_secret_exponent(numpriv,self._secp256k1)
# pubkey = 32-byte X coord + 32-byte Y coord (unsigned big-endian)
pubkey = hexlify(pko.get_verifying_key().to_string())
if compressed: # discard Y coord, replace with appropriate version byte
# even Y: <0, odd Y: >0 -- https://bitcointalk.org/index.php?topic=129652.0
p = ('03','02')[pubkey[-1] in '02468ace']
return p+pubkey[:64]
else:
return '04'+pubkey
desc = 'python-ecdsa'
def to_pubhex(self,privhex):
assert type(privhex) == PrivKey
from mmgen.bitcoin import privnum2pubhex
return PubKey(privnum2pubhex(int(privhex,16),compressed=privhex.compressed),compressed=privhex.compressed)
return PubKey(self.privnum2pubhex(
int(privhex,16),compressed=privhex.compressed),compressed=privhex.compressed)
class KeyGeneratorSecp256k1(KeyGenerator):
desc = 'secp256k1'
@ -91,7 +119,7 @@ class KeyGeneratorSecp256k1(KeyGenerator):
return PubKey(hexlify(priv2pub(unhexlify(privhex),int(privhex.compressed))),compressed=privhex.compressed)
class AddrListEntry(MMGenListItem):
addr = MMGenListItemAttr('addr','BTCAddr')
addr = MMGenListItemAttr('addr','CoinAddr')
idx = MMGenListItemAttr('idx','AddrIdx') # not present in flat addrlists
label = MMGenListItemAttr('label','TwComment',reassign_ok=True)
sec = MMGenListItemAttr('sec',PrivKey,typeconv=False)
@ -306,7 +334,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
def addrpairs(self):
return [(e.idx,e.addr) for e in self.data]
def btcaddrs(self):
def coinaddrs(self):
return [e.addr for e in self.data]
def comments(self):
@ -316,7 +344,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
for e in self.data:
if idx == e.idx: return e
def btcaddr(self,idx):
def coinaddr(self,idx):
for e in self.data:
if idx == e.idx: return e.addr
@ -329,8 +357,8 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
if idx == e.idx:
e.label = comment
def make_reverse_dict(self,btcaddrs):
d,b = MMGenDict(),btcaddrs
def make_reverse_dict(self,coinaddrs):
d,b = MMGenDict(),coinaddrs
for e in self.data:
try:
d[b[b.index(e.addr)]] = MMGenID('{}:{}'.format(self.al_id,e.idx)),e.label
@ -671,15 +699,15 @@ re-import your addresses.
if al_id in self.al_ids:
return self.al_ids[al_id]
def mmaddr2btcaddr(self,mmaddr):
def mmaddr2coinaddr(self,mmaddr):
al_id,idx = MMGenID(mmaddr).rsplit(':',1)
btcaddr = ''
coinaddr = ''
if al_id in self.al_ids:
btcaddr = self.addrlist(al_id).btcaddr(int(idx))
return btcaddr or None
coinaddr = self.addrlist(al_id).coinaddr(int(idx))
return coinaddr or None
def btcaddr2mmaddr(self,btcaddr):
d = self.make_reverse_dict([btcaddr])
def coinaddr2mmaddr(self,coinaddr):
d = self.make_reverse_dict([coinaddr])
return (d.values()[0][0]) if d else None
def add_tw_data(self):
@ -710,8 +738,8 @@ re-import your addresses.
else:
raise TypeError, 'Error: object %s is not of type AddrList' % repr(addrlist)
def make_reverse_dict(self,btcaddrs):
def make_reverse_dict(self,coinaddrs):
d = MMGenDict()
for al_id in self.al_ids:
d.update(self.al_ids[al_id].make_reverse_dict(btcaddrs))
d.update(self.al_ids[al_id].make_reverse_dict(coinaddrs))
return d

View file

@ -1,150 +0,0 @@
#!/usr/bin/env python
#
# MMGen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2017 Philemon <mmgen-py@yandex.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
bitcoin.py: Bitcoin address/key conversion functions
"""
import ecdsa
from binascii import hexlify, unhexlify
from hashlib import sha256
from hashlib import new as hashlib_new
import sys
# From electrum:
# secp256k1, http://www.oid-info.com/get/1.3.132.0.10
# Parameters of the secp256k1 curve, used below to build the ecdsa Curve object.
# NOTE(review): the roles given here follow the argument positions in the
# CurveFp(...) and Point(...) calls below -- confirm against the python-ecdsa docs.
# First argument to CurveFp (presumably the prime modulus of the field)
_p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2FL
# Last argument to Point (presumably the order of the generator point)
_r = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141L
# Third and second arguments to CurveFp (presumably the curve coefficients b and a)
_b = 0x0000000000000000000000000000000000000000000000000000000000000007L
_a = 0x0000000000000000000000000000000000000000000000000000000000000000L
# X and Y coordinates passed to Point as the generator
_Gx = 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798L
_Gy = 0x483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8L
_curve_secp256k1 = ecdsa.ellipticcurve.CurveFp(_p,_a,_b)
_generator_secp256k1 = ecdsa.ellipticcurve.Point(_curve_secp256k1,_Gx,_Gy,_r)
# Object identifier tuple for the curve (see the oid-info URL cited above)
_oid_secp256k1 = (1,3,132,0,10)
# Curve object handed to ecdsa.SigningKey.from_secret_exponent() in privnum2pubhex()
_secp256k1 = ecdsa.curves.Curve('secp256k1',_curve_secp256k1,_generator_secp256k1,_oid_secp256k1)
# From en.bitcoin.it:
# The Base58 encoding used is home made, and has some differences.
# Especially, leading zeroes are kept as single zeroes when conversion happens.
# Test: 5JbQQTs3cnoYN9vDYaGY6nhQ1DggVsY4FJNBUfEfpSQqrEp3srk
# The 'zero address':
# 1111111111111111111114oLvT2 (pubkeyhash = '\0'*20)
# Base58 alphabet: digits and letters minus the look-alikes 0, O, I and l.
_b58a = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'

def _numtob58(num):
    """
    Convert a non-negative integer to its Base58 string.

    Returns '' for 0.  Leading zero bytes are not represented here; callers
    that need them (hexaddr2addr) prepend the '1' characters themselves.
    """
    ret = []
    while num:
        # divmod() floors explicitly, so the result does not depend on the
        # Python 2 meaning of '/' (true division would produce floats and
        # break the alphabet lookup).
        num,rem = divmod(num,58)
        ret.append(_b58a[rem])
    return ''.join(ret)[::-1]

def _b58tonum(b58num):
    """
    Convert a Base58 string to an integer.

    Surrounding whitespace is ignored.  Returns False if any character is
    outside the Base58 alphabet.  An empty string or a string of only '1's
    yields 0, which also compares equal to False, so callers testing the
    result with '== False' treat those inputs as invalid too.
    """
    b58num = b58num.strip()
    for ch in b58num:
        if ch not in _b58a:
            return False
    # The least significant digit is the last character, hence the reversal.
    return sum(_b58a.index(ch) * (58**pos) for pos,ch in enumerate(b58num[::-1]))
def hash160(hexnum):
    """OP_HASH160: take a hex string, return RIPEMD-160(SHA-256(bytes)) as a hex string."""
    sha_digest = sha256(unhexlify(hexnum)).digest()
    ripemd = hashlib_new('ripemd160',sha_digest)
    return ripemd.hexdigest()
def hash256(hexnum):
    """OP_HASH256: take a hex string, return SHA-256(SHA-256(bytes)) as a hex string."""
    first_pass = sha256(unhexlify(hexnum)).digest()
    second_pass = sha256(first_pass)
    return second_pass.hexdigest()
# devdoc/ref_transactions.md:
# address format -> network -> (version byte as hex, permitted leading characters)
btc_addr_ver_nums = {
    'p2pkh': {
        'mainnet': ('00','1'),
        'testnet': ('6f','mn'),
    },
    'p2sh': {
        'mainnet': ('05','3'),
        'testnet': ('c4','2'),
    },
}
# network -> every leading character an address on that network may have
btc_addr_pfxs = dict(mainnet='13',testnet='mn2',regtest='mn2')
# network -> first character of a WIF key made from an uncompressed key
btc_uncompressed_wif_pfxs = dict(mainnet='5',testnet='9')
# network -> version byte (hex) prepended to a private key before WIF encoding
btc_privkey_pfxs = dict(mainnet='80',testnet='ef')
from mmgen.globalvars import g
def verify_addr(addr,verbose=False,return_dict=False,testnet=None):
    """
    Check a Base58 address against every known format and network.

    addr        -- address string to verify
    verbose     -- if true, report the reason for a failure via msg()
    return_dict -- if true, return {'hex','format','net'} for a valid address
                   instead of True
    testnet     -- accepted for interface compatibility only; mainnet and
                   testnet version bytes are always both tried and the
                   matching network is reported in the 'net' key

    Returns False for an invalid address.
    """
    # 'Msg' was never imported into this module, so the verbose paths used to
    # raise NameError.  Imported here rather than at file level to keep the
    # import local to the only function that needs it.
    from mmgen.util import msg
    for addr_fmt in ('p2pkh','p2sh'):
        for net in ('mainnet','testnet'):
            ver_num,ldigit = btc_addr_ver_nums[addr_fmt][net]
            if addr[0] not in ldigit: continue
            num = _b58tonum(addr)
            if num == False: break # non-Base58 character (or zero value)
            # 25 bytes: 1 version byte + 20-byte hash + 4-byte checksum
            addr_hex = '{:050x}'.format(num)
            if addr_hex[:2] != ver_num: continue
            if hash256(addr_hex[:42])[:8] == addr_hex[42:]:
                return {'hex':addr_hex[2:42],'format':addr_fmt,'net':net} if return_dict else True
            else:
                if verbose: msg("Invalid checksum in address '{}'".format(addr))
                break
    if verbose: msg("Invalid address '{}'".format(addr))
    return False
def hexaddr2addr(hexaddr,p2sh=False,testnet=None):
    """
    Encode a hex pubkey hash or script hash as a Base58Check address.

    hexaddr -- 20-byte hash as a hex string
    p2sh    -- select the p2sh version byte instead of p2pkh
    testnet -- network override; None falls back to g.testnet
    """
    if testnet is None:
        testnet = g.testnet
    addr_fmt = ('p2pkh','p2sh')[p2sh]
    net = ('mainnet','testnet')[testnet]
    s = btc_addr_ver_nums[addr_fmt][net][0] + hexaddr
    # Each leading zero byte (two hex digits) is written as one '1'.  Floor
    # division keeps the count an integer whatever '/' means.
    lzeroes = (len(s) - len(s.lstrip('0'))) // 2
    return ('1' * lzeroes) + _numtob58(int(s+hash256(s)[:8],16))
def wif2hex(wif,testnet=None):
    """
    Decode a WIF private key.

    Returns {'hex','compressed','testnet'} on success, or False if the key
    has bad characters, a wrong version byte, a missing compression marker
    or a bad checksum.  testnet=None falls back to g.testnet.
    """
    testnet = testnet if testnet != None else g.testnet # allow override
    num = _b58tonum(wif)
    if num == False:
        return False
    key = '{:x}'.format(num)
    net = ('mainnet','testnet')[testnet]
    compressed = wif[0] != btc_uncompressed_wif_pfxs[net]
    if compressed:
        # version byte + 32-byte key + '01' marker, then the checksum
        klen = 68
        if key[66:68] != '01':
            return False
    else:
        klen = 66
    pfx_ok = key[:2] == btc_privkey_pfxs[net]
    if pfx_ok and key[klen:] == hash256(key[:klen])[:8]:
        return {'hex':key[2:66],'compressed':compressed,'testnet':testnet}
    return False
def hex2wif(hexpriv,compressed=False,testnet=None):
    """Encode a hex private key as WIF; testnet=None falls back to g.testnet."""
    testnet = testnet if testnet != None else g.testnet # allow override
    version = btc_privkey_pfxs[('mainnet','testnet')[testnet]]
    # A trailing '01' byte marks a key whose public key is compressed.
    marker = '01' if compressed else ''
    payload = version + hexpriv + marker
    checksum = hash256(payload)[:8]
    return _numtob58(int(payload+checksum,16))
# devdoc/guide_wallets.md:
# Uncompressed public keys start with 0x04; compressed public keys begin with
# 0x02 or 0x03 depending on whether the Y coordinate is even or odd,
# respectively.
def privnum2pubhex(numpriv,compressed=False):
    """
    Derive the hex public key for the private key given as an integer.

    Uncompressed result: '04' + X + Y.  Compressed result: '02' (even Y) or
    '03' (odd Y) followed by X only.
    """
    signing_key = ecdsa.SigningKey.from_secret_exponent(numpriv,_secp256k1)
    # 32-byte X coord + 32-byte Y coord (unsigned big-endian), as hex
    xy_hex = hexlify(signing_key.get_verifying_key().to_string())
    if not compressed:
        return '04'+xy_hex
    # The parity of Y is the parity of its last hex digit.
    y_is_even = xy_hex[-1] in '02468ace'
    return ('02' if y_is_even else '03') + xy_hex[:64]
def privnum2addr(numpriv,compressed=False,segwit=False): # used only by tool and testsuite
    """Return the address (segwit p2sh if requested, else p2pkh) for an integer private key."""
    pubhex = privnum2pubhex(numpriv,compressed)
    if segwit:
        return pubhex2segwitaddr(pubhex)
    return hexaddr2addr(hash160(pubhex))
# Segwit:
def pubhex2redeem_script(pubhex):
    """Return the P2SH-wrapped segwit redeem script (hex) for a hex public key."""
    # https://bitcoincore.org/en/segwit_wallet_dev/
    # The redeemScript is OP_0 followed by a canonical push of the 20-byte
    # keyhash, i.e. 0x0014{keyhash}: always 22 bytes.
    keyhash = hash160(pubhex)
    return '0014' + keyhash
def pubhex2segwitaddr(pubhex):
    """Return the p2sh address that commits to the segwit redeem script of a hex public key."""
    redeem_script = pubhex2redeem_script(pubhex)
    script_hash = hash160(redeem_script)
    return hexaddr2addr(script_hash,p2sh=True)

View file

@ -38,7 +38,7 @@ class g(object):
sys.exit(ev)
# Variables - these might be altered at runtime:
version = '0.9.3'
version = '0.9.399'
release_date = 'October 2017'
proj_name = 'MMGen'
@ -51,7 +51,6 @@ class g(object):
coin = 'BTC'
coins = 'BTC','BCH'
ports = { 'BTC': (8332,18332), 'BCH': (8442,18442) }
user_entropy = ''
hash_preset = '3'
@ -67,7 +66,7 @@ class g(object):
http_timeout = 60
max_int = 0xffffffff
# Constants - some of these might be overridden, but they don't change thereafter
# Constants - some of these might be overridden in opts.py, but they don't change thereafter
debug = False
quiet = False
@ -84,7 +83,6 @@ class g(object):
rpc_port = 0
rpc_user = ''
rpc_password = ''
testnet_name = 'testnet3'
bob = False
alice = False
@ -107,13 +105,12 @@ class g(object):
die(2,'$HOME is not set! Unable to determine home directory')
data_dir_root,data_dir,cfg_file = None,None,None
bitcoin_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin') if platform == 'win' \
else os.path.join(home_dir,'.bitcoin')
daemon_data_dir = '' # set by user or protocol
# User opt sets global var:
common_opts = (
'color','no_license','rpc_host','rpc_port','testnet','rpc_user','rpc_password',
'bitcoin_data_dir','force_256_color','regtest','coin','bob','alice'
'daemon_data_dir','force_256_color','regtest','coin','bob','alice'
)
required_opts = (
'quiet','verbose','debug','outdir','echo_passphrase','passwd_file','stdout',
@ -132,7 +129,7 @@ class g(object):
cfg_file_opts = (
'color','debug','hash_preset','http_timeout','no_license','rpc_host','rpc_port',
'quiet','tx_fee_adj','usr_randchars','testnet','rpc_user','rpc_password',
'bitcoin_data_dir','force_256_color','max_tx_fee','regtest'
'daemon_data_dir','force_256_color','max_tx_fee','regtest'
)
env_opts = (
'MMGEN_BOGUS_WALLET_DATA',

View file

@ -54,7 +54,7 @@ if not tx.marked_signed(c):
die(1,'Transaction is not signed!')
if opt.status:
if tx.btc_txid: qmsg('{} txid: {}'.format(g.coin,tx.btc_txid.hl()))
if tx.coin_txid: qmsg('{} txid: {}'.format(g.coin,tx.coin_txid.hl()))
tx.get_status(c,status=True)
sys.exit(0)

View file

@ -28,7 +28,7 @@ from string import hexdigits,ascii_letters,digits
def is_mmgen_seed_id(s): return SeedID(sid=s,on_fail='silent')
def is_mmgen_idx(s): return AddrIdx(s,on_fail='silent')
def is_mmgen_id(s): return MMGenID(s,on_fail='silent')
def is_btc_addr(s): return BTCAddr(s,on_fail='silent')
def is_coin_addr(s): return CoinAddr(s,on_fail='silent')
def is_addrlist_id(s): return AddrListID(s,on_fail='silent')
def is_tw_label(s): return TwLabel(s,on_fail='silent')
def is_wif(s): return WifKey(s,on_fail='silent')
@ -302,7 +302,7 @@ class BTCAmt(Decimal,Hilite,InitErrors):
@classmethod
def fmtc(cls):
raise NotImplemented
raise NotImplementedError
def fmt(self,fs='3.8',color=False,suf=''):
s = self.__str__(color=False)
@ -347,7 +347,7 @@ class BTCAmt(Decimal,Hilite,InitErrors):
def __neg__(self,other,context=None):
return type(self)(Decimal.__neg__(self,other,context))
class BTCAddr(str,Hilite,InitErrors,MMGenObject):
class CoinAddr(str,Hilite,InitErrors,MMGenObject):
color = 'cyan'
width = 35 # max len of testnet p2sh addr
def __new__(cls,s,on_fail='die'):
@ -356,12 +356,11 @@ class BTCAddr(str,Hilite,InitErrors,MMGenObject):
try:
assert set(s) <= set(ascii_letters+digits),'contains non-ascii characters'
me = str.__new__(cls,s)
from mmgen.bitcoin import verify_addr
va = verify_addr(s,return_dict=True)
from mmgen.globalvars import g
va = g.proto.verify_addr(s,return_dict=True)
assert va,'failed verification'
me.addr_fmt = va['format']
me.hex = va['hex']
me.testnet = va['net'] == 'testnet'
return me
except Exception as e:
m = "{!r}: value cannot be converted to Bitcoin address ({})"
@ -380,12 +379,15 @@ class BTCAddr(str,Hilite,InitErrors,MMGenObject):
def is_for_current_chain(self):
from mmgen.globalvars import g
assert g.chain,'global chain variable unset'
from bitcoin import btc_addr_pfxs
return self[0] in btc_addr_pfxs[g.chain]
return self[0] in g.proto.get_chain_protocol(g.chain).addr_pfxs
def is_mainnet(self):
from bitcoin import btc_addr_pfxs
return self[0] in btc_addr_pfxs['mainnet']
from mmgen.globalvars import g
return self[0] in g.proto.get_chain_protocol('mainnet').addr_pfxs
def is_testnet(self):
from mmgen.globalvars import g
return self[0] in g.proto.get_chain_protocol('testnet').addr_pfxs
def is_in_tracking_wallet(self):
from mmgen.rpc import rpc_connection
@ -420,6 +422,7 @@ class MMGenID(str,Hilite,InitErrors,MMGenObject):
trunc_ok = False
def __new__(cls,s,on_fail='die'):
cls.arg_chk(cls,on_fail)
from mmgen.globalvars import g
try:
ss = str(s).split(':')
assert len(ss) in (2,3),'not 2 or 3 colon-separated items'
@ -428,6 +431,7 @@ class MMGenID(str,Hilite,InitErrors,MMGenObject):
me.sid = SeedID(sid=ss[0],on_fail='raise')
me.idx = AddrIdx(ss[-1],on_fail='raise')
me.mmtype = t
assert t in g.proto.mmtypes,'{}: invalid address type for {}'.format(t,g.proto.__name__)
me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
me.sort_key = '{}:{}:{:0{w}}'.format(me.sid,me.mmtype,me.idx,w=me.idx.max_digits)
return me
@ -518,13 +522,13 @@ class BitcoinTxID(MMGenTxID):
class WifKey(str,Hilite,InitErrors):
width = 53
color = 'blue'
def __new__(cls,s,on_fail='die',testnet=None): # fall back to g.testnet
def __new__(cls,s,on_fail='die'):
if type(s) == cls: return s
cls.arg_chk(cls,on_fail)
try:
assert set(s) <= set(ascii_letters+digits),'not an ascii string'
from mmgen.bitcoin import wif2hex
if wif2hex(s,testnet=testnet):
from mmgen.globalvars import g
if g.proto.wif2hex(s):
return str.__new__(cls,s)
raise ValueError,'failed verification'
except Exception as e:
@ -552,7 +556,7 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
wif = MMGenImmutableAttr('wif',WifKey,typeconv=False)
# initialize with (priv_bin,compressed), WIF or self
def __new__(cls,s=None,compressed=None,wif=None,on_fail='die',testnet=None): # default to g.testnet
def __new__(cls,s=None,compressed=None,wif=None,on_fail='die'):
if type(s) == cls: return s
assert wif or (s and type(compressed) == bool),'Incorrect args for PrivKey()'
@ -561,12 +565,11 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
if wif:
try:
assert set(wif) <= set(ascii_letters+digits),'not an ascii string'
from mmgen.bitcoin import wif2hex
w2h = wif2hex(wif,testnet=testnet)
from mmgen.globalvars import g
w2h = g.proto.wif2hex(wif)
assert w2h,"wif2hex() failed for wif key {!r}".format(wif)
me = str.__new__(cls,w2h['hex'])
me.compressed = w2h['compressed']
me.testnet = w2h['testnet']
me.wif = str.__new__(WifKey,wif) # check has been done
return me
except Exception as e:
@ -578,16 +581,15 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
assert len(s) == cls.width / 2,'Key length must be {}'.format(cls.width/2)
me = str.__new__(cls,hexlify(s))
me.compressed = compressed
me.wif = me.towif(testnet=testnet)
# me.testnet = testnet # leave uninitialized for now
me.wif = me.towif()
return me
except Exception as e:
fs = "Key={}\nCompressed={}\nValue pair cannot be converted to PrivKey ({})"
return cls.init_fail(fs.format(repr(s),compressed,e[0]),on_fail)
fs = "Key={!r}\nCompressed={}\nValue pair cannot be converted to PrivKey ({!r})"
return cls.init_fail(fs.format(s,compressed,e),on_fail)
def towif(self,testnet=None):
from mmgen.bitcoin import hex2wif
return WifKey(hex2wif(self,compressed=self.compressed),on_fail='raise',testnet=testnet)
def towif(self):
from mmgen.globalvars import g
return WifKey(g.proto.hex2wif(self,compressed=self.compressed),on_fail='raise')
class AddrListID(str,Hilite,InitErrors,MMGenObject):
width = 10
@ -660,7 +662,7 @@ class MMGenAddrType(str,Hilite,InitErrors,MMGenObject):
width = 1
trunc_ok = False
color = 'blue'
mmtypes = { # since 'name' is used to cook the seed, it must never change!
mmtypes = { # 'name' is used to cook the seed, so it must never change!
'L': { 'name':'legacy',
'comp':False,
'gen':'p2pkh',
@ -704,5 +706,9 @@ class MMGenAddrType(str,Hilite,InitErrors,MMGenObject):
class MMGenPasswordType(MMGenAddrType):
mmtypes = {
'P': {'name':'password','comp':False,'gen':None,'fmt':None,'desc':'Password generated from MMGen seed'}
'P': { 'name':'password',
'comp':False,
'gen':None,
'fmt':None,
'desc':'Password generated from MMGen seed'}
}

View file

@ -213,8 +213,14 @@ def init(opts_f,add_opts=[],opt_filter=None):
if g.regtest: g.testnet = True # These are equivalent for now
# Global vars are now final, including g.testnet, so we can set g.data_dir
g.data_dir=os.path.normpath(os.path.join(g.data_dir_root,('',g.testnet_name)[g.testnet]))
# g.testnet is set, so we can set g.proto
from mmgen.protocol import get_coin_protocol
g.proto = get_coin_protocol(g.coin,g.testnet)
if not g.daemon_data_dir: g.daemon_data_dir = g.proto.daemon_data_dir
# g.proto is set, so we can set g.data_dir
g.data_dir = os.path.normpath(os.path.join(g.data_dir_root,g.proto.data_subdir))
# If user opt is set, convert its type based on value in mmgen.globalvars (g)
# If unset, set it to default value in mmgen.globalvars (g)
@ -243,10 +249,11 @@ def init(opts_f,add_opts=[],opt_filter=None):
mmgen.share.Opts.parse_opts(sys.argv,opts_data,opt_filter=opt_filter)
if g.bob or g.alice:
g.testnet = True
g.proto = get_coin_protocol(g.coin,g.testnet)
g.data_dir = os.path.join(g.data_dir_root,'regtest',('alice','bob')[g.bob])
check_or_create_dir(g.data_dir)
import regtest as rt
g.testnet = True
g.rpc_host = 'localhost'
g.rpc_port = rt.rpc_port
g.rpc_user = rt.rpc_user

181
mmgen/protocol.py Executable file
View file

@ -0,0 +1,181 @@
#!/usr/bin/env python
#
# MMGen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2017 Philemon <mmgen-py@yandex.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
protocol.py: Coin protocol functions, classes and methods
"""
import os,hashlib
from binascii import unhexlify
from mmgen.util import msg,pmsg
def hash160(hexnum):
    """OP_HASH160: take a hex string, return RIPEMD-160(SHA-256(bytes)) as a hex string."""
    sha_digest = hashlib.sha256(unhexlify(hexnum)).digest()
    ripemd = hashlib.new('ripemd160',sha_digest)
    return ripemd.hexdigest()
def hash256(hexnum):
    """OP_HASH256: take a hex string, return SHA-256(SHA-256(bytes)) as a hex string."""
    first_pass = hashlib.sha256(unhexlify(hexnum)).digest()
    second_pass = hashlib.sha256(first_pass)
    return second_pass.hexdigest()
# From en.bitcoin.it:
# The Base58 encoding used is home made, and has some differences.
# Especially, leading zeroes are kept as single zeroes when conversion happens.
# Test: 5JbQQTs3cnoYN9vDYaGY6nhQ1DggVsY4FJNBUfEfpSQqrEp3srk
# The 'zero address':
# 1111111111111111111114oLvT2 (pubkeyhash = '\0'*20)
# Base58 alphabet: digits and letters minus the look-alikes 0, O, I and l.
_b58a = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'

def _numtob58(num):
    """
    Convert a non-negative integer to its Base58 string.

    Returns '' for 0.  Leading zero bytes are not represented here; callers
    that need them (hexaddr2addr) prepend the '1' characters themselves.
    """
    ret = []
    while num:
        # divmod() floors explicitly, so the result does not depend on the
        # Python 2 meaning of '/' (true division would produce floats and
        # break the alphabet lookup).
        num,rem = divmod(num,58)
        ret.append(_b58a[rem])
    return ''.join(ret)[::-1]

def _b58tonum(b58num):
    """
    Convert a Base58 string to an integer.

    Surrounding whitespace is ignored.  Returns False if any character is
    outside the Base58 alphabet.  An empty string or a string of only '1's
    yields 0, which also compares equal to False, so callers testing the
    result with '== False' treat those inputs as invalid too.
    """
    b58num = b58num.strip()
    for ch in b58num:
        if ch not in _b58a:
            return False
    # The least significant digit is the last character, hence the reversal.
    return sum(_b58a.index(ch) * (58**pos) for pos,ch in enumerate(b58num[::-1]))
def get_coin_protocol(coin,testnet):
    """
    Return the protocol class for a coin symbol and network.

    coin    -- coin symbol, case-insensitive ('btc', 'bch', 'ltc' or 'eth')
    testnet -- bool; True selects the testnet class

    Raises AssertionError for an unknown coin or a non-bool testnet.
    """
    symbol = coin.lower()
    # Built at call time: the classes are defined further down in the module.
    # Each value is (mainnet class, testnet class).
    protos_by_coin = {
        'btc': (BitcoinProtocol,BitcoinTestnetProtocol),
        'bch': (BitcoinCashProtocol,BitcoinCashTestnetProtocol),
        'ltc': (LitecoinProtocol,LitecoinTestnetProtocol),
        'eth': (EthereumProtocol,EthereumTestnetProtocol),
    }
    assert type(testnet) == bool
    assert symbol in protos_by_coin
    mainnet_proto,testnet_proto = protos_by_coin[symbol]
    return testnet_proto if testnet else mainnet_proto
from mmgen.obj import MMGenObject
from mmgen.globalvars import g
class BitcoinProtocol(MMGenObject):
    """
    Parameters and key/address conversion routines for Bitcoin mainnet.

    Every method is a classmethod that reads the class attributes below, so
    a subclass describes another network or coin by overriding attributes.
    """
    # devdoc/ref_transactions.md:
    # address format -> (version byte as hex, permitted leading characters)
    addr_ver_num = { 'p2pkh': ('00','1'), 'p2sh': ('05','3') }
    # every leading character an address on this network may have
    addr_pfxs = '13'
    # first character of a WIF key made from an uncompressed key
    uncompressed_wif_pfx = '5'
    # version byte (hex) prepended to a private key before WIF encoding
    privkey_pfx = '80'
    # MMGen address type codes that MMGenID accepts for this protocol
    mmtypes = ('L','C','S')
    # subdirectory of g.data_dir_root used as the MMGen data dir
    data_subdir = ''
    rpc_port = 8332
    # default daemon data dir, used when the user has not set daemon_data_dir
    daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin') if g.platform == 'win' \
        else os.path.join(g.home_dir,'.bitcoin')
    sighash_type = 'ALL'
    # NOTE(review): presumably the hash of block 0 -- confirm
    block0 = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
    # NOTE(review): entries look like (block height, block hash, coin forked off) -- confirm
    forks = [
        (478559,'00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148','bch')
    ]

    @classmethod
    def get_chain_protocol(cls,chain):
        """
        Return the protocol class for chain 'mainnet', 'testnet' or 'regtest'.

        NOTE(review): the class name is built from the literal 'Bitcoin'
        prefix, so calling this on a subclass for another coin still returns
        a Bitcoin class -- confirm that this is intended.
        """
        chain_protos = { 'mainnet':'', 'testnet':'Testnet', 'regtest':'Testnet' }
        assert chain in chain_protos
        return globals()['Bitcoin{}Protocol'.format(chain_protos[chain])]

    @classmethod
    def hex2wif(cls,hexpriv,compressed=False):
        """Encode a hex private key as WIF; a '01' marker byte is added for compressed keys."""
        s = cls.privkey_pfx + hexpriv + ('','01')[bool(compressed)]
        return _numtob58(int(s+hash256(s)[:8],16))

    @classmethod
    def wif2hex(cls,wif):
        """
        Decode a WIF private key.

        Returns {'hex','compressed'} on success, or False if the key has bad
        characters, a wrong version byte, a missing compression marker or a
        bad checksum.
        """
        num = _b58tonum(wif)
        if num == False: return False
        key = '{:x}'.format(num)
        compressed = wif[0] != cls.uncompressed_wif_pfx
        # version byte + 32-byte key (+ '01' marker if compressed), then checksum
        klen = (66,68)[bool(compressed)]
        if compressed and key[66:68] != '01': return False
        if (key[:2] == cls.privkey_pfx and key[klen:] == hash256(key[:klen])[:8]):
            return { 'hex':key[2:66], 'compressed':compressed }
        else:
            return False

    @classmethod
    def verify_addr(cls,addr,verbose=False,return_dict=False):
        """
        Check a Base58 address against this protocol's address formats.

        verbose     -- if true, report the reason for a failure via msg()
        return_dict -- if true, return {'hex','format'} for a valid address
                       instead of True

        Returns False for an invalid address.
        """
        for addr_fmt in ('p2pkh','p2sh'):
            ver_num,ldigit = cls.addr_ver_num[addr_fmt]
            if addr[0] not in ldigit: continue
            num = _b58tonum(addr)
            if num == False: break # non-Base58 character (or zero value)
            # 25 bytes: 1 version byte + 20-byte hash + 4-byte checksum
            addr_hex = '{:050x}'.format(num)
            if addr_hex[:2] != ver_num: continue
            if hash256(addr_hex[:42])[:8] == addr_hex[42:]:
                return { 'hex':addr_hex[2:42], 'format':addr_fmt } if return_dict else True
            else:
                # This module imports msg, not Msg; calling the undefined
                # Msg raised NameError whenever verbose was set.
                if verbose: msg("Invalid checksum in address '{}'".format(addr))
                break
        if verbose: msg("Invalid address '{}'".format(addr))
        return False

    @classmethod
    def hexaddr2addr(cls,hexaddr,p2sh=False):
        """Encode a hex pubkey hash (or script hash if p2sh) as a Base58Check address."""
        s = cls.addr_ver_num[('p2pkh','p2sh')[p2sh]][0] + hexaddr
        # Each leading zero byte (two hex digits) is written as one '1'.
        # Floor division keeps the count an integer whatever '/' means.
        lzeroes = (len(s) - len(s.lstrip('0'))) // 2
        return ('1' * lzeroes) + _numtob58(int(s+hash256(s)[:8],16))

    # Segwit:
    @classmethod
    def pubhex2redeem_script(cls,pubhex):
        """Return the P2SH-wrapped segwit redeem script (hex) for a hex public key."""
        # https://bitcoincore.org/en/segwit_wallet_dev/
        # The P2SH redeemScript is always 22 bytes. It starts with a OP_0, followed
        # by a canonical push of the keyhash (i.e. 0x0014{20-byte keyhash})
        return '0014' + hash160(pubhex)

    @classmethod
    def pubhex2segwitaddr(cls,pubhex):
        """Return the p2sh address that commits to the segwit redeem script of a hex public key."""
        return cls.hexaddr2addr(hash160(cls.pubhex2redeem_script(pubhex)),p2sh=True)
class BitcoinTestnetProtocol(BitcoinProtocol):
    """Bitcoin testnet: overrides only the network-specific parameters of BitcoinProtocol."""
    rpc_port = 18332
    data_subdir = 'testnet3'
    # version byte (hex) and first character of an uncompressed-key WIF
    privkey_pfx = 'ef'
    uncompressed_wif_pfx = '9'
    # address format -> (version byte as hex, permitted leading characters)
    addr_ver_num = {
        'p2pkh': ('6f','mn'),
        'p2sh': ('c4','2'),
    }
    addr_pfxs = 'mn2'
class BitcoinCashProtocol(BitcoinProtocol):
    """Bitcoin Cash mainnet: Bitcoin's address and key formats, without segwit."""
    rpc_port = 8442
    mmtypes = ('L','C') # no 'S': the segwit methods below are not implemented
    sighash_type = 'ALL|FORKID'
    # TODO: assumes MSWin user installs in custom dir 'Bitcoin_ABC'
    if g.platform == 'win':
        daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_ABC')
    else:
        daemon_data_dir = os.path.join(g.home_dir,'.bitcoin-abc')
    block0 = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
    forks = [
        (478559,'000000000000000000651ef99cb9fcbe0dadde1d424bd9f15ff20136191a5eec','btc')
    ]

    @classmethod
    def pubhex2redeem_script(cls,pubhex):
        raise NotImplementedError

    @classmethod
    def pubhex2segwitaddr(cls,pubhex):
        raise NotImplementedError
class BitcoinCashTestnetProtocol(BitcoinTestnetProtocol):
    """
    Bitcoin Cash testnet.

    Address and key prefixes come from BitcoinTestnetProtocol.  The settings
    that belong to the coin rather than the network are taken from
    BitcoinCashProtocol; without that they would be inherited from the
    Bitcoin classes, so this class would accept the segwit 'S' address type,
    use sighash type 'ALL' and default to Bitcoin's daemon data directory.
    """
    rpc_port = 18442
    daemon_data_dir = BitcoinCashProtocol.daemon_data_dir
    mmtypes = BitcoinCashProtocol.mmtypes
    sighash_type = BitcoinCashProtocol.sighash_type
    # NOTE(review): 'forks' is still inherited from BitcoinProtocol -- confirm
    # which fork data, if any, applies on testnet.

    @classmethod
    def pubhex2redeem_script(cls,pubhex): raise NotImplementedError

    @classmethod
    def pubhex2segwitaddr(cls,pubhex): raise NotImplementedError
# Placeholder classes: none of them overrides anything yet.

class LitecoinProtocol(BitcoinProtocol):
    # inherits every Bitcoin mainnet parameter unchanged
    pass

class LitecoinTestnetProtocol(LitecoinProtocol):
    # NOTE(review): inherits the mainnet parameters too; no testnet overrides yet
    pass

class EthereumProtocol(MMGenObject):
    # defines no parameters or methods
    pass

class EthereumTestnetProtocol(EthereumProtocol):
    pass

View file

@ -163,6 +163,7 @@ class BitcoinRPCConnection(object):
'getnetworkinfo',
'getpeerinfo',
'getrawmempool',
'getmempoolentry',
'getrawtransaction',
'gettransaction',
'importaddress',

View file

@ -21,9 +21,9 @@
tool.py: Routines and data for the 'mmgen-tool' utility
"""
import binascii as ba
import binascii
import mmgen.bitcoin as mmb
from mmgen.protocol import hash160
from mmgen.common import *
from mmgen.crypto import *
from mmgen.tx import *
@ -61,8 +61,8 @@ cmd_data = OrderedDict([
('Wif2hex', ['<wif> [str-]']),
('Wif2addr', ['<wif> [str-]','segwit [bool=False]']),
('Wif2segwit_pair',['<wif> [str-]']),
('Hexaddr2addr', ['<btc address in hex format> [str-]']),
('Addr2hexaddr', ['<btc address> [str-]']),
('Hexaddr2addr', ['<coin address in hex format> [str-]']),
('Addr2hexaddr', ['<coin address> [str-]']),
('Privhex2addr', ['<private key in hex format> [str-]','compressed [bool=False]','segwit [bool=False]']),
('Privhex2pubhex',['<private key in hex format> [str-]','compressed [bool=False]']),
('Pubhex2addr', ['<public key in hex format> [str-]','p2sh [bool=False]']), # new
@ -247,7 +247,7 @@ def B58randenc():
print_convert_results(r,enc,dec,'str')
def Randhex(nbytes='32'):
Msg(ba.hexlify(get_random(int(nbytes))))
Msg(binascii.hexlify(get_random(int(nbytes))))
def Randwif(compressed=False):
Msg(PrivKey(get_random(32),compressed).wif)
@ -279,21 +279,24 @@ def Wif2segwit_pair(wif):
rs = ag.to_segwit_redeem_script(pubhex)
Msg('{}\n{}'.format(rs,addr))
def Hexaddr2addr(hexaddr): Msg(mmb.hexaddr2addr(hexaddr))
def Addr2hexaddr(addr): Msg(mmb.verify_addr(addr,return_dict=True)['hex'])
def Hash160(pubkeyhex): Msg(mmb.hash160(pubkeyhex))
def Pubhex2addr(pubkeyhex,p2sh=False): Msg(mmb.hexaddr2addr(mmb.hash160(pubkeyhex),p2sh=p2sh))
def Hexaddr2addr(hexaddr): Msg(g.proto.hexaddr2addr(hexaddr))
def Addr2hexaddr(addr): Msg(g.proto.verify_addr(addr,return_dict=True)['hex'])
def Hash160(pubkeyhex): Msg(hash160(pubkeyhex))
def Pubhex2addr(pubkeyhex,p2sh=False): Msg(g.proto.hexaddr2addr(hash160(pubkeyhex),p2sh=p2sh))
def Wif2hex(wif): Msg(wif2hex(wif))
def Hex2wif(hexpriv,compressed=False):
Msg(mmb.hex2wif(hexpriv,compressed))
def Privhex2addr(privhex,compressed=False,segwit=False):
Msg(g.proto.hex2wif(hexpriv,compressed))
def Privhex2addr(privhex,compressed=False,segwit=False,output_pubhex=False):
if segwit and not compressed:
die(1,'Segwit address can be generated only from a compressed pubkey')
Msg(mmb.privnum2addr(int(privhex,16),compressed,segwit=segwit))
pk = PrivKey(binascii.unhexlify(privhex),compressed=compressed)
ph = kg.to_pubhex(pk)
ag = AddrGenerator(('p2pkh','segwit')[bool(segwit)])
Msg(ph if output_pubhex else ag.to_addr(ph))
def Privhex2pubhex(privhex,compressed=False): # new
Msg(mmb.privnum2pubhex(int(privhex,16),compressed))
return Privhex2addr(privhex,compressed=compressed,output_pubhex=True)
def Pubhex2redeem_script(pubhex): # new
Msg(mmb.pubhex2redeem_script(pubhex))
Msg(g.proto.pubhex2redeem_script(pubhex))
def Wif2redeem_script(wif): # new
privhex = PrivKey(wif=wif)
if not privhex.compressed:
@ -309,7 +312,7 @@ wordlists = 'electrum','tirosh'
dfl_wl_id = 'electrum'
def do_random_mn(nbytes,wordlist):
hexrand = ba.hexlify(get_random(nbytes))
hexrand = binascii.hexlify(get_random(nbytes))
Vmsg('Seed: %s' % hexrand)
for wl_id in ([wordlist],wordlists)[wordlist=='all']:
if wordlist == 'all':
@ -324,10 +327,10 @@ def Mn_rand256(wordlist=dfl_wl_id): do_random_mn(32,wordlist)
def Hex2mn(s,wordlist=dfl_wl_id): Msg(' '.join(baseconv.fromhex(s,wordlist)))
def Mn2hex(s,wordlist=dfl_wl_id): Msg(baseconv.tohex(s.split(),wordlist))
def Strtob58(s,pad=None): Msg(''.join(baseconv.fromhex(ba.hexlify(s),'b58',pad)))
def Strtob58(s,pad=None): Msg(''.join(baseconv.fromhex(binascii.hexlify(s),'b58',pad)))
def Hextob58(s,pad=None): Msg(''.join(baseconv.fromhex(s,'b58',pad)))
def Hextob32(s,pad=None): Msg(''.join(baseconv.fromhex(s,'b32',pad)))
def B58tostr(s): Msg(ba.unhexlify(baseconv.tohex(s,'b58')))
def B58tostr(s): Msg(binascii.unhexlify(baseconv.tohex(s,'b58')))
def B58tohex(s,pad=None): Msg(baseconv.tohex(s,'b58',pad))
def B32tohex(s,pad=None): Msg(baseconv.tohex(s.upper(),'b32',pad))
@ -403,7 +406,9 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Tr
die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format(
g.coin,d['address'],addrs[label.mmid]['addr']))
else:
addrs[label.mmid] = { 'amt':BTCAmt('0'), 'lbl':label, 'addr':BTCAddr(d['address']) }
addrs[label.mmid] = { 'amt': BTCAmt('0'),
'lbl': label,
'addr': CoinAddr(d['address'])}
addrs[label.mmid]['amt'] += d['amount']
total += d['amount']
@ -425,7 +430,7 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Tr
if label.mmid not in addrs:
addrs[label.mmid] = { 'amt':BTCAmt('0'), 'lbl':label, 'addr':'' }
if showbtcaddrs:
addrs[label.mmid]['addr'] = BTCAddr(addr_arr[0])
addrs[label.mmid]['addr'] = CoinAddr(addr_arr[0])
if not addrs:
die(0,('No tracked addresses with balances!','No tracked addresses!')[showempty])
@ -438,7 +443,7 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Tr
max_cmt_len = max(max(len(addrs[k]['lbl'].comment) for k in addrs),7)
out += [fs.format(
mid=MMGenID.fmtc('MMGenID',width=max_mmid_len),
addr=BTCAddr.fmtc('ADDRESS'),
addr=CoinAddr.fmtc('ADDRESS'),
cmt=TwComment.fmtc('COMMENT',width=max_cmt_len),
amt='BALANCE'
)]
@ -533,10 +538,10 @@ def Passwdfile_chksum(infile):
PasswordList(infile=infile,chksum_only=True)
def Hexreverse(s):
Msg(ba.hexlify(ba.unhexlify(s.strip())[::-1]))
Msg(binascii.hexlify(binascii.unhexlify(s.strip())[::-1]))
def Hexlify(s):
Msg(ba.hexlify(s))
Msg(binascii.hexlify(s))
def Hash256(s, file_input=False, hex_input=False):
from hashlib import sha256

View file

@ -45,7 +45,7 @@ class MMGenTrackingWallet(MMGenObject):
amt = MMGenImmutableAttr('amt','BTCAmt'),
label = MMGenListItemAttr('label','TwComment',reassign_ok=True),
twmmid = MMGenImmutableAttr('twmmid','TwMMGenID')
addr = MMGenImmutableAttr('addr','BTCAddr'),
addr = MMGenImmutableAttr('addr','CoinAddr'),
confs = MMGenImmutableAttr('confs',int,typeconv=False),
scriptPubKey = MMGenImmutableAttr('scriptPubKey','HexStr')
days = MMGenListItemAttr('days',int,typeconv=False),
@ -71,9 +71,9 @@ watch-only wallet using '{}-addrimport' and then re-run this program.
self.get_unspent_data()
self.sort_key = 'age'
self.do_sort()
self.total = self.get_total_btc()
self.total = self.get_total_coin()
def get_total_btc(self):
def get_total_coin(self):
return sum(i.amt for i in self.unspent)
def get_unspent_data(self):
@ -95,7 +95,7 @@ watch-only wallet using '{}-addrimport' and then re-run this program.
'label': l.comment,
'days': int(o['confirmations'] * g.mins_per_block / (60*24)),
'amt': BTCAmt(o['amount']), # TODO
'addr': BTCAddr(o['address']), # TODO
'addr': CoinAddr(o['address']), # TODO
'confs': o['confirmations']
})
mm_rpc.append(o)
@ -168,7 +168,7 @@ watch-only wallet using '{}-addrimport' and then re-run this program.
out = [hdr_fmt.format(' '.join(self.sort_info()),g.coin,self.total.hl())]
if g.chain in ('testnet','regtest'):
out += [green('Chain: {}'.format(g.chain.upper()))]
af = BTCAddr.fmtc('Address',width=addr_w+1)
af = CoinAddr.fmtc('Address',width=addr_w+1)
cf = ('Conf.','Age(d)')[self.show_days]
out += [fs % ('Num','TX id'.ljust(tx_w - 5) + ' Vout','',af,'Amt({}) '.format(g.coin),cf)]
@ -312,33 +312,33 @@ Display options: show [D]ays, [g]roup, show [m]mgen addr, r[e]draw screen
# returns on failure
@classmethod
def add_label(cls,arg1,label='',addr=None,silent=False):
from mmgen.tx import is_mmgen_id,is_btc_addr
mmaddr,btcaddr = None,None
if is_btc_addr(addr or arg1):
btcaddr = BTCAddr(addr or arg1,on_fail='return')
from mmgen.tx import is_mmgen_id,is_coin_addr
mmaddr,coinaddr = None,None
if is_coin_addr(addr or arg1):
coinaddr = CoinAddr(addr or arg1,on_fail='return')
if is_mmgen_id(arg1):
mmaddr = TwMMGenID(arg1)
if not btcaddr and not mmaddr:
if not coinaddr and not mmaddr:
msg("Address '{}' invalid or not found in tracking wallet".format(addr or arg1))
return False
if not btcaddr:
if not coinaddr:
from mmgen.addr import AddrData
btcaddr = AddrData(source='tw').mmaddr2btcaddr(mmaddr)
coinaddr = AddrData(source='tw').mmaddr2coinaddr(mmaddr)
if not btcaddr:
if not coinaddr:
msg("{} address '{}' not found in tracking wallet".format(g.proj_name,mmaddr))
return False
# Check that the user isn't importing a random address
if not btcaddr.is_in_tracking_wallet():
msg("Address '{}' not in tracking wallet".format(btcaddr))
if not coinaddr.is_in_tracking_wallet():
msg("Address '{}' not in tracking wallet".format(coinaddr))
return False
c = rpc_connection()
if not btcaddr.is_for_current_chain():
msg("Address '{}' not valid for chain {}".format(btcaddr,g.chain.upper()))
if not coinaddr.is_for_current_chain():
msg("Address '{}' not valid for chain {}".format(coinaddr,g.chain.upper()))
return False
# Allow for the possibility that BTC addr of MMGen addr was entered.
@ -346,9 +346,9 @@ Display options: show [D]ays, [g]roup, show [m]mgen addr, r[e]draw screen
if not mmaddr:
from mmgen.addr import AddrData
ad = AddrData(source='tw')
mmaddr = ad.btcaddr2mmaddr(btcaddr)
mmaddr = ad.coinaddr2mmaddr(coinaddr)
if not mmaddr: mmaddr = 'btc:'+btcaddr
if not mmaddr: mmaddr = 'btc:'+coinaddr
mmaddr = TwMMGenID(mmaddr)
@ -361,7 +361,7 @@ Display options: show [D]ays, [g]roup, show [m]mgen addr, r[e]draw screen
# associating the new account with the address.
# Will be replaced by setlabel() with new RPC label API
# RPC args: addr,label,rescan[=true],p2sh[=none]
ret = c.importaddress(btcaddr,lbl,False,on_fail='return')
ret = c.importaddress(coinaddr,lbl,False,on_fail='return')
from mmgen.rpc import rpc_error,rpc_errmsg
if rpc_error(ret):

View file

@ -118,7 +118,7 @@ txio_attrs = {
'amt': MMGenImmutableAttr('amt','BTCAmt'),
'label': MMGenListItemAttr('label','TwComment',reassign_ok=True),
'mmid': MMGenListItemAttr('mmid','MMGenID'),
'addr': MMGenImmutableAttr('addr','BTCAddr'),
'addr': MMGenImmutableAttr('addr','CoinAddr'),
'confs': MMGenListItemAttr('confs',int,typeconv=True), # long confs exist in the wild, so convert
'txid': MMGenListItemAttr('txid','BitcoinTxID'),
'have_wif': MMGenListItemAttr('have_wif',bool,typeconv=False,delete_ok=True)
@ -150,7 +150,7 @@ class MMGenTX(MMGenObject):
self.hex = '' # raw serialized hex transaction
self.label = MMGenTXLabel('')
self.txid = ''
self.btc_txid = ''
self.coin_txid = ''
self.timestamp = ''
self.chksum = ''
self.fmt_data = ''
@ -168,8 +168,8 @@ class MMGenTX(MMGenObject):
if self.chain and g.chain and self.chain != g.chain:
die(2,'Transaction is for {}, but current chain is {}!'.format(self.chain,g.chain))
def add_output(self,btcaddr,amt,is_chg=None):
self.outputs.append(self.MMGenTxOutput(addr=btcaddr,amt=amt,is_chg=is_chg))
def add_output(self,coinaddr,amt,is_chg=None):
self.outputs.append(self.MMGenTxOutput(addr=coinaddr,amt=amt,is_chg=is_chg))
def get_chg_output_idx(self):
for i in range(len(self.outputs)):
@ -302,8 +302,8 @@ class MMGenTX(MMGenObject):
def get_fee(self):
return self.sum_inputs() - self.sum_outputs()
def btc2spb(self,btc_fee):
return int(btc_fee/g.satoshi/self.estimate_size())
def btc2spb(self,coin_fee):
return int(coin_fee/g.satoshi/self.estimate_size())
def get_relay_fee(self):
assert self.estimate_size()
@ -326,38 +326,38 @@ class MMGenTX(MMGenObject):
assert False, "'{}': invalid tx-fee argument".format(tx_fee)
def get_usr_fee(self,tx_fee,desc='Missing description'):
btc_fee = self.convert_fee_spec(tx_fee,self.estimate_size(),on_fail='return')
if btc_fee == None:
coin_fee = self.convert_fee_spec(tx_fee,self.estimate_size(),on_fail='return')
if coin_fee == None:
# we shouldn't be calling this if tx size is unknown
m = "'{}': cannot convert satoshis-per-byte to {} because transaction size is unknown"
assert False, m.format(tx_fee,g.coin)
elif btc_fee == False:
elif coin_fee == False:
m = "'{}': invalid TX fee (not a {} amount or satoshis-per-byte specification)"
msg(m.format(tx_fee,g.coin))
return False
elif btc_fee > g.max_tx_fee:
elif coin_fee > g.max_tx_fee:
m = '{} {c}: {} fee too large (maximum fee: {} {c})'
msg(m.format(btc_fee,desc,g.max_tx_fee,c=g.coin))
msg(m.format(coin_fee,desc,g.max_tx_fee,c=g.coin))
return False
elif btc_fee < self.get_relay_fee():
elif coin_fee < self.get_relay_fee():
m = '{} {c}: {} fee too small (below relay fee of {} {c})'
msg(m.format(str(btc_fee),desc,str(self.get_relay_fee()),c=g.coin))
msg(m.format(str(coin_fee),desc,str(self.get_relay_fee()),c=g.coin))
return False
else:
return btc_fee
return coin_fee
def get_usr_fee_interactive(self,tx_fee=None,desc='Starting'):
btc_fee = None
coin_fee = None
while True:
if tx_fee:
btc_fee = self.get_usr_fee(tx_fee,desc)
if btc_fee:
coin_fee = self.get_usr_fee(tx_fee,desc)
if coin_fee:
m = ('',' (after {}x adjustment)'.format(opt.tx_fee_adj))[opt.tx_fee_adj != 1]
p = '{} TX fee{}: {} {} ({} satoshis per byte)'.format(desc,m,
btc_fee.hl(),g.coin,pink(str(self.btc2spb(btc_fee))))
coin_fee.hl(),g.coin,pink(str(self.btc2spb(coin_fee))))
if opt.yes or keypress_confirm(p+'. OK?',default_yes=True):
if opt.yes: msg(p)
return btc_fee
return coin_fee
tx_fee = my_raw_input('Enter transaction fee: ')
desc = 'User-selected'
@ -420,9 +420,9 @@ class MMGenTX(MMGenObject):
]
if self.label:
lines.append(baseconv.b58encode(self.label.encode('utf8')))
if self.btc_txid:
if self.coin_txid:
if not self.label: lines.append('-') # keep old tx files backwards compatible
lines.append(self.btc_txid)
lines.append(self.coin_txid)
self.chksum = make_chksum_6(' '.join(lines))
self.fmt_data = '\n'.join([self.chksum] + lines)+'\n'
@ -455,11 +455,10 @@ class MMGenTX(MMGenObject):
sig_data.append(e)
msg_r('Signing transaction{}...'.format(tx_num_str))
ht = ('ALL','ALL|FORKID')[g.coin=='BCH'] # sighashtype defaults to 'ALL'
wifs = [d.sec.wif for d in keys]
# keys.pmsg()
# pmsg(wifs)
ret = c.signrawtransaction(self.hex,sig_data,wifs,ht,on_fail='return')
ret = c.signrawtransaction(self.hex,sig_data,wifs,g.proto.sighash_type,on_fail='return')
from mmgen.rpc import rpc_error,rpc_errmsg
if rpc_error(ret):
@ -479,7 +478,7 @@ class MMGenTX(MMGenObject):
txid = dt['txid']
self.check_sigs(dt)
assert txid == c.decoderawtransaction(self.hex)['txid'], 'txid mismatch (after signing)'
self.btc_txid = BitcoinTxID(txid,on_fail='return')
self.coin_txid = BitcoinTxID(txid,on_fail='return')
msg('OK')
return True
else:
@ -523,10 +522,10 @@ class MMGenTX(MMGenObject):
return any(o.mmid and o.mmid.mmtype == 'S' for o in self.outputs)
def is_in_mempool(self,c):
return 'size' in c.getmempoolentry(self.btc_txid,on_fail='silent')
return 'size' in c.getmempoolentry(self.coin_txid,on_fail='silent')
def is_in_wallet(self,c):
ret = c.gettransaction(self.btc_txid,on_fail='silent')
ret = c.gettransaction(self.coin_txid,on_fail='silent')
if 'confirmations' in ret and ret['confirmations'] > 0:
return ret['confirmations']
else:
@ -534,13 +533,13 @@ class MMGenTX(MMGenObject):
def is_replaced(self,c):
if self.is_in_mempool(c): return False
ret = c.gettransaction(self.btc_txid,on_fail='silent')
ret = c.gettransaction(self.coin_txid,on_fail='silent')
if not 'bip125-replaceable' in ret or not 'confirmations' in ret or ret['confirmations'] > 0:
return False
return -ret['confirmations'] + 1 # 1: replacement in mempool, 2: replacement confirmed
def is_in_utxos(self,c):
return 'txid' in c.getrawtransaction(self.btc_txid,True,on_fail='silent')
return 'txid' in c.getrawtransaction(self.coin_txid,True,on_fail='silent')
def get_status(self,c,status=False):
if self.is_in_mempool(c):
@ -596,17 +595,17 @@ class MMGenTX(MMGenObject):
if bogus_send:
m = 'BOGUS transaction NOT sent: {}'
else:
assert ret == self.btc_txid, 'txid mismatch (after sending)'
assert ret == self.coin_txid, 'txid mismatch (after sending)'
m = 'Transaction sent: {}'
self.desc = 'sent transaction'
msg(m.format(self.btc_txid.hl()))
msg(m.format(self.coin_txid.hl()))
self.add_timestamp()
self.add_blockcount(c)
return True
def write_txid_to_file(self,ask_write=False,ask_write_default_yes=True):
fn = '%s[%s].%s' % (self.txid,self.send_amt,self.txid_ext)
write_data_to_file(fn,self.btc_txid+'\n','transaction ID',
write_data_to_file(fn,self.coin_txid+'\n','transaction ID',
ask_write=ask_write,
ask_write_default_yes=ask_write_default_yes)
@ -704,8 +703,8 @@ class MMGenTX(MMGenObject):
self.is_rbf(color=True),self.marked_signed(color=True))
if self.chain in ('testnet','regtest'):
out += green('Chain: {}\n'.format(self.chain.upper()))
if self.btc_txid:
out += '{} TxID: {}\n'.format(g.coin,self.btc_txid.hl())
if self.coin_txid:
out += '{} TxID: {}\n'.format(g.coin,self.coin_txid.hl())
enl = ('\n','')[bool(terse)]
out += enl
if self.label:
@ -747,8 +746,8 @@ class MMGenTX(MMGenObject):
do_err('checksum')
if len(tx_data) == 6:
self.btc_txid = BitcoinTxID(tx_data.pop(-1),on_fail='return')
if not self.btc_txid:
self.coin_txid = BitcoinTxID(tx_data.pop(-1),on_fail='return')
if not self.coin_txid:
do_err('Bitcoin TxID')
if len(tx_data) == 5:
@ -787,7 +786,7 @@ class MMGenTX(MMGenObject):
try: self.inputs = self.decode_io('inputs',eval(inputs_data))
except: do_err('inputs data')
if not self.chain and not self.inputs[0].addr.testnet:
if not self.chain and not self.inputs[0].addr.is_testnet():
self.chain = 'mainnet'
try: self.outputs = self.decode_io('outputs',eval(outputs_data))
@ -809,10 +808,10 @@ class MMGenBumpTX(MMGenTX):
if send:
if not self.marked_signed():
die(1,"File '{}' is not a signed {} transaction file".format(filename,g.proj_name))
if not self.btc_txid:
if not self.coin_txid:
die(1,"Transaction '{}' was not broadcast to the network".format(self.txid,g.proj_name))
self.btc_txid = ''
self.coin_txid = ''
self.mark_raw()
def choose_output(self):

View file

@ -54,7 +54,7 @@ FEE SPECIFICATION: Transaction fees, both on the command line and at the
interactive prompt, may be specified as either absolute {} amounts, using
a plain decimal number, or as satoshis per byte, using an integer followed by
the letter 's'.
""" # formatted later, after g.coin is initialized
"""
wmsg = {
'addr_in_addrfile_only': """
@ -79,7 +79,7 @@ inputs must be supplied to '{pnl}-txsign' in a file with the '--keys-from-file'
option.
Selected non-{pnm} inputs: {{}}
""".strip().format(pnm=pnm,pnl=pnm.lower()),
'not_enough_btc': """
'not_enough_coin': """
Selected outputs insufficient to fund this transaction ({{}} {} needed)
""".strip().format(g.coin),
'no_change_output': """
@ -98,15 +98,15 @@ def select_unspent(unspent,prompt):
return selected
msg('Unspent output number must be <= %s' % len(unspent))
def mmaddr2baddr(c,mmaddr,ad_w,ad_f):
def mmaddr2coinaddr(c,mmaddr,ad_w,ad_f):
# assume mmaddr has already been checked
btc_addr = ad_w.mmaddr2btcaddr(mmaddr)
coin_addr = ad_w.mmaddr2coinaddr(mmaddr)
if not btc_addr:
if not coin_addr:
if ad_f:
btc_addr = ad_f.mmaddr2btcaddr(mmaddr)
if btc_addr:
coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
if coin_addr:
msg(wmsg['addr_in_addrfile_only'].format(mmgenaddr=mmaddr))
if not keypress_confirm('Continue anyway?'):
sys.exit(1)
@ -115,7 +115,7 @@ def mmaddr2baddr(c,mmaddr,ad_w,ad_f):
else:
die(2,wmsg['addr_not_found_no_addrfile'].format(pnm=pnm,mmgenaddr=mmaddr))
return BTCAddr(btc_addr)
return CoinAddr(coin_addr)
def get_fee_from_estimate_or_user(tx,estimate_fail_msg_shown=[]):
@ -155,16 +155,16 @@ def get_outputs_from_cmdline(cmd_args,tx):
for a in cmd_args:
if ',' in a:
a1,a2 = a.split(',',1)
if is_mmgen_id(a1) or is_btc_addr(a1):
btc_addr = mmaddr2baddr(c,a1,ad_w,ad_f) if is_mmgen_id(a1) else BTCAddr(a1)
tx.add_output(btc_addr,BTCAmt(a2))
if is_mmgen_id(a1) or is_coin_addr(a1):
coin_addr = mmaddr2coinaddr(c,a1,ad_w,ad_f) if is_mmgen_id(a1) else CoinAddr(a1)
tx.add_output(coin_addr,BTCAmt(a2))
else:
die(2,"%s: unrecognized subargument in argument '%s'" % (a1,a))
elif is_mmgen_id(a) or is_btc_addr(a):
elif is_mmgen_id(a) or is_coin_addr(a):
if tx.get_chg_output_idx() != None:
die(2,'ERROR: More than one change address listed on command line')
btc_addr = mmaddr2baddr(c,a,ad_w,ad_f) if is_mmgen_id(a) else BTCAddr(a)
tx.add_output(btc_addr,BTCAmt('0'),is_chg=True)
coin_addr = mmaddr2coinaddr(c,a,ad_w,ad_f) if is_mmgen_id(a) else CoinAddr(a)
tx.add_output(coin_addr,BTCAmt('0'),is_chg=True)
else:
die(2,'%s: unrecognized argument' % a)
@ -191,7 +191,7 @@ def get_inputs_from_user(tw,tx,caller):
t_inputs = sum(s.amt for s in sel_unspent)
if t_inputs < tx.send_amt:
msg(wmsg['not_enough_btc'].format(tx.send_amt-t_inputs))
msg(wmsg['not_enough_coin'].format(tx.send_amt-t_inputs))
continue
non_mmaddrs = [i for i in sel_unspent if i.twmmid.type == 'non-mmgen']
@ -210,7 +210,7 @@ def get_inputs_from_user(tw,tx,caller):
if opt.yes: msg(p)
return change_amt
else:
msg(wmsg['not_enough_btc'].format(abs(change_amt)))
msg(wmsg['not_enough_coin'].format(abs(change_amt)))
def txcreate(cmd_args,do_info=False,caller='txcreate'):

View file

@ -516,8 +516,6 @@ def confirm_or_exit(message,question,expect='YES',exit_msg='Exiting at user requ
if my_raw_input(a+b).strip() != expect:
die(1,exit_msg)
# New function
def write_data_to_file(
outfile,
data,
@ -781,7 +779,7 @@ def do_license_msg(immed=False):
def get_bitcoind_cfg_options(cfg_keys):
cfg_file = os.path.join(g.bitcoin_data_dir,'bitcoin.conf')
cfg_file = os.path.join(g.daemon_data_dir,'bitcoin.conf')
cfg = dict([(k,v) for k,v in [split2(str(line).translate(None,'\t '),'=')
for line in get_lines_from_file(cfg_file,'')] if k in cfg_keys]) \
@ -792,39 +790,37 @@ def get_bitcoind_cfg_options(cfg_keys):
return cfg
def get_bitcoind_auth_cookie():
f = os.path.join(g.bitcoin_data_dir,('',g.testnet_name)[g.testnet],'.cookie')
f = os.path.join(g.daemon_data_dir,g.proto.data_subdir,'.cookie')
return get_lines_from_file(f,'')[0] if file_is_readable(f) else ''
def rpc_connection():
def check_coin_mismatch(c):
if c.getblockcount() == 0:
msg('Warning: no blockchain, so skipping block mismatch check')
return
fb = '00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148'
err = []
if c.getblockchaininfo()['blocks'] <= 478558 or c.getblockhash(478559) == fb:
if g.coin == 'BCH': err = 'BCH','BTC'
elif g.coin == 'BTC': err = 'BTC','BCH'
if err: ydie(2,"'{}' requested, but this is the {} chain!".format(*err))
def check_chainfork_mismatch(c):
    """
    Verify that the connected daemon is serving the chain of the requested coin.

    c - RPC connection object; its getblockhash() and getblockcount() calls are used

    The genesis block hash is compared with g.proto.block0, then the hash of
    every fork block listed in g.proto.forks that the chain has already
    reached is compared with the expected value.  On any mismatch, or on any
    error raised while checking, die() is called with exit code 2.
    """
    block0 = c.getblockhash(0)
    latest = c.getblockcount()
    try:
        # Explicit raises instead of 'assert': assertions are stripped under
        # 'python -O', which would silently disable this safety check
        if block0 != g.proto.block0:
            raise ValueError('Incorrect Genesis block for {}'.format(g.proto.__name__))
        for fork in g.proto.forks:
            # fork[0]: block height, fork[1]: expected block hash, fork[2]: name used in the message
            if latest < fork[0]: break # chain hasn't reached this fork block yet
            bhash = c.getblockhash(fork[0])
            if bhash != fork[1]:
                raise ValueError(
                    'Bad block hash at fork block {}. Is this the {} chain?'.format(fork[0],fork[2].upper()))
    except Exception as e:
        # Errors raised by the RPC call or the fork data are reported the same way as a mismatch
        die(2,"{}\n'{c}' requested, but this is not the {c} chain!".format(e,c=g.coin))
def check_chain_mismatch():
err = None
if g.regtest and g.chain != 'regtest':
err = '--regtest option'
elif g.testnet and g.chain == 'mainnet':
err = '--testnet option'
# we won't actually get here, as connect will fail first
elif (not g.testnet) and g.chain != 'mainnet':
err = 'mainnet'
if err:
die(1,'{} selected but chain is {}'.format(err,g.chain))
def check_chaintype_mismatch():
    """
    Verify that the chain reported by the daemon (g.chain) is consistent with
    the g.regtest and g.testnet settings.

    The checks are made in order and the first failing one is reported via
    die() with exit code 1.
    """
    # Explicit conditions instead of 'assert': assertions are stripped under
    # 'python -O', which would silently disable this check
    err = None
    if g.regtest and g.chain != 'regtest':
        err = '--regtest option selected, but chain is not regtest'
    elif g.testnet and g.chain == 'mainnet':
        err = '--testnet option selected, but chain is mainnet'
    elif not g.testnet and g.chain != 'mainnet':
        err = 'mainnet selected, but chain is not mainnet'
    if err:
        die(1,'{}\nChain is {}!'.format(err,g.chain))
cfg = get_bitcoind_cfg_options(('rpcuser','rpcpassword'))
import mmgen.rpc
c = mmgen.rpc.BitcoinRPCConnection(
g.rpc_host or 'localhost',
g.rpc_port or g.ports[g.coin][g.testnet],
g.rpc_port or g.proto.rpc_port,
g.rpc_user or cfg['rpcuser'], # MMGen's rpcuser,rpcpassword override bitcoind's
g.rpc_password or cfg['rpcpassword'],
auth_cookie=get_bitcoind_auth_cookie())
@ -835,9 +831,9 @@ def rpc_connection():
rt.user(('alice','bob')[g.bob],quiet=True)
g.bitcoind_version = int(c.getnetworkinfo()['version'])
g.chain = c.getblockchaininfo()['chain']
if g.chain != 'regtest':
g.chain += 'net'
if g.chain != 'regtest': g.chain += 'net'
assert g.chain in g.chains
check_chaintype_mismatch()
if g.chain == 'mainnet':
check_coin_mismatch(c)
check_chainfork_mismatch(c)
return c

View file

@ -13,9 +13,9 @@ do
echo " '-t' Print the tests without running them"
echo " AVAILABLE TESTS:"
echo " 1 - main"
echo " 2 - regtest"
echo " 3 - tool"
echo " 4 - gen"
echo " 2 - tooltest"
echo " 3 - gentest"
echo " 4 - regtest"
echo " By default, all tests are run"
exit ;;
i) INSTALL_ONLY=1 ;;
@ -67,15 +67,16 @@ function do_test {
T1=('test/test.py -On'
'test/test.py -On --segwit dfl_wallet main ref ref_other'
'test/test.py -On --coin=bch dfl_wallet main ref ref_other'
'test/test.py -On --segwit-random dfl_wallet main')
T2=('test/test.py -On regtest')
T3=('test/tooltest.py') # tooltest tests both segwit and non-segwit
T4=("test/gentest.py -q 2 $REFDIR/btcwallet.dump"
T2=('test/tooltest.py' 'test/tooltest.py --testnet=1') # tooltest tests both segwit and non-segwit
T3=("test/gentest.py -q 2 $REFDIR/btcwallet.dump"
"test/gentest.py -q --testnet=1 2 $REFDIR/btcwallet-testnet.dump"
'test/gentest.py -q 1:2 10'
'test/gentest.py -q --segwit 1:2 10'
# "scripts/tx-old2new.py -S $REFDIR/tx_*raw >/dev/null 2>&1"
"scripts/compute-file-chksum.py $REFDIR/*testnet.rawtx >/dev/null 2>&1")
T4=('test/test.py -On regtest')
[ -d .git -a -z "$NO_INSTALL" -a -z "$TESTING" ] && {
check
@ -84,9 +85,26 @@ T4=("test/gentest.py -q 2 $REFDIR/btcwallet.dump"
}
[ "$INSTALL_ONLY" ] && exit
if [ "$*" ]; then TESTS=$@; else TESTS='1 2 3 4'; fi
for t in $TESTS; do
[ $t == 4 ] && LS=''
eval "do_test \"\${T$t[@]}\""
done
# Run the test groups whose numbers are listed (whitespace-separated) in $1.
function run_tests {
for t in $1; do
# Group 4 clears LS, the prefix of the final 'All OK' message
[ $t == 4 ] && LS=''
# Expand the array T<t> and pass each of its elements to do_test as a separate argument
eval "do_test \"\${T$t[@]}\""
done
}
if [ "$*" ]; then
run_tests "$*"
else
echo 'Bitcoin and Bitcoin ABC must both be running for the following tests'
echo 'The bitcoin-abc daemon must be listening on RPC port 8442 (-rpcport 8442)'
echo -n 'Hit ENTER to continue: '; read
run_tests '1'
echo 'The bitcoin (mainnet) and testnet daemons must both be running for the following tests'
echo -n 'Hit ENTER to continue: '; read
run_tests '2 3'
echo 'You may stop the mainnet and testnet daemons now'
echo -n 'Hit ENTER to continue: '; read
run_tests '4'
fi
echo -e "$LS${GREEN}All OK$RESET"

View file

@ -109,7 +109,7 @@ setup(
py_modules = [
'mmgen.__init__',
'mmgen.addr',
'mmgen.bitcoin',
'mmgen.protocol',
'mmgen.color',
'mmgen.common',
'mmgen.crypto',

View file

@ -92,15 +92,11 @@ class MMGenPexpect(object):
atexit.register(lambda: os.system('stty sane'))
NL = '\n'
data_dir = os.path.join('test','data_dir')
add_spawn_args = ' '.join(['{} {}'.format('--'+k.replace('_','-'),
getattr(opt,k) if getattr(opt,k) != True else ''
) for k in ('testnet','rpc_host','rpc_port','regtest','coin') if getattr(opt,k)]).split()
add_spawn_args += ['--data-dir',data_dir]
def __init__(self,name,mmgen_cmd,cmd_args,desc,no_output=False):
cmd_args = self.add_spawn_args + cmd_args
def __init__(self,name,mmgen_cmd,cmd_args,desc,no_output=False,passthru_args=[]):
cmd_args = ['--{}{}'.format(k.replace('_','-'),
'='+getattr(opt,k) if getattr(opt,k) != True else ''
) for k in passthru_args if getattr(opt,k)] \
+ ['--data-dir='+os.path.join('test','data_dir')] + cmd_args
cmd = (('./','')[bool(opt.system)]+mmgen_cmd,'python')[g.platform=='win']
args = (cmd_args,[mmgen_cmd]+cmd_args)[g.platform=='win']

View file

@ -29,6 +29,7 @@ sys.path.__setitem__(0,os.path.abspath(os.curdir))
# Import these _after_ local path's been added to sys.path
from mmgen.common import *
from mmgen.test import *
from mmgen.protocol import get_coin_protocol
g.quiet = False # if 'quiet' was set in config file, disable here
os.environ['MMGEN_QUIET'] = '0' # and for the spawned scripts
@ -41,7 +42,7 @@ hincog_offset = 98765
hincog_seedlen = 256
incog_id_fn = 'incog_id'
non_mmgen_fn = 'btckey'
non_mmgen_fn = 'coinkey'
pwfile = 'passwd_file'
ref_dir = os.path.join('test','ref')
@ -147,6 +148,9 @@ sys.argv = [sys.argv[0]] + ['--data-dir',data_dir] + sys.argv[1:]
cmd_args = opts.init(opts_data)
opt.popen_spawn = True # popen has issues, so use popen_spawn always
if opt.segwit and 'S' not in g.proto.mmtypes:
die(1,'--segwit option incompatible with {}'.format(g.proto.__name__))
tn_desc = ('','.testnet')[g.testnet]
def randbool():
@ -794,22 +798,23 @@ def verify_checksum_or_exit(checksum,chk):
from test.mmgen_pexpect import MMGenPexpect
class MMGenExpect(MMGenPexpect):
def __init__(self,name,mmgen_cmd,cmd_args=[],extra_desc='',no_output=False):
desc = (cmd_data[name][1],name)[bool(opt.names)] + (' ' + extra_desc).strip()
return MMGenPexpect.__init__(self,name,mmgen_cmd,cmd_args,desc,no_output=no_output)
pa = ['testnet','rpc_host','rpc_port','regtest','coin']
return MMGenPexpect.__init__(self,name,mmgen_cmd,cmd_args,desc,no_output=no_output,passthru_args=pa)
def create_fake_unspent_entry(btcaddr,al_id=None,idx=None,lbl=None,non_mmgen=False,segwit=False):
def create_fake_unspent_entry(coinaddr,al_id=None,idx=None,lbl=None,non_mmgen=False,segwit=False):
if 'S' not in g.proto.mmtypes: segwit = False
if lbl: lbl = ' ' + lbl
spk1,spk2 = (('76a914','88ac'),('a914','87'))[segwit and btcaddr.addr_fmt=='p2sh']
spk1,spk2 = (('76a914','88ac'),('a914','87'))[segwit and coinaddr.addr_fmt=='p2sh']
return {
'account': 'btc:{}'.format(btcaddr) if non_mmgen else (u'{}:{}{}'.format(al_id,idx,lbl.decode('utf8'))),
'account': 'btc:{}'.format(coinaddr) if non_mmgen else (u'{}:{}{}'.format(al_id,idx,lbl.decode('utf8'))),
'vout': int(getrandnum(4) % 8),
'txid': hexlify(os.urandom(32)).decode('utf8'),
'amount': BTCAmt('%s.%s' % (10+(getrandnum(4) % 40), getrandnum(4) % 100000000)),
'address': btcaddr,
'address': coinaddr,
'spendable': False,
'scriptPubKey': '{}{}{}'.format(spk1,btcaddr.hex,spk2),
'scriptPubKey': '{}{}{}'.format(spk1,coinaddr.hex,spk2),
'confirmations': getrandnum(4) % 50000
}
@ -842,21 +847,21 @@ def create_fake_unspent_data(adata,tx_data,non_mmgen_input=''):
out = []
for d in tx_data.values():
al = adata.addrlist(d['al_id'])
for n,(idx,btcaddr) in enumerate(al.addrpairs()):
for n,(idx,coinaddr) in enumerate(al.addrpairs()):
while True:
try: lbl = next(label_iter)
except: label_iter = iter(labels)
else: break
out.append(create_fake_unspent_entry(btcaddr,d['al_id'],idx,lbl,segwit=d['segwit']))
out.append(create_fake_unspent_entry(coinaddr,d['al_id'],idx,lbl,segwit=d['segwit']))
if n == 0: # create a duplicate address. This means addrs_per_wallet += 1
out.append(create_fake_unspent_entry(btcaddr,d['al_id'],idx,lbl,segwit=d['segwit']))
out.append(create_fake_unspent_entry(coinaddr,d['al_id'],idx,lbl,segwit=d['segwit']))
if non_mmgen_input:
privkey = PrivKey(os.urandom(32),compressed=True)
btcaddr = AddrGenerator('p2pkh').to_addr(KeyGenerator().to_pubhex(privkey))
coinaddr = AddrGenerator('p2pkh').to_addr(KeyGenerator().to_pubhex(privkey))
of = os.path.join(cfgs[non_mmgen_input]['tmpdir'],non_mmgen_fn)
write_data_to_file(of,privkey.wif+'\n','compressed bitcoin key',silent=True)
out.append(create_fake_unspent_entry(btcaddr,non_mmgen=True,segwit=False))
out.append(create_fake_unspent_entry(coinaddr,non_mmgen=True,segwit=False))
# msg('\n'.join([repr(o) for o in out])); sys.exit(0)
return out
@ -893,7 +898,8 @@ def create_tx_data(sources):
def make_txcreate_cmdline(tx_data):
privkey = PrivKey(os.urandom(32),compressed=True)
btcaddr = AddrGenerator('segwit').to_addr(KeyGenerator().to_pubhex(privkey))
t = ('p2pkh','segwit')['S' in g.proto.mmtypes]
coinaddr = AddrGenerator(t).to_addr(KeyGenerator().to_pubhex(privkey))
cmd_args = ['-d',cfg['tmpdir']]
for num in tx_data:
@ -904,7 +910,7 @@ def make_txcreate_cmdline(tx_data):
# + one change address and one BTC address
if num is tx_data.keys()[-1]:
cmd_args += ['{}:{}'.format(s['al_id'],s['addr_idxs'][1])]
cmd_args += ['{},{}'.format(btcaddr,cfgs[num]['amts'][1])]
cmd_args += ['{},{}'.format(coinaddr,cfgs[num]['amts'][1])]
return cmd_args + [tx_data[num]['addrfile'] for num in tx_data]
@ -1933,10 +1939,8 @@ class MMGenTestSuite(object):
def regtest_addrimport_alice(self,name): self.regtest_addrimport(name,'alice')
def regtest_fund_wallet(self,name,user,mmtype,amt):
fn = get_file_with_ext('-{}[1-5].addrs'.format(mmtype),self.regtest_user_dir(user),no_dot=True)
silence()
addr = AddrList(fn).data[0].addr
end_silence()
sid = self.regtest_user_sid(user)
addr = self.get_addr_from_regtest_addrlist(user,sid,mmtype,0)
t = MMGenExpect(name,'mmgen-regtest', ['send',str(addr),str(amt)])
t.expect('Sending {} BTC'.format(amt))
t.expect('Mined 1 block')
@ -2004,23 +2008,27 @@ class MMGenTestSuite(object):
outputs_cl = [sid+':C:1,100', sid+':L:2,200',sid+':S:2']
return self.regtest_user_txdo(name,'bob','20s',outputs_cl,'1')
def get_addr_from_regtest_addrlist(self,user,sid,mmtype,idx):
    """
    Return the address at position 'idx' in a regtest user's address file.

    user   - regtest user name, passed to self.regtest_user_dir()
    sid    - seed ID that begins the address file's name
    mmtype - address type: 'L', 'S' or 'C' (any other value raises KeyError)
    idx    - zero-based index into the address list's data
    """
    id_str = { 'L':'', 'S':'-S', 'C':'-C' }[mmtype]
    fn = get_file_with_ext('{}{}[1-5].addrs'.format(sid,id_str),self.regtest_user_dir(user),no_dot=True)
    silence()
    try:
        # Temporarily switch the global protocol while the file is parsed.
        # NOTE(review): the second argument is presumably the testnet flag
        # (the restore call below passes g.testnet in that position) - confirm
        g.proto = get_coin_protocol(g.coin,True)
        return AddrList(fn).data[idx].addr
    finally:
        # Always undo the global overrides, even if parsing or indexing fails,
        # so that a failure here can't leave g.proto switched and output silenced
        g.proto = get_coin_protocol(g.coin,g.testnet)
        end_silence()
def create_tx_outputs(self,user,data):
o,sid = [],self.regtest_user_sid(user)
for id_str,idx,amt_str in data:
fn = get_file_with_ext('{}{}[1-5].addrs'.format(sid,id_str),self.regtest_user_dir(user),no_dot=True)
silence()
addr = AddrList(fn).data[idx-1].addr
end_silence()
o.append(addr+amt_str)
return o
sid = self.regtest_user_sid(user)
return [self.get_addr_from_regtest_addrlist(user,sid,mmtype,idx-1)+amt_str for mmtype,idx,amt_str in data]
def regtest_bob_rbf_send(self,name):
outputs_cl = self.create_tx_outputs('alice',(('',1,',60'),('-C',1,',40'))) # alice_sid:L:1, alice_sid:C:1
outputs_cl = self.create_tx_outputs('alice',(('L',1,',60'),('C',1,',40'))) # alice_sid:L:1, alice_sid:C:1
outputs_cl += [self.regtest_user_sid('bob')+':S:2']
return self.regtest_user_txdo(name,'bob','10s',outputs_cl,'3',extra_args=['--rbf'])
def regtest_bob_send_non_mmgen(self,name):
outputs_cl = self.create_tx_outputs('alice',(('-S',2,',10'),('-S',3,''))) # alice_sid:S:2, alice_sid:S:3
outputs_cl = self.create_tx_outputs('alice',(('S',2,',10'),('S',3,''))) # alice_sid:S:2, alice_sid:S:3
fn = os.path.join(cfg['tmpdir'],'non-mmgen.keys')
return self.regtest_user_txdo(name,'bob','0.0001',outputs_cl,'3-9',extra_args=['--keys-from-file='+fn])

View file

@ -110,9 +110,9 @@ cfg = {
'tmpdir': 'test/tmp10',
'tmpdir_num': 10,
'refdir': 'test/ref',
'txfile': 'FFB367[1.234].rawtx',
'addrfile': '98831F3A[1,31-33,500-501,1010-1011].addrs',
'addrfile_chk': '6FEF 6FB9 7B13 5D91',
'txfile': 'FFB367[1.234]{}.rawtx',
'addrfile': '98831F3A[1,31-33,500-501,1010-1011]{}.addrs',
'addrfile_chk': ('6FEF 6FB9 7B13 5D91','3C2C 8558 BB54 079E'),
}
opts_data = lambda: {
@ -169,7 +169,7 @@ if opt.list_names:
import binascii
from mmgen.test import *
from mmgen.tx import is_wif,is_btc_addr
from mmgen.tx import is_wif,is_coin_addr
msg_w = 35
def test_msg(m):
@ -329,7 +329,7 @@ class MMGenToolTestSuite(object):
for n,k in enumerate(['','compressed=1','segwit=1 compressed=1']):
wif,addr = self.run_cmd_out(name,kwargs=k,Return=True,fn_idx=n+1).split()
ok_or_die(wif,is_wif,'WIF key',skip_ok=True)
ok_or_die(addr,is_btc_addr,'Bitcoin address')
ok_or_die(addr,is_coin_addr,'Bitcoin address')
def Wif2addr(self,name,f1,f2,f3):
for n,f,k,m in ((1,f1,'',''),(2,f2,'','compressed'),(3,f3,'segwit=1','compressed')):
wif = read_from_file(f).split()[0]
@ -356,7 +356,7 @@ class MMGenToolTestSuite(object):
self.run_cmd_chk(name,fi,fo,extra_msg=m)
def Privhex2pubhex(self,name,f1,f2,f3): # from Hex2wif
addr = read_from_file(f3).strip()
self.run_cmd_out(name,addr,kwargs='compressed=1',fn_idx=3)
self.run_cmd_out(name,addr,kwargs='compressed=1',fn_idx=3) # what about uncompressed?
def Pubhex2redeem_script(self,name,f1,f2,f3): # from above
addr = read_from_file(f3).strip()
self.run_cmd_out(name,addr,fn_idx=3)
@ -406,8 +406,8 @@ class MMGenToolTestSuite(object):
# RPC
def Addrfile_chksum(self,name):
fn = os.path.join(cfg['refdir'],cfg['addrfile'])
self.run_cmd_out(name,fn,literal=True,chkdata=cfg['addrfile_chk'])
fn = os.path.join(cfg['refdir'],cfg['addrfile'].format(('','.testnet')[g.testnet]))
self.run_cmd_out(name,fn,literal=True,chkdata=cfg['addrfile_chk'][g.testnet])
def Getbalance(self,name):
self.run_cmd_out(name,literal=True)
def Listaddresses(self,name):
@ -415,7 +415,7 @@ class MMGenToolTestSuite(object):
def Twview(self,name):
self.run_cmd_out(name,literal=True)
def Txview(self,name):
fn = os.path.join(cfg['refdir'],cfg['txfile'])
fn = os.path.join(cfg['refdir'],cfg['txfile'].format(('','.testnet')[g.testnet]))
self.run_cmd_out(name,fn,literal=True)
# main()