whitespace: proto.btc

This commit is contained in:
The MMGen Project 2024-10-18 10:32:08 +00:00
commit 4a0a199e85
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
26 changed files with 428 additions and 410 deletions

View file

@ -25,14 +25,14 @@ class BitcoinTwAddrData(TwAddrData):
"""
}
async def get_tw_data(self,twctl=None):
async def get_tw_data(self, twctl=None):
self.cfg._util.vmsg('Getting address data from tracking wallet')
c = self.rpc
if 'label_api' in c.caps:
accts = await c.call('listlabels')
ll = await c.batch_call('getaddressesbylabel',[(k,) for k in accts])
ll = await c.batch_call('getaddressesbylabel', [(k,) for k in accts])
alists = [list(a.keys()) for a in ll]
else:
accts = await c.call('listaccounts',0,True)
alists = await c.batch_call('getaddressesbyaccount',[(k,) for k in accts])
return list(zip(accts,alists))
accts = await c.call('listaccounts', 0, True)
alists = await c.batch_call('getaddressesbyaccount', [(k,) for k in accts])
return list(zip(accts, alists))

View file

@ -30,14 +30,14 @@ class compressed(p2pkh):
class segwit(addr_generator.base):
@check_data
def to_addr(self,data):
def to_addr(self, data):
return self.proto.pubhash2segwitaddr(hash160(data.pubkey))
def to_segwit_redeem_script(self,data): # NB: returns hex
def to_segwit_redeem_script(self, data): # NB: returns hex
return self.proto.pubhash2redeem_script(hash160(data.pubkey)).hex()
class bech32(addr_generator.base):
@check_data
def to_addr(self,data):
def to_addr(self, data):
return self.proto.pubhash2bech32addr(hash160(data.pubkey))

View file

@ -17,7 +17,7 @@ import hashlib
b58a = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
def hash160(in_bytes): # OP_HASH160
return hashlib.new('ripemd160',hashlib.sha256(in_bytes).digest()).digest()
return hashlib.new('ripemd160', hashlib.sha256(in_bytes).digest()).digest()
def hash256(in_bytes): # OP_HASH256
return hashlib.sha256(hashlib.sha256(in_bytes).digest()).digest()
@ -35,13 +35,13 @@ def b58chk_encode(in_bytes):
while n:
yield b58a[n % 58]
n //= 58
return ('1' * lzeroes) + ''.join(do_enc(int.from_bytes(in_bytes+hash256(in_bytes)[:4],'big')))[::-1]
return ('1' * lzeroes) + ''.join(do_enc(int.from_bytes(in_bytes+hash256(in_bytes)[:4], 'big')))[::-1]
def b58chk_decode(s):
lzeroes = len(s) - len(s.lstrip('1'))
res = sum(b58a.index(ch) * 58**n for n,ch in enumerate(s[::-1]))
res = sum(b58a.index(ch) * 58**n for n, ch in enumerate(s[::-1]))
bl = res.bit_length()
out = b'\x00' * lzeroes + res.to_bytes(bl//8 + bool(bl%8),'big')
out = b'\x00' * lzeroes + res.to_bytes(bl//8 + bool(bl%8), 'big')
if out[-4:] != hash256(out[:-4])[:4]:
raise ValueError('b58chk_decode(): incorrect checksum')
return out[:-4]

View file

@ -16,7 +16,7 @@ import os
from ...cfg import gc
from ...util import list_gen
from ...daemon import CoinDaemon,_nw,_dd
from ...daemon import CoinDaemon, _nw, _dd
class bitcoin_core_daemon(CoinDaemon):
daemon_data = _dd('Bitcoin Core', 270100, '27.1.0')
@ -28,15 +28,15 @@ class bitcoin_core_daemon(CoinDaemon):
cfg_file = 'bitcoin.conf'
nonstd_datadir = False
datadirs = {
'linux': [gc.home_dir,'.bitcoin'],
'linux': [gc.home_dir, '.bitcoin'],
'darwin': [gc.home_dir, 'Library', 'Application Support', 'Bitcoin'],
'win32': [os.getenv('APPDATA'),'Bitcoin']
'win32': [os.getenv('APPDATA'), 'Bitcoin']
}
avail_opts = ('no_daemonize', 'online', 'bdb_wallet')
def init_datadir(self):
if self.network == 'regtest' and not self.test_suite:
return os.path.join( self.cfg.data_dir_root, 'regtest', self.cfg.coin.lower() )
return os.path.join(self.cfg.data_dir_root, 'regtest', self.cfg.coin.lower())
else:
return super().init_datadir()
@ -48,11 +48,11 @@ class bitcoin_core_daemon(CoinDaemon):
'mainnet': '',
'testnet': self.testnet_dir,
'regtest': 'regtest',
}[self.network] )
}[self.network])
@property
def auth_cookie_fn(self):
return os.path.join(self.network_datadir,'.cookie')
return os.path.join(self.network_datadir, '.cookie')
def init_subclass(self):
@ -85,15 +85,15 @@ class bitcoin_core_daemon(CoinDaemon):
['--addresstype=bech32', self.coin == 'LTC' and self.network == 'regtest'],
)
self.lockfile = os.path.join(self.network_datadir,'.cookie')
self.lockfile = os.path.join(self.network_datadir, '.cookie')
@property
def state(self):
cp = self.cli('getblockcount',silent=True)
cp = self.cli('getblockcount', silent=True)
err = cp.stderr.decode()
if ("error: couldn't connect" in err
or "error: Could not connect" in err
or "does not exist" in err ):
or "does not exist" in err):
# regtest has no cookie file, so test will always fail
ret = 'busy' if (self.lockfile and os.path.exists(self.lockfile)) else 'stopped'
elif cp.returncode == 0:
@ -108,19 +108,19 @@ class bitcoin_core_daemon(CoinDaemon):
def stop_cmd(self):
return self.cli_cmd('stop')
def set_comment_args(self,rpc,coinaddr,lbl):
def set_comment_args(self, rpc, coinaddr, lbl):
if 'label_api' in rpc.caps:
return ('setlabel',coinaddr,lbl)
return ('setlabel', coinaddr, lbl)
else:
# NOTE: this works because importaddress() removes the old account before
# associating the new account with the address.
# RPC args: addr,label,rescan[=true],p2sh[=none]
return ('importaddress',coinaddr,lbl,False)
# RPC args: addr, label, rescan[=true], p2sh[=none]
return ('importaddress', coinaddr, lbl, False)
def estimatefee_args(self,rpc):
def estimatefee_args(self, rpc):
return (self.cfg.fee_estimate_confs,)
def sigfail_errmsg(self,e):
def sigfail_errmsg(self, e):
return e.args[0]
class bitcoin_cash_node_daemon(bitcoin_core_daemon):
@ -131,24 +131,24 @@ class bitcoin_cash_node_daemon(bitcoin_core_daemon):
cfg_file_hdr = '# Bitcoin Cash Node config file\n'
nonstd_datadir = True
datadirs = {
'linux': [gc.home_dir,'.bitcoin-bchn'],
'linux': [gc.home_dir, '.bitcoin-bchn'],
'darwin': [gc.home_dir, 'Library', 'Application Support', 'Bitcoin-Cash-Node'],
'win32': [os.getenv('APPDATA'),'Bitcoin-Cash-Node']
'win32': [os.getenv('APPDATA'), 'Bitcoin-Cash-Node']
}
def set_comment_args(self,rpc,coinaddr,lbl):
# bitcoin-{abc,bchn} 'setlabel' RPC is broken, so use old 'importaddress' method to set label
def set_comment_args(self, rpc, coinaddr, lbl):
# bitcoin-{abc, bchn} 'setlabel' RPC is broken, so use old 'importaddress' method to set label
# Broken behavior: new label is set OK, but old label gets attached to another address
return ('importaddress',coinaddr,lbl,False)
return ('importaddress', coinaddr, lbl, False)
def estimatefee_args(self,rpc):
def estimatefee_args(self, rpc):
return () if rpc.daemon_version >= 190100 else (self.cfg.fee_estimate_confs,)
def sigfail_errmsg(self,e):
def sigfail_errmsg(self, e):
return (
'This is not the BCH chain.\nRe-run the script without the --coin=bch option.'
if 'Invalid sighash param' in e.args[0] else
e.args[0] )
e.args[0])
class litecoin_core_daemon(bitcoin_core_daemon):
# v0.21.2rc5 crashes when mining more than 431 blocks in regtest mode:
@ -161,7 +161,7 @@ class litecoin_core_daemon(bitcoin_core_daemon):
cfg_file = 'litecoin.conf'
cfg_file_hdr = '# Litecoin Core config file\n'
datadirs = {
'linux': [gc.home_dir,'.litecoin'],
'linux': [gc.home_dir, '.litecoin'],
'darwin': [gc.home_dir, 'Library', 'Application Support', 'Litecoin'],
'win32': [os.getenv('APPDATA'),'Litecoin']
'win32': [os.getenv('APPDATA'), 'Litecoin']
}

View file

@ -12,9 +12,9 @@
proto.btc.misc: miscellaneous functions for Bitcoin base protocol
"""
from ...util import msg,msg_r
from ...util import msg, msg_r
async def scantxoutset(cfg,rpc,descriptor_list):
async def scantxoutset(cfg, rpc, descriptor_list):
import asyncio
@ -23,7 +23,7 @@ async def scantxoutset(cfg,rpc,descriptor_list):
'scantxoutset',
'start',
descriptor_list,
timeout = 720 ) # call may take several minutes to complete
timeout = 720) # call may take several minutes to complete
async def do_status():
@ -34,23 +34,23 @@ async def scantxoutset(cfg,rpc,descriptor_list):
while True:
await asyncio.sleep(sleep_secs)
res = await rpc.call('scantxoutset','status')
res = await rpc.call('scantxoutset', 'status')
if res:
msg_r(m + f'{res["progress"]}% completed ')
if task1.done():
msg(m + '100% completed')
return
res = await rpc.call('scantxoutset','status')
res = await rpc.call('scantxoutset', 'status')
if res and res.get('progress'):
msg_r('Aborting scan in progress...')
await rpc.call('scantxoutset','abort')
await rpc.call('scantxoutset', 'abort')
await asyncio.sleep(1)
msg('done')
if rpc.backend.name == 'aiohttp':
task1 = asyncio.create_task( do_scan() )
task2 = asyncio.create_task( do_status() )
task1 = asyncio.create_task(do_scan())
task2 = asyncio.create_task(do_status())
ret = await task1
await task2
else:

View file

@ -22,13 +22,13 @@ class coin_msg(coin_msg):
class unsigned(coin_msg.unsigned):
async def do_sign(self,wif,message,msghash_type):
return await self.rpc.call( 'signmessagewithprivkey', wif, message )
async def do_sign(self, wif, message, msghash_type):
return await self.rpc.call('signmessagewithprivkey', wif, message)
class signed_online(coin_msg.signed_online):
async def do_verify(self,addr,sig,message,msghash_type):
return await self.rpc.call( 'verifymessage', addr, sig, message )
async def do_verify(self, addr, sig, message, msghash_type):
return await self.rpc.call('verifymessage', addr, sig, message)
class exported_sigs(coin_msg.exported_sigs,signed_online):
class exported_sigs(coin_msg.exported_sigs, signed_online):
pass

View file

@ -21,20 +21,20 @@ class mainnet(CoinProtocol.Secp256k1): # chainparams.cpp
All Bitcoin code and chain forks inherit from this class
"""
mod_clsname = 'Bitcoin'
network_names = _nw('mainnet','testnet','regtest')
addr_ver_info = { '00': 'p2pkh', '05': 'p2sh' }
network_names = _nw('mainnet', 'testnet', 'regtest')
addr_ver_info = {'00': 'p2pkh', '05': 'p2sh'}
addr_len = 20
wif_ver_num = { 'std': '80' }
mmtypes = ('L','C','S','B')
wif_ver_num = {'std': '80'}
mmtypes = ('L', 'C', 'S', 'B')
dfl_mmtype = 'L'
coin_amt = 'BTCAmt'
max_tx_fee = '0.003'
sighash_type = 'ALL'
block0 = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
forks = [
_finfo(478559,'00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148','BCH',False),
_finfo(478559, '00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148', 'BCH', False),
]
caps = ('rbf','segwit')
caps = ('rbf', 'segwit')
mmcaps = ('rpc', 'rpc_init', 'tw', 'msg')
base_proto = 'Bitcoin'
base_proto_coin = 'BTC'
@ -42,7 +42,7 @@ class mainnet(CoinProtocol.Secp256k1): # chainparams.cpp
# From BIP173: witness version 'n' is stored as 'OP_n'. OP_0 is encoded as 0x00,
# but OP_1 through OP_16 are encoded as 0x51 though 0x60 (81 to 96 in decimal).
witness_vernum_hex = '00'
witness_vernum = int(witness_vernum_hex,16)
witness_vernum = int(witness_vernum_hex, 16)
bech32_hrp = 'bc'
sign_mode = 'daemon'
avg_bdi = int(9.7 * 60) # average block discovery interval (historical)
@ -53,15 +53,15 @@ class mainnet(CoinProtocol.Secp256k1): # chainparams.cpp
ignore_daemon_version = False
max_int = 0xffffffff
def encode_wif(self,privbytes,pubkey_type,compressed): # input is preprocessed hex
def encode_wif(self, privbytes, pubkey_type, compressed): # input is preprocessed hex
assert len(privbytes) == self.privkey_len, f'{len(privbytes)} bytes: incorrect private key length!'
assert pubkey_type in self.wif_ver_bytes, f'{pubkey_type!r}: invalid pubkey_type'
return b58chk_encode(
self.wif_ver_bytes[pubkey_type]
+ privbytes
+ (b'',b'\x01')[bool(compressed)])
+ (b'', b'\x01')[bool(compressed)])
def decode_wif(self,wif):
def decode_wif(self, wif):
key_data = b58chk_decode(wif)
vlen = self.wif_ver_bytes_len or self.get_wif_ver_bytes_len(key_data)
key = key_data[vlen:]
@ -74,13 +74,13 @@ class mainnet(CoinProtocol.Secp256k1): # chainparams.cpp
return decoded_wif(
sec = key[:self.privkey_len],
pubkey_type = self.wif_ver_bytes_to_pubkey_type[key_data[:vlen]],
compressed = len(key) == self.privkey_len + 1 )
compressed = len(key) == self.privkey_len + 1)
def decode_addr(self, addr):
if 'B' in self.mmtypes and addr[:len(self.bech32_hrp)] == self.bech32_hrp:
from ...contrib import bech32
ret = bech32.decode(self.bech32_hrp,addr)
ret = bech32.decode(self.bech32_hrp, addr)
if ret[0] != self.witness_vernum:
from ...util import msg
@ -115,11 +115,11 @@ class mainnet(CoinProtocol.Secp256k1): # chainparams.cpp
self,
bech32.bech32_encode(
hrp = self.bech32_hrp,
data = [self.witness_vernum] + bech32.convertbits(list(pubhash),8,5)))
data = [self.witness_vernum] + bech32.convertbits(list(pubhash), 8, 5)))
class testnet(mainnet):
addr_ver_info = { '6f': 'p2pkh', 'c4': 'p2sh' }
wif_ver_num = { 'std': 'ef' }
addr_ver_info = {'6f': 'p2pkh', 'c4': 'p2sh'}
wif_ver_num = {'std': 'ef'}
bech32_hrp = 'tb'
class regtest(testnet):

View file

@ -20,16 +20,16 @@
proto.btc.regtest: Coin daemon regression test mode setup and operations
"""
import os,shutil,json
from ...util import msg,gmsg,die,capfirst,suf
import os, shutil, json
from ...util import msg, gmsg, die, capfirst, suf
from ...protocol import init_proto
from ...rpc import rpc_init,json_encoder
from ...rpc import rpc_init, json_encoder
from ...objmethods import MMGenObject
from ...daemon import CoinDaemon
def create_data_dir(cfg,data_dir):
def create_data_dir(cfg, data_dir):
try:
os.stat(os.path.join(data_dir,'regtest'))
os.stat(os.path.join(data_dir, 'regtest'))
except:
pass
else:
@ -39,7 +39,7 @@ def create_data_dir(cfg,data_dir):
f'Delete your existing MMGen regtest setup at {data_dir!r} and create a new one?'):
shutil.rmtree(data_dir)
else:
die(1,'Exiting')
die(1, 'Exiting')
try:
os.makedirs(data_dir)
@ -60,10 +60,19 @@ class MMGenRegtest(MMGenObject):
rpc_user = 'bobandalice'
rpc_password = 'hodltothemoon'
users = ('bob','alice','carol','miner')
coins = ('btc','bch','ltc')
usr_cmds = ('setup','generate','send','start','stop', 'state', 'balances','mempool','cli','wallet_cli')
users = ('bob', 'alice', 'carol', 'miner')
coins = ('btc', 'bch', 'ltc')
usr_cmds = (
'setup',
'generate',
'send',
'start',
'stop',
'state',
'balances',
'mempool',
'cli',
'wallet_cli')
bdb_hdseed = 'beadcafe' * 8
bdb_miner_wif = 'cTyMdQ2BgfAsjopRVZrj7AoEGp97pKfrC2NkqLuwHr4KHfPNAKwp'
bdb_miner_addrs = {
@ -95,7 +104,7 @@ class MMGenRegtest(MMGenObject):
@property
async def miner_addr(self):
if not hasattr(self,'_miner_addr'):
if not hasattr(self, '_miner_addr'):
self._miner_addr = (
self.bdb_miner_addrs[self.coin] if self.bdb_wallet else
await self.rpc_call('getnewaddress', wallet='miner'))
@ -103,7 +112,7 @@ class MMGenRegtest(MMGenObject):
@property
async def miner_wif(self):
if not hasattr(self,'_miner_wif'):
if not hasattr(self, '_miner_wif'):
self._miner_wif = (
self.bdb_miner_wif if self.bdb_wallet else
await self.rpc_call('dumpprivkey', (await self.miner_addr), wallet='miner'))
@ -112,16 +121,16 @@ class MMGenRegtest(MMGenObject):
def create_hdseed_wif(self):
from ...tool.api import tool_api
t = tool_api(self.cfg)
t.init_coin(self.proto.coin,self.proto.network)
t.init_coin(self.proto.coin, self.proto.network)
t.addrtype = 'compressed' if self.proto.coin == 'BCH' else 'bech32'
return t.hex2wif(self.bdb_hdseed)
async def generate(self,blocks=1,silent=False):
async def generate(self, blocks=1, silent=False):
blocks = int(blocks)
if self.d.state == 'stopped':
die(1,'Regtest daemon is not running')
die(1, 'Regtest daemon is not running')
self.d.wait_for_state('ready')
@ -130,15 +139,15 @@ class MMGenRegtest(MMGenObject):
'generatetoaddress',
blocks,
await self.miner_addr,
wallet = 'miner' )
wallet = 'miner')
if len(out) != blocks:
die(4,'Error generating blocks')
die(4, 'Error generating blocks')
if not silent:
gmsg(f'Mined {blocks} block{suf(blocks)}')
async def create_wallet(self,user):
async def create_wallet(self, user):
return await (await self.rpc).icall(
'createwallet',
wallet_name = user,
@ -157,13 +166,13 @@ class MMGenRegtest(MMGenObject):
if self.d.state != 'stopped':
await self.rpc_call('stop')
create_data_dir( self.cfg, self.d.datadir )
create_data_dir(self.cfg, self.d.datadir)
gmsg(f'Starting {self.coin.upper()} regtest setup')
self.d.start(silent=True)
for user in ('miner','bob','alice'):
for user in ('miner', 'bob', 'alice'):
gmsg(f'Creating {capfirst(user)}’s tracking wallet')
await self.create_wallet(user)
@ -194,18 +203,18 @@ class MMGenRegtest(MMGenObject):
msg('Stopping regtest daemon')
await self.rpc_call('stop')
def init_daemon(self,reindex=False):
def init_daemon(self, reindex=False):
if reindex:
self.d.usr_coind_args.append('--reindex')
async def start_daemon(self,reindex=False,silent=True):
async def start_daemon(self, reindex=False, silent=True):
self.init_daemon(reindex=reindex)
self.d.start(silent=silent)
for user in ('miner','bob','alice'):
for user in ('miner', 'bob', 'alice'):
msg(f'Loading {capfirst(user)}’s wallet')
await self.rpc_call('loadwallet',user,start_daemon=False)
await self.rpc_call('loadwallet', user, start_daemon=False)
async def rpc_call(self,*args,wallet=None,start_daemon=True):
async def rpc_call(self, *args, wallet=None, start_daemon=True):
if start_daemon and self.d.state == 'stopped':
await self.start_daemon()
return await (await self.rpc).call(*args, wallet=wallet)
@ -228,41 +237,41 @@ class MMGenRegtest(MMGenObject):
async def balances(self):
bal = {}
users = ('bob','alice')
users = ('bob', 'alice')
for user in users:
out = await self.rpc_call('listunspent',0,wallet=user)
out = await self.rpc_call('listunspent', 0, wallet=user)
bal[user] = sum(self.proto.coin_amt(e['amount']) for e in out)
fs = '{:<16} {:18.8f}'
for user in users:
msg(fs.format(user.capitalize()+"'s balance:",bal[user]))
msg(fs.format('Total balance:',sum(v for k,v in bal.items())))
msg(fs.format(user.capitalize()+"'s balance:", bal[user]))
msg(fs.format('Total balance:', sum(v for k, v in bal.items())))
async def send(self,addr,amt):
async def send(self, addr, amt):
gmsg(f'Sending {amt} miner {self.d.coin} to address {addr}')
await self.rpc_call('sendtoaddress',addr,str(amt),wallet='miner')
await self.rpc_call('sendtoaddress', addr, str(amt), wallet='miner')
await self.generate(1)
async def mempool(self):
await self.cli('getrawmempool')
async def cli(self,*args):
async def cli(self, *args):
ret = await self.rpc_call(*cliargs_convert(args))
print(ret if isinstance(ret,str) else json.dumps(ret,cls=json_encoder,indent=4))
print(ret if isinstance(ret, str) else json.dumps(ret, cls=json_encoder, indent=4))
async def wallet_cli(self,wallet,*args):
ret = await self.rpc_call(*cliargs_convert(args),wallet=wallet)
print(ret if isinstance(ret,str) else json.dumps(ret,cls=json_encoder,indent=4))
async def wallet_cli(self, wallet, *args):
ret = await self.rpc_call(*cliargs_convert(args), wallet=wallet)
print(ret if isinstance(ret, str) else json.dumps(ret, cls=json_encoder, indent=4))
async def cmd(self,args):
ret = getattr(self,args[0])(*args[1:])
async def cmd(self, args):
ret = getattr(self, args[0])(*args[1:])
return (await ret) if type(ret).__name__ == 'coroutine' else ret
async def fork(self,coin): # currently disabled
async def fork(self, coin): # currently disabled
proto = init_proto( self.cfg, coin, False )
proto = init_proto(self.cfg, coin, False)
if not [f for f in proto.forks if f[2] == proto.coin.lower() and f[3] is True]:
die(1,f'Coin {proto.coin} is not a replayable fork of coin {coin}')
die(1, f'Coin {proto.coin} is not a replayable fork of coin {coin}')
gmsg(f'Creating fork from coin {coin} to coin {proto.coin}')
@ -271,7 +280,7 @@ class MMGenRegtest(MMGenObject):
try:
os.stat(source_rt.d.datadir)
except:
die(1,f'Source directory {source_rt.d.datadir!r} does not exist!')
die(1, f'Source directory {source_rt.d.datadir!r} does not exist!')
# stop the source daemon
if source_rt.d.state != 'stopped':
@ -286,9 +295,9 @@ class MMGenRegtest(MMGenObject):
except:
pass
create_data_dir( self.cfg, self.d.datadir )
create_data_dir(self.cfg, self.d.datadir)
os.rmdir(self.d.datadir)
shutil.copytree(source_rt.d.datadir,self.d.datadir,symlinks=True)
shutil.copytree(source_rt.d.datadir, self.d.datadir, symlinks=True)
await self.start_daemon(reindex=True)
await self.rpc_call('stop')

View file

@ -16,9 +16,9 @@ import os
from ...base_obj import AsyncInit
from ...obj import TrackingWalletName
from ...util import ymsg,die,fmt
from ...util import ymsg, die, fmt
from ...fileutil import get_lines_from_file
from ...rpc import RPCClient,auth_data
from ...rpc import RPCClient, auth_data
no_credentials_errmsg = """
Error: no {proto_name} RPC authentication method found
@ -65,23 +65,21 @@ class CallSigs:
"""
return (
'createwallet',
wallet_name, # 1. wallet_name
no_keys, # 2. disable_private_keys
blank, # 3. blank (no keys or seed)
passphrase, # 4. passphrase (empty string for non-encrypted)
False, # 5. avoid_reuse (track address reuse)
descriptors, # 6. descriptors (native descriptor wallet)
load_on_startup # 7. load_on_startup
)
wallet_name, # 1. wallet_name
no_keys, # 2. disable_private_keys
blank, # 3. blank (no keys or seed)
passphrase, # 4. passphrase (empty string for non-encrypted)
False, # 5. avoid_reuse (track address reuse)
descriptors, # 6. descriptors (native descriptor wallet)
load_on_startup) # 7. load_on_startup
def gettransaction(self, txid, include_watchonly, verbose):
return (
'gettransaction',
txid, # 1. transaction id
include_watchonly, # 2. optional, default=true for watch-only wallets, otherwise false
verbose, # 3. optional, default=false -- include a `decoded` field containing
# the decoded transaction (equivalent to RPC decoderawtransaction)
)
verbose) # 3. optional, default=false -- include a `decoded` field containing
# => the decoded transaction (equivalent to RPC decoderawtransaction)
class litecoin_core(bitcoin_core):
@ -97,20 +95,18 @@ class CallSigs:
'createwallet',
wallet_name, # 1. wallet_name
no_keys, # 2. disable_private_keys
blank, # 3. blank (no keys or seed)
)
blank) # 3. blank (no keys or seed)
def gettransaction(self, txid, include_watchonly, verbose):
return (
'gettransaction',
txid, # 1. transaction id
include_watchonly, # 2. optional, default=true for watch-only wallets, otherwise false
)
include_watchonly) # 2. optional, default=true for watch-only wallets, otherwise false
class bitcoin_cash_node(litecoin_core):
pass
class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
class BitcoinRPCClient(RPCClient, metaclass=AsyncInit):
auth_type = 'basic'
has_auth_cookie = True
@ -123,17 +119,17 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
proto,
daemon,
backend,
ignore_wallet ):
ignore_wallet):
self.proto = proto
self.daemon = daemon
self.call_sigs = getattr(CallSigs,daemon.id)(cfg)
self.call_sigs = getattr(CallSigs, daemon.id)(cfg)
self.twname = TrackingWalletName(cfg.regtest_user or cfg.tw_name or self.dfl_twname)
super().__init__(
cfg = cfg,
host = 'localhost' if cfg.test_suite or cfg.network == 'regtest' else (cfg.rpc_host or 'localhost'),
port = daemon.rpc_port )
port = daemon.rpc_port)
self.set_auth()
@ -142,20 +138,20 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
self.cached = {}
self.caps = ('full_node',)
for func,cap in (
('setlabel','label_api'),
('getdeploymentinfo','deployment_info'),
('signrawtransactionwithkey','sign_with_key') ):
if len((await self.call('help',func)).split('\n')) > 3:
for func, cap in (
('setlabel', 'label_api'),
('getdeploymentinfo', 'deployment_info'),
('signrawtransactionwithkey', 'sign_with_key')):
if len((await self.call('help', func)).split('\n')) > 3:
self.caps += (cap,)
call_group = [
('getblockcount',()),
('getblockhash',(0,)),
('getnetworkinfo',()),
('getblockchaininfo',()),
('getblockcount', ()),
('getblockhash', (0,)),
('getnetworkinfo', ()),
('getblockchaininfo', ()),
] + (
[('getdeploymentinfo',())] if 'deployment_info' in self.caps else []
[('getdeploymentinfo', ())] if 'deployment_info' in self.caps else []
)
(
@ -165,7 +161,7 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
self.cached['blockchaininfo'],
self.cached['deploymentinfo'],
) = (
await self.gathered_call(None,tuple(call_group))
await self.gathered_call(None, tuple(call_group))
) + (
[] if 'deployment_info' in self.caps else [None]
)
@ -174,8 +170,8 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
self.daemon_version_str = self.cached['networkinfo']['subversion']
self.chain = self.cached['blockchaininfo']['chain']
tip = await self.call('getblockhash',self.blockcount)
self.cur_date = (await self.call('getblockheader',tip))['time']
tip = await self.call('getblockhash', self.blockcount)
self.cur_date = (await self.call('getblockheader', tip))['time']
if self.chain != 'regtest':
self.chain += 'net'
assert self.chain in self.proto.networks
@ -187,10 +183,10 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
for fork in self.proto.forks:
if fork.height is None or self.blockcount < fork.height:
break
if fork.hash != await self.call('getblockhash',fork.height):
die(3,f'Bad block hash at fork block {fork.height}. Is this the {fork.name} chain?')
if fork.hash != await self.call('getblockhash', fork.height):
die(3, f'Bad block hash at fork block {fork.height}. Is this the {fork.name} chain?')
except Exception as e:
die(2,'{!s}\n{c!r} requested, but this is not the {c} chain!'.format(e,c=self.proto.coin))
die(2, '{!s}\n{c!r} requested, but this is not the {c} chain!'.format(e, c=self.proto.coin))
if self.chain == 'mainnet': # skip this for testnet, as Genesis block may change
await check_chainfork_mismatch(block0)
@ -204,7 +200,7 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
@property
async def walletinfo(self):
if not hasattr(self,'_walletinfo'):
if not hasattr(self, '_walletinfo'):
self._walletinfo = await self.call('getwalletinfo')
return self._walletinfo
@ -214,17 +210,17 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
"""
if self.cfg.network == 'regtest':
from .regtest import MMGenRegtest
user,passwd = (MMGenRegtest.rpc_user, MMGenRegtest.rpc_password)
user, passwd = (MMGenRegtest.rpc_user, MMGenRegtest.rpc_password)
elif self.cfg.rpc_user:
user,passwd = (self.cfg.rpc_user,self.cfg.rpc_password)
user, passwd = (self.cfg.rpc_user, self.cfg.rpc_password)
else:
user,passwd = self.get_daemon_cfg_options(('rpcuser','rpcpassword')).values()
user, passwd = self.get_daemon_cfg_options(('rpcuser', 'rpcpassword')).values()
if not (user and passwd):
user,passwd = (self.daemon.rpc_user,self.daemon.rpc_password)
user, passwd = (self.daemon.rpc_user, self.daemon.rpc_password)
if user and passwd:
self.auth = auth_data(user,passwd)
self.auth = auth_data(user, passwd)
return
if self.has_auth_cookie:
@ -233,12 +229,12 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
self.auth = auth_data(*cookie.split(':'))
return
die(1, '\n\n' + fmt(no_credentials_errmsg,strip_char='\t',indent=' ').format(
die(1, '\n\n' + fmt(no_credentials_errmsg, strip_char='\t', indent=' ').format(
proto_name = self.proto.name,
cf_name = (self.proto.is_fork_of or self.proto.name).lower(),
))
def make_host_path(self,wallet):
def make_host_path(self, wallet):
return f'/wallet/{wallet}' if wallet else self.wallet_path
@property
@ -264,41 +260,41 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
# Use dirname() to remove 'bob' or 'alice' component
return os.path.join(
(os.path.dirname(self.cfg.data_dir) if self.proto.regtest else self.daemon.datadir),
self.daemon.cfg_file )
self.daemon.cfg_file)
def get_daemon_cfg_options(self,req_keys):
def get_daemon_cfg_options(self, req_keys):
fn = self.get_daemon_cfg_fn()
try:
lines = get_lines_from_file( self.cfg, fn, 'daemon config file', silent=not self.cfg.verbose )
lines = get_lines_from_file(self.cfg, fn, 'daemon config file', silent=not self.cfg.verbose)
except:
self.cfg._util.vmsg(f'Warning: {fn!r} does not exist or is unreadable')
return dict((k,None) for k in req_keys)
return dict((k, None) for k in req_keys)
def gen():
for key in req_keys:
val = None
for l in lines:
if l.startswith(key):
res = l.split('=',1)
res = l.split('=', 1)
if len(res) == 2 and not ' ' in res[1].strip():
val = res[1].strip()
yield (key,val)
yield (key, val)
return dict(gen())
def get_daemon_auth_cookie(self):
fn = self.daemon.auth_cookie_fn
return get_lines_from_file( self.cfg, fn, 'cookie', quiet=True )[0] if os.access(fn,os.R_OK) else ''
return get_lines_from_file(self.cfg, fn, 'cookie', quiet=True)[0] if os.access(fn, os.R_OK) else ''
def info(self,info_id):
def info(self, info_id):
def segwit_is_active():
if 'deployment_info' in self.caps:
return (
self.cached['deploymentinfo']['deployments']['segwit']['active']
or ( self.cfg.test_suite and not self.chain == 'regtest' )
or (self.cfg.test_suite and not self.chain == 'regtest')
)
d = self.cached['blockchaininfo']

View file

@ -14,11 +14,11 @@ proto.btc.tw.addresses: Bitcoin base protocol tracking wallet address list class
from ....tw.addresses import TwAddresses
from ....tw.shared import TwLabel
from ....util import msg,msg_r
from ....util import msg, msg_r
from ....obj import get_obj
from .rpc import BitcoinTwRPC
class BitcoinTwAddresses(TwAddresses,BitcoinTwRPC):
class BitcoinTwAddresses(TwAddresses, BitcoinTwRPC):
has_age = True
prompt_fs_in = [
@ -43,7 +43,7 @@ class BitcoinTwAddresses(TwAddresses,BitcoinTwRPC):
'v':'a_view',
'w':'a_view_detail',
'p':'a_print_detail',
'l':'i_comment_add' }
'l':'i_comment_add'}
async def get_rpc_data(self):
@ -68,8 +68,8 @@ class BitcoinTwAddresses(TwAddresses,BitcoinTwRPC):
msg_r('Getting received funds data...')
# args: 1:minconf, 2:include_empty, 3:include_watchonly, 4:include_immature_coinbase (>=v23.0.0)
for d in await self.rpc.call( 'listreceivedbylabel', 1, True, True ):
label = get_obj( TwLabel, proto=self.proto, text=d['label'] )
for d in await self.rpc.call('listreceivedbylabel', 1, True, True):
label = get_obj(TwLabel, proto=self.proto, text=d['label'])
if label:
assert label.mmid in addrs, f'{label.mmid!r} not found in addrlist!'
addrs[label.mmid]['recvd'] = coin_amt(d['amount'])

View file

@ -23,7 +23,7 @@ class BitcoinTwGetBalance(TwGetBalance):
self.walletinfo = await self.rpc.walletinfo
await super().__init__(cfg, proto, minconf, quiet)
start_labels = ('TOTAL','Non-MMGen','Non-wallet')
start_labels = ('TOTAL', 'Non-MMGen', 'Non-wallet')
conf_cols = {
'unconfirmed': 'Unconfirmed',
'lt_minconf': '<{minconf} confs',
@ -31,9 +31,9 @@ class BitcoinTwGetBalance(TwGetBalance):
}
async def create_data(self):
lbl_id = ('account','label')['label_api' in self.rpc.caps]
for d in await self.rpc.call('listunspent',0):
tw_lbl = get_tw_label(self.proto,d[lbl_id])
lbl_id = ('account', 'label')['label_api' in self.rpc.caps]
for d in await self.rpc.call('listunspent', 0):
tw_lbl = get_tw_label(self.proto, d[lbl_id])
if tw_lbl:
if tw_lbl.mmid.type == 'mmgen':
label = tw_lbl.mmid.obj.sid
@ -50,7 +50,7 @@ class BitcoinTwGetBalance(TwGetBalance):
self.data['TOTAL']['unconfirmed'] += amt
self.data[label]['unconfirmed'] += amt
col_key = ('lt_minconf','ge_minconf')[d['confirmations'] >= self.minconf]
col_key = ('lt_minconf', 'ge_minconf')[d['confirmations'] >= self.minconf]
self.data['TOTAL'][col_key] += amt
self.data[label][col_key] += amt
@ -61,7 +61,7 @@ class BitcoinTwGetBalance(TwGetBalance):
def gen_spendable_warning():
if check_spendable:
for k,v in self.data.items():
for k, v in self.data.items():
if v['spendable']:
yield red(f'Warning: this wallet contains PRIVATE KEYS for {k} outputs!')

View file

@ -12,18 +12,18 @@
proto.btc.tw.ctl: Bitcoin base protocol tracking wallet control class
"""
from ....tw.ctl import TwCtl,write_mode
from ....util import msg,msg_r,rmsg,die,suf,fmt_list
from ....tw.ctl import TwCtl, write_mode
from ....util import msg, msg_r, rmsg, die, suf, fmt_list
class BitcoinTwCtl(TwCtl):
def init_empty(self):
self.data = { 'coin': self.proto.coin, 'addresses': {} }
self.data = {'coin': self.proto.coin, 'addresses': {}}
def upgrade_wallet_maybe(self):
pass
async def rpc_get_balance(self,addr):
async def rpc_get_balance(self, addr):
raise NotImplementedError('not implemented')
@write_mode
@ -34,7 +34,7 @@ class BitcoinTwCtl(TwCtl):
return await self.rpc.call('importaddress', addr, label, rescan)
@write_mode
async def batch_import_address(self,arg_list):
async def batch_import_address(self, arg_list):
if (await self.rpc.walletinfo).get('descriptors'):
from ....contrib.descriptors import descsum_create
return await self.rpc.call(
@ -48,12 +48,12 @@ class BitcoinTwCtl(TwCtl):
return await self.rpc.batch_call('importaddress', arg_list)
@write_mode
async def remove_address(self,addr):
async def remove_address(self, addr):
raise NotImplementedError(f'address removal not implemented for coin {self.proto.coin}')
@write_mode
async def set_label(self,coinaddr,lbl):
args = self.rpc.daemon.set_comment_args( self.rpc, coinaddr, lbl )
async def set_label(self, coinaddr, lbl):
args = self.rpc.daemon.set_comment_args(self.rpc, coinaddr, lbl)
try:
await self.rpc.call(*args)
return True
@ -62,80 +62,80 @@ class BitcoinTwCtl(TwCtl):
return False
@write_mode
async def rescan_blockchain(self,start,stop):
async def rescan_blockchain(self, start, stop):
start = start or 0
endless = stop is None
CR = '\n' if self.cfg.test_suite else '\r'
if not (start >= 0 and (stop if stop is not None else start) >= start):
die(1,f'{start} {stop}: invalid range')
die(1, f'{start} {stop}: invalid range')
async def do_scan(chunks,tip):
async def do_scan(chunks, tip):
res = None
for a,b in chunks:
for a, b in chunks:
msg_r(f'{CR}Scanning blocks {a}-{b} ')
res = await self.rpc.call('rescanblockchain',a,b,timeout=7200)
res = await self.rpc.call('rescanblockchain', a, b, timeout=7200)
if res['start_height'] != a or res['stop_height'] != b:
die(1,f'\nAn error occurred in block range {a}-{b}')
die(1, f'\nAn error occurred in block range {a}-{b}')
msg('')
return b if res else tip
def gen_chunks(start,stop,tip):
def gen_chunks(start, stop, tip):
n = start
if endless:
stop = tip
elif stop > tip:
die(1,f'{stop}: stop value is higher than chain tip')
die(1, f'{stop}: stop value is higher than chain tip')
while n <= stop:
yield ( n, min(n+99,stop) )
yield (n, min(n+99, stop))
n += 100
last_block = await do_scan(gen_chunks(start,stop,self.rpc.blockcount),self.rpc.blockcount)
last_block = await do_scan(gen_chunks(start, stop, self.rpc.blockcount), self.rpc.blockcount)
if endless:
tip = await self.rpc.call('getblockcount')
while last_block < tip:
last_block = await do_scan(gen_chunks(last_block+1,tip,tip),tip)
last_block = await do_scan(gen_chunks(last_block+1, tip, tip), tip)
tip = await self.rpc.call('getblockcount')
msg('Done')
@write_mode
async def rescan_address(self,addrspec):
async def rescan_address(self, addrspec):
res = await self.resolve_address(addrspec)
if not res:
return False
return await self.rescan_addresses([res.coinaddr])
@write_mode
async def rescan_addresses(self,coin_addrs):
async def rescan_addresses(self, coin_addrs):
from ..misc import scantxoutset
res = await scantxoutset( self.cfg, self.rpc, [f'addr({addr})' for addr in coin_addrs] )
res = await scantxoutset(self.cfg, self.rpc, [f'addr({addr})' for addr in coin_addrs])
if not res['success']:
msg('UTXO scanning failed or was interrupted')
return False
elif res['unspents']:
blocks = sorted({ i['height'] for i in res['unspents'] })
blocks = sorted({i['height'] for i in res['unspents']})
msg('Found {} unspent output{} in {} block{}'.format(
len(res['unspents']),
suf(res['unspents']),
len(blocks),
suf(blocks) ))
self.cfg._util.vmsg(f'Blocks to rescan: {fmt_list(blocks,fmt="bare")}')
suf(blocks)))
self.cfg._util.vmsg(f'Blocks to rescan: {fmt_list(blocks, fmt="bare")}')
CR = '\n' if self.cfg.test_suite else '\r'
for n,block in enumerate(blocks):
for n, block in enumerate(blocks):
msg_r(f'{CR}Rescanning block: {block} ({n+1}/{len(blocks)})')
# httplib seems to require fresh connection here, so specify timeout
await self.rpc.call('rescanblockchain',block,block,timeout=60)
await self.rpc.call('rescanblockchain', block, block, timeout=60)
msg(f'\nAddress balance{suf(coin_addrs)} updated successfully')
return True
else:
msg('Address has no balance' if len(coin_addrs) == 1 else
'Addresses have no balances' )
'Addresses have no balances')
return True
async def get_label_addr_pairs(self):

View file

@ -24,13 +24,13 @@ class BitcoinTwJSON(TwJSON):
@property
def mappings_json(self):
return self.json_dump([(e.mmgen_id,e.address) for e in self.entries])
return self.json_dump([(e.mmgen_id, e.address) for e in self.entries])
@property
def num_entries(self):
return len(self.entries)
class Import(TwJSON.Import,Base):
class Import(TwJSON.Import, Base):
info_msg = """
This utility will create a new tracking wallet, import the addresses from
@ -63,48 +63,48 @@ class BitcoinTwJSON(TwJSON):
entries_in = [self.entry_tuple_in(*e) for e in self.data['data']['entries']]
return sorted(
[self.entry_tuple(
TwMMGenID(self.proto,d.mmgen_id),
TwMMGenID(self.proto, d.mmgen_id),
d.address,
getattr(d,'amount',None),
getattr(d, 'amount', None),
d.comment)
for d in entries_in],
key = lambda x: x.mmgen_id.sort_key )
key = lambda x: x.mmgen_id.sort_key)
async def do_import(self,batch):
import_tuple = namedtuple('import_data',['addr','twmmid','comment'])
async def do_import(self, batch):
import_tuple = namedtuple('import_data', ['addr', 'twmmid', 'comment'])
await self.twctl.import_address_common(
[import_tuple(e.address, e.mmgen_id, e.comment) for e in self.entries],
batch = batch )
batch = batch)
return [e.address for e in self.entries]
class Export(TwJSON.Export,Base):
class Export(TwJSON.Export, Base):
@property
async def addrlist(self):
if not hasattr(self,'_addrlist'):
if not hasattr(self, '_addrlist'):
if self.prune:
from .prune import TwAddressesPrune
self._addrlist = al = await TwAddressesPrune(
self.cfg,
self.proto,
get_data = True,
warn_used = self.warn_used )
warn_used = self.warn_used)
await al.view_filter_and_sort()
self.pruned = al.do_prune()
else:
from .addresses import TwAddresses
self._addrlist = await TwAddresses(self.cfg,self.proto,get_data=True)
self._addrlist = await TwAddresses(self.cfg, self.proto, get_data=True)
return self._addrlist
async def get_entries(self): # TODO: include 'received' field
return sorted(
[self.entry_tuple(d.twmmid, d.addr, d.amt, d.comment)
for d in (await self.addrlist).data],
key = lambda x: x.mmgen_id.sort_key )
key = lambda x: x.mmgen_id.sort_key)
@property
async def entries_out(self):
return [[getattr(d,k) for k in self.keys] for d in self.entries]
return [[getattr(d, k) for k in self.keys] for d in self.entries]
@property
async def total(self):

View file

@ -15,7 +15,7 @@ proto.btc.tw.prune: Bitcoin base protocol tracking wallet address list prune cla
from ....tw.prune import TwAddressesPrune
from .addresses import BitcoinTwAddresses
class BitcoinTwAddressesPrune(BitcoinTwAddresses,TwAddressesPrune):
class BitcoinTwAddressesPrune(BitcoinTwAddresses, TwAddressesPrune):
prompt_fs_in = [
'Sort options: [a]mt, [A]ge, [M]mgen addr, [r]everse',
@ -40,4 +40,4 @@ class BitcoinTwAddressesPrune(BitcoinTwAddresses,TwAddressesPrune):
'w':'a_view_detail',
'p':'a_prune',
'u':'a_unprune',
'c':'a_clear_prune_list' }
'c':'a_clear_prune_list'}

View file

@ -23,32 +23,32 @@ class BitcoinTwRPC(TwRPC):
async def get_label_addr_pairs(self):
"""
Get all the accounts in the tracking wallet and their associated addresses.
Returns list of (label,address) tuples.
Returns list of (label, address) tuples.
"""
def check_dup_mmid(acct_labels):
mmid_prev,err = None,False
mmid_prev, err = None, False
for mmid in sorted(label.mmid for label in acct_labels if label):
if mmid == mmid_prev:
err = True
msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n')
mmid_prev = mmid
if err:
die(4,'Tracking wallet is corrupted!')
die(4, 'Tracking wallet is corrupted!')
async def get_acct_list():
if 'label_api' in self.rpc.caps:
return await self.rpc.call('listlabels')
else:
return (await self.rpc.call('listaccounts',0,True)).keys()
return (await self.rpc.call('listaccounts', 0, True)).keys()
async def get_acct_addrs(acct_list):
if 'label_api' in self.rpc.caps:
return [list(a.keys())
for a in await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list])]
for a in await self.rpc.batch_call('getaddressesbylabel', [(k,) for k in acct_list])]
else:
return await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list])
return await self.rpc.batch_call('getaddressesbyaccount', [(a,) for a in acct_list])
acct_labels = [get_tw_label(self.proto,a) for a in await get_acct_list()]
acct_labels = [get_tw_label(self.proto, a) for a in await get_acct_list()]
if not acct_labels:
return []
@ -57,23 +57,23 @@ class BitcoinTwRPC(TwRPC):
acct_addrs = await get_acct_addrs(acct_labels)
for n,a in enumerate(acct_addrs):
for n, a in enumerate(acct_addrs):
if len(a) != 1:
raise ValueError(f'{a}: label {acct_labels[n]!r} has != 1 associated address!')
return [label_addr_pair(label, CoinAddr(self.proto,addrs[0]))
return [label_addr_pair(label, CoinAddr(self.proto, addrs[0]))
for label, addrs in zip(acct_labels, acct_addrs)]
async def get_unspent_by_mmid(self,minconf=1,mmid_filter=[]):
async def get_unspent_by_mmid(self, minconf=1, mmid_filter=[]):
"""
get unspent outputs in tracking wallet, compute balances per address
and return a dict with elements { 'twmmid': {'addr','lbl','amt'} }
and return a dict with elements {'twmmid': {'addr', 'lbl', 'amt'}}
"""
data = {}
lbl_id = ('account','label')['label_api' in self.rpc.caps]
lbl_id = ('account', 'label')['label_api' in self.rpc.caps]
amt0 = self.proto.coin_amt('0')
for d in await self.rpc.call('listunspent',0):
for d in await self.rpc.call('listunspent', 0):
if not lbl_id in d:
continue # skip coinbase outputs with missing account
@ -81,7 +81,7 @@ class BitcoinTwRPC(TwRPC):
if d['confirmations'] < minconf:
continue
label = get_tw_label(self.proto,d[lbl_id])
label = get_tw_label(self.proto, d[lbl_id])
if label:
lm = label.mmid
@ -89,10 +89,10 @@ class BitcoinTwRPC(TwRPC):
continue
if lm in data:
if data[lm]['addr'] != d['address']:
die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format(
die(2, 'duplicate {} address ({}) for this MMGen address! ({})'.format(
self.proto.coin,
d['address'],
data[lm]['addr'] ))
data[lm]['addr']))
else:
lm.confs = d['confirmations']
lm.txid = d['txid']
@ -101,7 +101,7 @@ class BitcoinTwRPC(TwRPC):
data[lm] = {
'amt': amt0,
'lbl': label,
'addr': CoinAddr(self.proto,d['address']) }
'addr': CoinAddr(self.proto, d['address'])}
data[lm]['amt'] += self.proto.coin_amt(d['amount'])

View file

@ -14,19 +14,19 @@ proto.btc.tw.txhistory: Bitcoin base protocol tracking wallet transaction histor
from collections import namedtuple
from ....tw.txhistory import TwTxHistory
from ....tw.shared import get_tw_label,TwMMGenID
from ....tw.shared import get_tw_label, TwMMGenID
from ....addr import CoinAddr
from ....util import msg,msg_r
from ....color import nocolor,red,pink,gray
from ....obj import TwComment,CoinTxID,Int
from ....util import msg, msg_r
from ....color import nocolor, red, pink, gray
from ....obj import TwComment, CoinTxID, Int
from .rpc import BitcoinTwRPC
class BitcoinTwTransaction:
def __init__(self,parent,proto,rpc,
def __init__(self, parent, proto, rpc,
idx, # unique numeric identifier of this transaction in listing
unspent_info, # addrs in wallet with balances: { 'mmid': {'addr','comment','amt'} }
mm_map, # all addrs in wallet: ['addr', ['twmmid','comment']]
unspent_info, # addrs in wallet with balances: {'mmid': {'addr', 'comment', 'amt'}}
mm_map, # all addrs in wallet: ['addr', ['twmmid', 'comment']]
tx, # the decoded transaction data
wallet_vouts, # list of ints - wallet-related vouts
prevouts, # list of (txid,vout) pairs
@ -41,21 +41,21 @@ class BitcoinTwTransaction:
self.tx = tx
def gen_prevouts_data():
_d = namedtuple('prevout_data',['txid','data'])
_d = namedtuple('prevout_data', ['txid', 'data'])
for tx in prevout_txs:
for e in prevouts:
if e.txid == tx['txid']:
yield _d( e.txid, tx['vout'][e.vout] )
yield _d(e.txid, tx['vout'][e.vout])
def gen_wallet_vouts_data():
_d = namedtuple('wallet_vout_data',['txid','data'])
_d = namedtuple('wallet_vout_data', ['txid', 'data'])
txid = self.tx['txid']
vouts = self.tx['decoded']['vout']
for n in wallet_vouts:
yield _d( txid, vouts[n] )
yield _d(txid, vouts[n])
def gen_vouts_info(data):
_d = namedtuple('vout_info',['txid','coin_addr','twlabel','data'])
_d = namedtuple('vout_info', ['txid', 'coin_addr', 'twlabel', 'data'])
def gen():
for d in data:
addr = d.data['scriptPubKey'].get('address') or d.data['scriptPubKey']['addresses'][0]
@ -63,7 +63,7 @@ class BitcoinTwTransaction:
txid = d.txid,
coin_addr = addr,
twlabel = mm_map[addr] if (addr in mm_map and mm_map[addr].twmmid) else None,
data = d.data )
data = d.data)
return sorted(
gen(),
# if address is not MMGen, ignore address and sort by TxID + vout only
@ -79,7 +79,7 @@ class BitcoinTwTransaction:
if e.twlabel:
mmid = e.twlabel.twmmid
yield (
(mmid if mmid.type == 'mmgen' else mmid.split(':',1)[1]) +
(mmid if mmid.type == 'mmgen' else mmid.split(':', 1)[1]) +
('*' if mmid in self.unspent_info else '')
)
else:
@ -93,15 +93,15 @@ class BitcoinTwTransaction:
find the most relevant comment for tabular (squeezed) display
"""
def vouts_labels(src):
return [ d.twlabel.comment for d in self.vouts_info[src] if d.twlabel and d.twlabel.comment ]
return [d.twlabel.comment for d in self.vouts_info[src] if d.twlabel and d.twlabel.comment]
ret = vouts_labels('outputs') or vouts_labels('inputs')
return ret[0] if ret else TwComment('')
coin_amt = self.proto.coin_amt
# 'outputs' refers to wallet-related outputs only
self.vouts_info = {
'inputs': gen_vouts_info( gen_prevouts_data() ),
'outputs': gen_vouts_info( gen_wallet_vouts_data() )
'inputs': gen_vouts_info(gen_prevouts_data()),
'outputs': gen_vouts_info(gen_wallet_vouts_data())
}
self.max_addrlen = {
'inputs': max(len(addr) for addr in gen_all_addrs('inputs')),
@ -123,13 +123,13 @@ class BitcoinTwTransaction:
self.time = self.tx.get('blocktime') or self.tx['time']
self.time_received = self.tx.get('timereceived')
def blockheight_disp(self,color):
def blockheight_disp(self, color):
return (
# old/altcoin daemons return no 'blockheight' field, so use confirmations instead
Int( self.rpc.blockcount + 1 - self.confirmations ).hl(color=color)
if self.confirmations > 0 else None )
Int(self.rpc.blockcount + 1 - self.confirmations).hl(color=color)
if self.confirmations > 0 else None)
def age_disp(self,age_fmt,width,color):
def age_disp(self, age_fmt, width, color):
if age_fmt == 'confs':
ret_str = str(self.confirmations).ljust(width)
return gray(ret_str) if self.confirmations < 0 and color else ret_str
@ -138,17 +138,17 @@ class BitcoinTwTransaction:
ret_str = str(ret).ljust(width)
return gray(ret_str) if ret < 0 and color else ret_str
else:
return self.parent.date_formatter[age_fmt](self.rpc,self.tx.get('blocktime',0))
return self.parent.date_formatter[age_fmt](self.rpc, self.tx.get('blocktime', 0))
def txdate_disp(self,age_fmt):
return self.parent.date_formatter[age_fmt](self.rpc,self.time)
def txdate_disp(self, age_fmt):
return self.parent.date_formatter[age_fmt](self.rpc, self.time)
def txid_disp(self,color,width=None):
return self.txid.hl(color=color) if width is None else self.txid.truncate(width=width,color=color)
def txid_disp(self, color, width=None):
return self.txid.hl(color=color) if width is None else self.txid.truncate(width=width, color=color)
def vouts_list_disp(self, src, color, indent, addr_view_pref):
fs1,fs2 = {
fs1, fs2 = {
'inputs': ('{i},{n} {a} {A}', '{i},{n} {a} {A} {l}'),
'outputs': ( '{n} {a} {A}', '{n} {a} {A} {l}')
}[src]
@ -159,27 +159,27 @@ class BitcoinTwTransaction:
if not mmid:
yield fs1.format(
i = CoinTxID(e.txid).hl(color=color),
n = (nocolor,red)[color](str(e.data['n']).ljust(3)),
n = (nocolor, red)[color](str(e.data['n']).ljust(3)),
a = CoinAddr(self.proto, e.coin_addr).fmt(
addr_view_pref, width=self.max_addrlen[src], color=color),
A = self.proto.coin_amt( e.data['value'] ).fmt(color=color)
A = self.proto.coin_amt(e.data['value']).fmt(color=color)
).rstrip()
else:
bal_star,co = ('*','melon') if mmid in self.unspent_info else ('','brown')
addr_out = mmid if mmid.type == 'mmgen' else mmid.split(':',1)[1]
bal_star, co = ('*', 'melon') if mmid in self.unspent_info else ('', 'brown')
addr_out = mmid if mmid.type == 'mmgen' else mmid.split(':', 1)[1]
yield fs2.format(
i = CoinTxID(e.txid).hl(color=color),
n = (nocolor,red)[color](str(e.data['n']).ljust(3)),
n = (nocolor, red)[color](str(e.data['n']).ljust(3)),
a = TwMMGenID.hl2(
TwMMGenID,
s = '{:{w}}'.format( addr_out + bal_star, w=self.max_addrlen[src] ),
s = '{:{w}}'.format(addr_out + bal_star, w=self.max_addrlen[src]),
color = color,
color_override = co ),
A = self.proto.coin_amt( e.data['value'] ).fmt(color=color),
color_override = co),
A = self.proto.coin_amt(e.data['value']).fmt(color=color),
l = e.twlabel.comment.hl(color=color)
).rstrip()
return f'\n{indent}'.join( gen_output() ).strip()
return f'\n{indent}'.join(gen_output()).strip()
def vouts_disp(self, src, width, color, addr_view_pref):
@ -189,7 +189,7 @@ class BitcoinTwTransaction:
for e in self.vouts_info[src]:
mmid = e.twlabel.twmmid if e.twlabel else None
bal_star,addr_w,co = ('*',16,'melon') if mmid in self.unspent_info else ('',15,'brown')
bal_star, addr_w, co = ('*', 16, 'melon') if mmid in self.unspent_info else ('', 15, 'brown')
if not mmid:
if width and space_left < addr_w:
break
@ -199,16 +199,16 @@ class BitcoinTwTransaction:
mmid_disp = mmid + bal_star
if width and space_left < len(mmid_disp):
break
yield TwMMGenID.hl2( TwMMGenID, s=mmid_disp, color=color, color_override=co )
yield TwMMGenID.hl2(TwMMGenID, s=mmid_disp, color=color, color_override=co)
space_left -= len(mmid_disp)
else:
if width and space_left < addr_w:
break
yield TwMMGenID.hl2(
TwMMGenID,
s = CoinAddr.fmtc( mmid.split(':',1)[1] + bal_star, width=addr_w ),
s = CoinAddr.fmtc(mmid.split(':', 1)[1] + bal_star, width=addr_w),
color = color,
color_override = co )
color_override = co)
space_left -= addr_w
space_left -= 1
@ -216,20 +216,20 @@ class BitcoinTwTransaction:
return ' '.join(gen_output()) + ' ' * (space_left + 1 if width else 0)
def amt_disp(self,show_total_amt):
def amt_disp(self, show_total_amt):
return (
self.outputs_total if show_total_amt else
self.wallet_outputs_total )
self.wallet_outputs_total)
def fee_disp(self,color):
def fee_disp(self, color):
atomic_unit = self.proto.coin_amt.units[0]
return '{} {}'.format(
self.fee.hl(color=color),
(nocolor,pink)[color]('({:,} {}s/byte)'.format(
(nocolor, pink)[color]('({:,} {}s/byte)'.format(
self.fee.to_unit(atomic_unit) // self.vsize,
atomic_unit )) )
atomic_unit)))
class BitcoinTwTxHistory(TwTxHistory,BitcoinTwRPC):
class BitcoinTwTxHistory(TwTxHistory, BitcoinTwRPC):
has_age = True
hdr_lbl = 'transaction history'
@ -259,12 +259,12 @@ class BitcoinTwTxHistory(TwTxHistory,BitcoinTwRPC):
'v':'a_view',
'V':'a_view_detail',
'p':'a_print_squeezed',
'P':'a_print_detail' }
'P':'a_print_detail'}
async def get_rpc_data(self):
blockhash = (
await self.rpc.call( 'getblockhash', self.sinceblock )
if self.sinceblock else '' )
await self.rpc.call('getblockhash', self.sinceblock)
if self.sinceblock else '')
# bitcoin-cli help listsinceblock:
# Arguments:
# 1. blockhash (string, optional) If set, the block hash to list transactions since,
@ -277,14 +277,14 @@ class BitcoinTwTxHistory(TwTxHistory,BitcoinTwRPC):
# 4. include_removed (boolean, optional, default=true) Show transactions that were removed
# due to a reorg in the "removed" array (not guaranteed to work on
# pruned nodes)
return (await self.rpc.call('listsinceblock',blockhash,1,True,False))['transactions']
return (await self.rpc.call('listsinceblock', blockhash, 1, True, False))['transactions']
async def gen_data(self,rpc_data,lbl_id):
async def gen_data(self, rpc_data, lbl_id):
def gen_parsed_data():
for o in rpc_data:
if lbl_id in o:
l = get_tw_label(self.proto,o[lbl_id])
l = get_tw_label(self.proto, o[lbl_id])
else:
assert o['category'] == 'send', f"{o['address']}: {o['category']} != 'send'"
l = None
@ -301,18 +301,18 @@ class BitcoinTwTxHistory(TwTxHistory,BitcoinTwRPC):
from ....rpc import json_encoder
def do_json_dump(*data):
nw = f'{self.proto.coin.lower()}-{self.proto.network}'
for d,fn_stem in data:
with open(f'/tmp/{fn_stem}-{nw}.json','w') as fh:
fh.write(json.dumps(d,cls=json_encoder))
for d, fn_stem in data:
with open(f'/tmp/{fn_stem}-{nw}.json', 'w') as fh:
fh.write(json.dumps(d, cls=json_encoder))
_mmp = namedtuple('mmap_datum',['twmmid','comment'])
_mmp = namedtuple('mmap_datum', ['twmmid', 'comment'])
mm_map = {
i['address']: (
_mmp( TwMMGenID(self.proto,i['twmmid']), TwComment(i['comment']) )
if i['twmmid'] else _mmp(None,None)
_mmp(TwMMGenID(self.proto, i['twmmid']), TwComment(i['comment']))
if i['twmmid'] else _mmp(None, None)
)
for i in data }
for i in data}
if self.sinceblock: # mapping data may be incomplete for inputs, so update from 'listlabels'
mm_map.update(
@ -323,41 +323,41 @@ class BitcoinTwTxHistory(TwTxHistory,BitcoinTwRPC):
msg_r('Getting wallet transactions...')
_wallet_txs = await self.rpc.gathered_icall(
'gettransaction',
[ (i,True,True) for i in {d['txid'] for d in data} ] )
[(i, True, True) for i in {d['txid'] for d in data}])
msg('done')
if not 'decoded' in _wallet_txs[0]:
_decoded_txs = iter(
await self.rpc.gathered_call(
'decoderawtransaction',
[ (d['hex'],) for d in _wallet_txs ] ))
[(d['hex'],) for d in _wallet_txs]))
for tx in _wallet_txs:
tx['decoded'] = next(_decoded_txs)
if self.cfg.debug_tw:
do_json_dump((_wallet_txs, 'wallet-txs'),)
_wip = namedtuple('prevout',['txid','vout'])
_wip = namedtuple('prevout', ['txid', 'vout'])
txdata = [
{
'tx': tx,
'wallet_vouts': sorted({i.vout for i in
[_wip( CoinTxID(d['txid']), d['vout'] ) for d in data]
[_wip(CoinTxID(d['txid']), d['vout']) for d in data]
if i.txid == tx['txid']}),
'prevouts': [_wip( CoinTxID(vin['txid']), vin['vout'] ) for vin in tx['decoded']['vin']]
'prevouts': [_wip(CoinTxID(vin['txid']), vin['vout']) for vin in tx['decoded']['vin']]
}
for tx in _wallet_txs]
_prevout_txids = {i.txid for d in txdata for i in d['prevouts']}
msg_r('Getting input transactions...')
_prevout_txs = await self.rpc.gathered_call('getrawtransaction', [ (i,True) for i in _prevout_txids ])
_prevout_txs = await self.rpc.gathered_call('getrawtransaction', [(i, True) for i in _prevout_txids])
msg('done')
_prevout_txs_dict = dict(zip(_prevout_txids,_prevout_txs))
_prevout_txs_dict = dict(zip(_prevout_txids, _prevout_txs))
for d in txdata:
d['prevout_txs'] = [_prevout_txs_dict[txid] for txid in {i.txid for i in d['prevouts']} ]
d['prevout_txs'] = [_prevout_txs_dict[txid] for txid in {i.txid for i in d['prevouts']}]
if self.cfg.debug_tw:
do_json_dump(
@ -378,4 +378,4 @@ class BitcoinTwTxHistory(TwTxHistory,BitcoinTwRPC):
idx = idx,
unspent_info = unspent_info,
mm_map = mm_map,
**d ) for idx,d in enumerate(txdata) )
**d) for idx, d in enumerate(txdata))

View file

@ -18,7 +18,18 @@ class BitcoinTwUnspentOutputs(TwUnspentOutputs):
class MMGenTwUnspentOutput(TwUnspentOutputs.MMGenTwUnspentOutput):
# required by gen_unspent(); setting valid_attrs explicitly is also more efficient
valid_attrs = {'txid','vout','amt','amt2','comment','twmmid','addr','confs','date','scriptPubKey','skip'}
valid_attrs = {
'txid',
'vout',
'amt',
'amt2',
'comment',
'twmmid',
'addr',
'confs',
'date',
'scriptPubKey',
'skip'}
invalid_attrs = {'proto'}
has_age = True
@ -50,7 +61,7 @@ class BitcoinTwUnspentOutputs(TwUnspentOutputs):
'p':'a_print_detail',
'v':'a_view',
'w':'a_view_detail',
'l':'i_comment_add' }
'l':'i_comment_add'}
async def get_rpc_data(self):
# bitcoin-cli help listunspent:
@ -62,5 +73,5 @@ class BitcoinTwUnspentOutputs(TwUnspentOutputs):
# 5. query_options (json object, optional) JSON with query options
# for now, self.addrs is just an empty list for Bitcoin and friends
add_args = (9999999,self.addrs) if self.addrs else ()
return await self.rpc.call('listunspent',self.minconf,*add_args)
add_args = (9999999, self.addrs) if self.addrs else ()
return await self.rpc.call('listunspent', self.minconf, *add_args)

View file

@ -91,7 +91,7 @@ def DeserializeTX(proto, txhex):
raw_tx = bytearray()
idx = 0
d = { 'version': bytes2int(bshift(4)) }
d = {'version': bytes2int(bshift(4))}
if d['version'] > 0x7fffffff: # version is signed integer
die(3, f"{d['version']}: transaction version greater than maximum allowed value (int32_t)!")
@ -134,7 +134,7 @@ def DeserializeTX(proto, txhex):
bshift(1, skip=True)
continue
txin['witness'] = [
bshift(readVInt(skip=True), skip=True).hex() for item in range(readVInt(skip=True)) ]
bshift(readVInt(skip=True), skip=True).hex() for item in range(readVInt(skip=True))]
else:
d['txid'] = make_txid(tx)
d['witness_size'] = 0

View file

@ -18,21 +18,21 @@ from .new import New
from .completed import Completed
from .unsigned import AutomountUnsigned
class Bump(Completed,New,TxBase.Bump):
class Bump(Completed, New, TxBase.Bump):
desc = 'fee-bumped transaction'
@property
def min_fee(self):
return self.sum_inputs() - self.sum_outputs() + self.relay_fee
def bump_fee(self,idx,fee):
def bump_fee(self, idx, fee):
self.update_output_amt(
idx,
self.sum_inputs() - self.sum_outputs(exclude=idx) - fee
)
def convert_and_check_fee(self,fee,desc):
ret = super().convert_and_check_fee(fee,desc)
def convert_and_check_fee(self, fee, desc):
ret = super().convert_and_check_fee(fee, desc)
if ret is False:
return ret
if ret < self.min_fee:
@ -42,7 +42,7 @@ class Bump(Completed,New,TxBase.Bump):
self.min_fee,
self.fee_abs2rel(self.min_fee),
self.rel_fee_desc,
c = self.coin ))
c = self.coin))
return False
output_amt = self.outputs[self.bump_output_idx].amt
if ret >= output_amt:
@ -50,7 +50,7 @@ class Bump(Completed,New,TxBase.Bump):
ret.hl(),
desc,
output_amt.hl(),
c = self.coin ))
c = self.coin))
return False
return ret

View file

@ -14,10 +14,10 @@ proto.btc.tx.completed: Bitcoin completed transaction class
from ....tx import completed as TxBase
from ....obj import HexStr
from ....util import msg,die
from .base import Base,scriptPubKey2addr
from ....util import msg, die
from .base import Base, scriptPubKey2addr
class Completed(Base,TxBase.Completed):
class Completed(Base, TxBase.Completed):
fn_fee_unit = 'satoshi'
# check signature and witness data
@ -30,33 +30,35 @@ class Completed(Base,TxBase.Completed):
fs = "Hex TX has {} scriptSig but input is of type '{}'!"
for n, ti in enumerate(txins):
mmti = self.inputs[n]
if ti['scriptSig'] == '' or ( len(ti['scriptSig']) == 46 and # native P2WPKH or P2SH-P2WPKH
ti['scriptSig'][:6] == '16' + self.proto.witness_vernum_hex + '14' ):
if ti['scriptSig'] == '' or (len(ti['scriptSig']) == 46 and # native P2WPKH or P2SH-P2WPKH
ti['scriptSig'][:6] == '16' + self.proto.witness_vernum_hex + '14'):
assert 'witness' in ti, 'missing witness'
assert isinstance(ti['witness'],list) and len(ti['witness']) == 2, 'malformed witness'
assert isinstance(ti['witness'], list) and len(ti['witness']) == 2, 'malformed witness'
assert len(ti['witness'][1]) == 66, 'incorrect witness pubkey length'
assert mmti.mmtype == ('S','B')[ti['scriptSig']==''], fs.format('witness-type',mmti.mmtype)
assert mmti.mmtype == ('S', 'B')[ti['scriptSig']==''], fs.format('witness-type', mmti.mmtype)
else: # non-witness
assert mmti.mmtype not in ('S','B'), fs.format('signature in',mmti.mmtype)
assert mmti.mmtype not in ('S', 'B'), fs.format('signature in', mmti.mmtype)
assert not 'witness' in ti, 'non-witness input has witness'
# sig_size 72 (DER format), pubkey_size 'compressed':33, 'uncompressed':65
assert (200 < len(ti['scriptSig']) < 300), 'malformed scriptSig' # VERY rough check
return True
def check_pubkey_scripts(self):
for n,i in enumerate(self.inputs,1):
addr,fmt = scriptPubKey2addr(self.proto,i.scriptPubKey)
for n, i in enumerate(self.inputs, 1):
addr, fmt = scriptPubKey2addr(self.proto, i.scriptPubKey)
if i.addr != addr:
if fmt != i.addr.addr_fmt:
m = 'Address format of scriptPubKey ({}) does not match that of address ({}) in input #{}'
msg(m.format(fmt,i.addr.addr_fmt,n))
msg(m.format(fmt, i.addr.addr_fmt, n))
m = 'ERROR: Address and scriptPubKey of transaction input #{} do not match!'
die(3,(m+'\n {:23}{}'*3).format(n, 'address:',i.addr,
'scriptPubKey:',i.scriptPubKey,
'scriptPubKey->address:',addr ))
die(3, (m+'\n {:23}{}'*3).format(
n,
'address:', i.addr,
'scriptPubKey:', i.scriptPubKey,
'scriptPubKey->address:', addr))
# def is_replaceable_from_rpc(self):
# dec_tx = await self.rpc.call('decoderawtransaction',self.serialized)
# dec_tx = await self.rpc.call('decoderawtransaction', self.serialized)
# return None < dec_tx['vin'][0]['sequence'] <= self.proto.max_int - 2
def is_replaceable(self):
@ -83,4 +85,4 @@ class Completed(Base,TxBase.Completed):
return self.sum_outputs() - self.send_amt
def get_serialized_locktime(self):
return int(bytes.fromhex(self.serialized[-8:])[::-1].hex(),16)
return int(bytes.fromhex(self.serialized[-8:])[::-1].hex(), 16)

View file

@ -13,12 +13,12 @@ proto.btc.tx.info: Bitcoin transaction info class
"""
from ....tx.info import TxInfo
from ....util import fmt,die
from ....color import red,green,pink
from ....util import fmt, die
from ....color import red, green, pink
from ....addr import MMGenID
class TxInfo(TxInfo):
sort_orders = ('addr','raw')
sort_orders = ('addr', 'raw')
txinfo_hdr_fs = 'TRANSACTION DATA\n\nID={i} ({a} {c}) RBF={r} Sig={s} Locktime={l}\n'
txinfo_hdr_fs_short = 'TX {i} ({a} {c}) RBF={r} Sig={s} Locktime={l}\n'
txinfo_ftr_fs = fmt("""
@ -33,17 +33,17 @@ class TxInfo(TxInfo):
return ' ({} {}, {} of spend amount)'.format(
pink(tx.fee_abs2rel(tx.fee)),
tx.rel_fee_disp,
pink('{:0.6f}%'.format( tx.fee / tx.send_amt * 100 ))
pink('{:0.6f}%'.format(tx.fee / tx.send_amt * 100))
)
def format_abs_fee(self,color,iwidth):
def format_abs_fee(self, color, iwidth):
return self.tx.fee.fmt(color=color, iwidth=iwidth)
def format_verbose_footer(self):
tx = self.tx
tsize = len(tx.serialized) // 2 if tx.serialized else 'unknown'
out = f'Transaction size: Vsize {tx.estimate_size()} (estimated), Total {tsize}'
if tx.name in ('Signed','OnlineSigned'):
if tx.name in ('Signed', 'OnlineSigned'):
wsize = tx.deserialized.witness_size
out += f', Base {tsize-wsize}, Witness {wsize}'
return out + '\n'
@ -51,9 +51,9 @@ class TxInfo(TxInfo):
def format_body(self, blockcount, nonmm_str, max_mmwid, enl, terse, sort):
if sort not in self.sort_orders:
die(1,'{!r}: invalid transaction view sort order. Valid options: {}'.format(
die(1, '{!r}: invalid transaction view sort order. Valid options: {}'.format(
sort,
','.join(self.sort_orders) ))
','.join(self.sort_orders)))
def get_mmid_fmt(e, is_input):
if e.mmid:
@ -61,13 +61,13 @@ class TxInfo(TxInfo):
width=max_mmwid,
encl='()',
color=True,
append_chars=('',' (chg)')[bool(not is_input and e.is_chg and terse)],
append_chars=('', ' (chg)')[bool(not is_input and e.is_chg and terse)],
append_color='green')
else:
return MMGenID.fmtc( nonmm_str, width=max_mmwid, color=True )
return MMGenID.fmtc(nonmm_str, width=max_mmwid, color=True)
def format_io(desc):
io = getattr(tx,desc)
io = getattr(tx, desc)
is_input = desc == 'inputs'
yield desc.capitalize() + ':\n' + enl
confs_per_day = 60*60*24 // tx.proto.avg_bdi
@ -75,25 +75,25 @@ class TxInfo(TxInfo):
io_sorted = {
'addr': lambda: sorted(
io, # prepend '+' (sorts before '0') to ensure non-MMGen addrs are displayed first
key = lambda o: (o.mmid.sort_key if o.mmid else f'+{o.addr}') + f'{o.amt:040.20f}' ),
key = lambda o: (o.mmid.sort_key if o.mmid else f'+{o.addr}') + f'{o.amt:040.20f}'),
'raw': lambda: io
}[sort]
if terse:
iwidth = max(len(str(int(e.amt))) for e in io)
addr_w = max(len(e.addr.views[vp1]) for f in (tx.inputs,tx.outputs) for e in f)
for n,e in enumerate(io_sorted()):
addr_w = max(len(e.addr.views[vp1]) for f in (tx.inputs, tx.outputs) for e in f)
for n, e in enumerate(io_sorted()):
yield '{:3} {} {} {} {}\n'.format(
n+1,
e.addr.fmt(vp1, width=addr_w, color=True),
get_mmid_fmt(e, is_input),
e.amt.fmt(iwidth=iwidth,color=True),
tx.dcoin )
e.amt.fmt(iwidth=iwidth, color=True),
tx.dcoin)
if have_bch:
yield '{:3} [{}]\n'.format('', e.addr.hl(vp2, color=False))
else:
col1_w = len(str(len(io))) + 1
for n,e in enumerate(io_sorted()):
for n, e in enumerate(io_sorted()):
mmid_fmt = get_mmid_fmt(e, is_input)
if is_input and blockcount:
confs = e.confs + blockcount - tx.blockcount
@ -115,7 +115,7 @@ class TxInfo(TxInfo):
yield ('', 'confirmations:', f'{confs} (around {days} days)')
if not is_input and e.is_chg:
yield ('', 'change:', green('True'))
yield '\n'.join('{:>{w}} {:<8} {}'.format(*d,w=col1_w) for d in gen()) + '\n\n'
yield '\n'.join('{:>{w}} {:<8} {}'.format(*d, w=col1_w) for d in gen()) + '\n\n'
tx = self.tx
@ -128,12 +128,12 @@ class TxInfo(TxInfo):
vp1 = 0
return (
'Displaying inputs and outputs in {} sort order'.format({'raw':'raw','addr':'address'}[sort])
+ ('\n\n','\n')[terse]
'Displaying inputs and outputs in {} sort order'.format({'raw':'raw', 'addr':'address'}[sort])
+ ('\n\n', '\n')[terse]
+ ''.join(format_io('inputs'))
+ ''.join(format_io('outputs')) )
+ ''.join(format_io('outputs')))
def strfmt_locktime(self,locktime=None,terse=False):
def strfmt_locktime(self, locktime=None, terse=False):
# Locktime itself is an unsigned 4-byte integer which can be parsed two ways:
#
# If less than 500 million, locktime is parsed as a block height. The transaction can be
@ -147,11 +147,11 @@ class TxInfo(TxInfo):
if num is None:
return '(None)'
elif num.bit_length() > 32:
die(2,f'{num!r}: invalid nLockTime value (integer size greater than 4 bytes)!')
die(2, f'{num!r}: invalid nLockTime value (integer size greater than 4 bytes)!')
elif num >= 500_000_000:
import time
return ' '.join(time.strftime('%c',time.gmtime(num)).split()[1:])
return ' '.join(time.strftime('%c', time.gmtime(num)).split()[1:])
elif num > 0:
return '{}{}'.format(('block height ','')[terse],num)
return '{}{}'.format(('block height ', '')[terse], num)
else:
die(2,f'{num!r}: invalid nLockTime value!')
die(2, f'{num!r}: invalid nLockTime value!')

View file

@ -18,7 +18,7 @@ from ....util import msg, fmt, make_chksum_6, die, suf
from ....color import pink
from .base import Base
class New(Base,TxBase.New):
class New(Base, TxBase.New):
usr_fee_prompt = 'Enter transaction fee: '
fee_fail_fs = 'Network fee estimation for {c} confirmations failed ({t})'
no_chg_msg = 'Warning: Change address will be deleted as transaction produces no change'
@ -68,7 +68,7 @@ class New(Base,TxBase.New):
return self.proto.coin_amt(amt_in_units * tx_size, from_unit=units[unit])
# given network fee estimate in BTC/kB, return absolute fee using estimated tx size
def fee_est2abs(self,fee_per_kb,fe_type=None):
def fee_est2abs(self, fee_per_kb, fe_type=None):
from decimal import Decimal
tx_size = self.estimate_size()
ret = self.proto.coin_amt('1') * (fee_per_kb * self.cfg.fee_adjust * tx_size / 1024)
@ -81,8 +81,8 @@ class New(Base,TxBase.New):
""").strip())
return ret
def convert_and_check_fee(self,fee,desc):
abs_fee = self.feespec2abs(fee,self.estimate_size())
def convert_and_check_fee(self, fee, desc):
abs_fee = self.feespec2abs(fee, self.estimate_size())
if abs_fee is None:
raise ValueError(f'{fee}: cannot convert {self.rel_fee_desc} to {self.coin}'
+ ' because transaction size is unknown')
@ -101,7 +101,7 @@ class New(Base,TxBase.New):
# Bitcoin full node, call doesn't go to the network, so just call listunspent with addrs=[]
return []
def update_change_output(self,funds_left):
def update_change_output(self, funds_left):
if funds_left == 0: # TODO: test
msg(self.no_chg_msg)
self.outputs.pop(self.chg_idx)
@ -112,12 +112,12 @@ class New(Base,TxBase.New):
fee = self.sum_inputs() - self.sum_outputs()
if fee > self.proto.max_tx_fee:
c = self.proto.coin
die( 'MaxFeeExceeded', f'Transaction fee of {fee} {c} too high! (> {self.proto.max_tx_fee} {c})' )
die('MaxFeeExceeded', f'Transaction fee of {fee} {c} too high! (> {self.proto.max_tx_fee} {c})')
def final_inputs_ok_msg(self,funds_left):
def final_inputs_ok_msg(self, funds_left):
return 'Transaction produces {} {} in change'.format(funds_left.hl(), self.coin)
async def create_serialized(self,locktime=None,bump=None):
async def create_serialized(self, locktime=None, bump=None):
if not bump:
self.inputs.sort_bip69()
@ -133,15 +133,15 @@ class New(Base,TxBase.New):
'txid': e.txid,
'vout': e.vout,
'sequence': e.sequence
} for e in self.inputs ]
} for e in self.inputs]
outputs_dict = {e.addr:e.amt for e in self.outputs}
ret = await self.rpc.call( 'createrawtransaction', inputs_list, outputs_dict )
ret = await self.rpc.call('createrawtransaction', inputs_list, outputs_dict)
if locktime and not bump:
msg(f'Setting nLockTime to {self.info.strfmt_locktime(locktime)}!')
assert isinstance(locktime,int), 'locktime value not an integer'
assert isinstance(locktime, int), 'locktime value not an integer'
self.locktime = locktime
ret = ret[:-8] + bytes.fromhex(f'{locktime:08x}')[::-1].hex()

View file

@ -13,27 +13,27 @@ proto.btc.tx.online: Bitcoin online signed transaction class
"""
from ....tx import online as TxBase
from ....util import msg,die
from ....util import msg, die
from ....color import orange
from .signed import Signed
class OnlineSigned(Signed,TxBase.OnlineSigned):
class OnlineSigned(Signed, TxBase.OnlineSigned):
async def send(self,prompt_user=True):
async def send(self, prompt_user=True):
self.check_correct_chain()
if not self.cfg.bogus_send:
if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
die(2,'Transaction has Segwit outputs, but this blockchain does not support Segwit'
die(2, 'Transaction has Segwit outputs, but this blockchain does not support Segwit'
+ ' at the current height')
if self.fee > self.proto.max_tx_fee:
die(2,'Transaction fee ({}) greater than {} max_tx_fee ({} {})!'.format(
die(2, 'Transaction fee ({}) greater than {} max_tx_fee ({} {})!'.format(
self.fee,
self.proto.name,
self.proto.max_tx_fee,
self.proto.coin ))
self.proto.coin))
await self.status.display()
@ -45,7 +45,7 @@ class OnlineSigned(Signed,TxBase.OnlineSigned):
else:
m = 'Transaction sent: {}'
try:
ret = await self.rpc.call('sendrawtransaction',self.serialized)
ret = await self.rpc.call('sendrawtransaction', self.serialized)
except Exception as e:
errmsg = str(e)
nl = '\n'
@ -61,7 +61,7 @@ class OnlineSigned(Signed,TxBase.OnlineSigned):
m = "Transaction with nLockTime {!r} can’t be included in this block!".format(
self.info.strfmt_locktime(self.get_serialized_locktime()))
else:
m,nl = ('','')
m, nl = ('', '')
msg(orange('\n'+errmsg))
die(2, f'{m}{nl}Send of MMGen transaction {self.txid} failed')
else:

View file

@ -13,19 +13,19 @@ proto.btc.tx.signed: Bitcoin signed transaction class
"""
from ....tx import signed as TxBase
from ....util import fmt,die
from ....util import fmt, die
from .completed import Completed
class Signed(Completed,TxBase.Signed):
class Signed(Completed, TxBase.Signed):
def compare_size_and_estimated_size(self,tx_decoded):
def compare_size_and_estimated_size(self, tx_decoded):
est_vsize = self.estimate_size()
d = tx_decoded
vsize = d['vsize'] if 'vsize' in d else d['size']
self.cfg._util.vmsg(f'\nVsize: {vsize} (true) {est_vsize} (estimated)')
ratio = float(est_vsize) / vsize
if not (0.95 < ratio < 1.05): # allow for 5% error
die( 'BadTxSizeEstimate', fmt(f"""
die('BadTxSizeEstimate', fmt(f"""
Estimated transaction vsize is {ratio:1.2f} times the true vsize
Your transaction fee estimates will be inaccurate
Please re-create and re-sign the transaction using the option --vsize-adj={1/ratio:1.2f}

View file

@ -15,12 +15,12 @@ proto.btc.tx.status: Bitcoin transaction status class
import time
from ....tx import status as TxBase
from ....util import msg,suf,die
from ....util import msg, suf, die
from ....util2 import format_elapsed_hr
class Status(TxBase.Status):
async def display(self,usr_req=False):
async def display(self, usr_req=False):
tx = self.tx
@ -33,10 +33,10 @@ class Status(TxBase.Status):
'gettransaction',
txid = tx.coin_txid,
include_watchonly = True,
verbose = False )
verbose = False)
except:
return False
if ret.get('confirmations',0) > 0:
if ret.get('confirmations', 0) > 0:
r.confs = ret['confirmations']
return True
else:
@ -44,13 +44,13 @@ class Status(TxBase.Status):
async def is_in_utxos():
try:
return 'txid' in await tx.rpc.call('getrawtransaction',tx.coin_txid,True)
return 'txid' in await tx.rpc.call('getrawtransaction', tx.coin_txid, True)
except:
return False
async def is_in_mempool():
try:
await tx.rpc.call('getmempoolentry',tx.coin_txid)
await tx.rpc.call('getmempoolentry', tx.coin_txid)
return True
except:
return False
@ -63,11 +63,11 @@ class Status(TxBase.Status):
'gettransaction',
txid = tx.coin_txid,
include_watchonly = True,
verbose = False )
verbose = False)
except:
return False
else:
if 'bip125-replaceable' in ret and ret.get('confirmations',1) <= 0:
if 'bip125-replaceable' in ret and ret.get('confirmations', 1) <= 0:
r.replacing_confs = -ret['confirmations']
r.replacing_txs = ret['walletconflicts']
return True
@ -80,34 +80,34 @@ class Status(TxBase.Status):
'gettransaction',
txid = tx.coin_txid,
include_watchonly = True,
verbose = False )
verbose = False)
rep = ('' if d.get('bip125-replaceable') == 'yes' else 'NOT ') + 'replaceable'
t = d['timereceived']
if tx.cfg.quiet:
msg('Transaction is in mempool')
else:
msg(f'TX status: in mempool, {rep}')
msg('Sent {} ({})'.format(time.strftime('%c',time.gmtime(t)), format_elapsed_hr(t)))
msg('Sent {} ({})'.format(time.strftime('%c', time.gmtime(t)), format_elapsed_hr(t)))
else:
msg('Warning: transaction is in mempool!')
elif await is_in_wallet():
die(0,f'Transaction has {r.confs} confirmation{suf(r.confs)}')
die(0, f'Transaction has {r.confs} confirmation{suf(r.confs)}')
elif await is_in_utxos():
die(4,'ERROR: transaction is in the blockchain (but not in the tracking wallet)!')
die(4, 'ERROR: transaction is in the blockchain (but not in the tracking wallet)!')
elif await is_replaced():
msg('Transaction has been replaced')
msg('Replacement transaction ' + (
f'has {r.replacing_confs} confirmation{suf(r.replacing_confs)}'
if r.replacing_confs else
'is in mempool' ) )
'is in mempool'))
if not tx.cfg.quiet:
msg('Replacing transactions:')
d = []
for txid in r.replacing_txs:
try:
d.append(await tx.rpc.call('getmempoolentry',txid))
d.append(await tx.rpc.call('getmempoolentry', txid))
except:
d.append({})
for txid,mp_entry in zip(r.replacing_txs,d):
msg(f' {txid}' + (' in mempool' if 'height' in mp_entry else '') )
die(0,'')
for txid, mp_entry in zip(r.replacing_txs, d):
msg(f' {txid}' + (' in mempool' if 'height' in mp_entry else ''))
die(0, '')

View file

@ -13,14 +13,14 @@ proto.btc.tx.unsigned: Bitcoin unsigned transaction class
"""
from ....tx import unsigned as TxBase
from ....obj import CoinTxID,MMGenDict
from ....util import msg,msg_r,ymsg,suf,die
from ....obj import CoinTxID, MMGenDict
from ....util import msg, msg_r, ymsg, suf, die
from .completed import Completed
class Unsigned(Completed,TxBase.Unsigned):
class Unsigned(Completed, TxBase.Unsigned):
desc = 'unsigned transaction'
async def sign(self,tx_num_str,keys): # return signed object or False; don't exit or raise exception
async def sign(self, tx_num_str, keys): # return signed object or False; don't exit or raise exception
from ....exception import TransactionChainMismatch
try:
@ -37,14 +37,14 @@ class Unsigned(Completed,TxBase.Unsigned):
self.cfg._util.qmsg(f'Passing {len(keys)} key{suf(keys)} to {self.rpc.daemon.exec_fn}')
if self.has_segwit_inputs():
from ....addrgen import KeyGenerator,AddrGenerator
kg = KeyGenerator( self.cfg, self.proto, 'std' )
ag = AddrGenerator( self.cfg, self.proto, 'segwit' )
keydict = MMGenDict([(d.addr,d.sec) for d in keys])
from ....addrgen import KeyGenerator, AddrGenerator
kg = KeyGenerator(self.cfg, self.proto, 'std')
ag = AddrGenerator(self.cfg, self.proto, 'segwit')
keydict = MMGenDict([(d.addr, d.sec) for d in keys])
sig_data = []
for d in self.inputs:
e = {k:getattr(d,k) for k in ('txid','vout','scriptPubKey','amt')}
e = {k:getattr(d, k) for k in ('txid', 'vout', 'scriptPubKey', 'amt')}
e['amount'] = e['amt']
del e['amt']
if d.mmtype == 'S':
@ -56,8 +56,8 @@ class Unsigned(Completed,TxBase.Unsigned):
try:
args = (
('signrawtransaction', self.serialized,sig_data,wifs,self.proto.sighash_type),
('signrawtransactionwithkey',self.serialized,wifs,sig_data,self.proto.sighash_type)
('signrawtransaction', self.serialized, sig_data, wifs, self.proto.sighash_type),
('signrawtransactionwithkey', self.serialized, wifs, sig_data, self.proto.sighash_type)
)['sign_with_key' in self.rpc.caps]
ret = await self.rpc.call(*args)
except Exception as e:
@ -68,11 +68,11 @@ class Unsigned(Completed,TxBase.Unsigned):
self.update_serialized(ret['hex'])
from ....tx import SignedTX
new = await SignedTX(cfg=self.cfg, data=self.__dict__, automount=self.automount)
tx_decoded = await self.rpc.call( 'decoderawtransaction', ret['hex'] )
tx_decoded = await self.rpc.call('decoderawtransaction', ret['hex'])
new.compare_size_and_estimated_size(tx_decoded)
new.coin_txid = CoinTxID(self.deserialized.txid)
if not new.coin_txid == tx_decoded['txid']:
die( 'BadMMGenTxID', 'txid mismatch (after signing)' )
die('BadMMGenTxID', 'txid mismatch (after signing)')
msg('OK')
return new
except Exception as e: