{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Matching Backpage Ads with Bitcoin Transactions\n", "\n", "This document shows the entire data pipeline that enables us to match a Backpage ad with a potential Bitcoin transactions.\n", "\n", "For the full paper, see http://www.kdd.org/kdd2017/papers/view/backpage-and-bitcoin-uncovering-human-traffickers\n", "\n", "If you have any questions on this code, please contact Danny Y. Huang: http://www.sysnet.ucsd.edu/~dhuang/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
preprocess_parse_blockchain_merged.json\n", "\n", "Output schema: \n", "\n", "
\n", "['3fa5b6ff32e05272dd2a4070ad3dfde3ef4c763c2c5ec348e2248cb47a46fd7e', # txn_hash\n", " [[['1EHkaH8MV5uBYcWXBLzMBZbWCcu71hwJ7k', 0.02680139], # input wallet, input amount\n", " ['18qBGi2FqzcBf9RDf8nzag4BiNdu41ma6y', 0.01492975]], # input wallet, input amount\n", " [['1MabcemUVnDrLdb8j1ri2KMUuaoTzuYSRg', 0.01439704], # output wallet, output amount\n", " ['1G2NWdrEyCDiKNqTx7Zr6QFeQ9hqVKRFmD', 0.027091]]]] # output wallet, output amount\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parse transactions\n", "\n", "- Take raw json files from ~/txn_history/data/raw_blockchain_txn/%d.json, obtained using the getrawtransaction API call.\n", "\n", "- Flatten each transaction to produce an input and output rdd.\n", "\n", " - input: {txn_hash, prev_txn_hash, prev_ix}\n", " - output: {txn_hash, ix, wallet_addr, amount}" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from pyspark import SparkConf\n", "conf = SparkConf()\n", "conf.set(\"spark.local.dir\", '/tmp/spark');" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Generated using the getrawtransaction API call\n", "DATA_PATH = 'raw_blockchain_txn/*.json'\n", "\n", "RESULT_PATH = 'preprocess_parse_blockchain.json/'\n", "\n", "RESULT_MERGED_PATH = 'preprocess_parse_blockchain_merged.json'" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import json\n", "import pprint as pp\n", "import subprocess\n", "\n", "\n", "def parse_txn_input(txn_dict):\n", " \"\"\"Returns a list of dict of {txn_hash, prev_txn_hash, prev_ix}\"\"\"\n", " \n", " out_list = []\n", " \n", " blockchain_ts = txn_dict['time']\n", " txn_hash = txn_dict['txid']\n", " \n", " for vin_dict in txn_dict['vin']:\n", " try:\n", " out_list.append({\n", " 'txn_hash': txn_hash,\n", " 'prev_txn_hash': vin_dict['txid'],\n", " 'prev_ix': 
vin_dict['vout']\n", "            })\n", "        except KeyError:\n", "            # Skip coinbase inputs, which have no previous transaction\n", "            continue\n", "\n", "    return out_list\n", "\n", "\n", "def parse_txn_output(txn_dict):\n", "    \"\"\"Returns a list of dict of {txn_hash, ix, wallet_addr, amount}\"\"\"\n", "    \n", "    out_list = []\n", "    \n", "    txn_hash = txn_dict['txid']\n", "    \n", "    for vout_dict in txn_dict['vout']:\n", "        try:\n", "            out_list.append({\n", "                'txn_hash': txn_hash,\n", "                'ix': vout_dict['n'],\n", "                'wallet_addr': vout_dict['scriptPubKey']['addresses'][0],\n", "                'amount': vout_dict['value']\n", "            })\n", "        except (KeyError, IndexError):\n", "            # Skip non-standard outputs with no decodable address\n", "            continue\n", "\n", "    return out_list\n", "\n", "subprocess.call(['rm', '-rf', RESULT_PATH])\n", "\n", "rdd = sc.textFile(DATA_PATH).map(json.loads).cache()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Fill the prev_out of input" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Index input by prev_txn_hash:prev_ix\n", "\n", "input_rdd = rdd \\\n", "    .flatMap(parse_txn_input) \\\n", "    .map(lambda r: ('{}:{}'.format(r['prev_txn_hash'], r['prev_ix']), r))\n", "\n", "# Index output by txn_hash:ix\n", "    \n", "output_rdd = rdd \\\n", "    .flatMap(parse_txn_output) \\\n", "    .map(lambda r: ('{}:{}'.format(r['txn_hash'], r['ix']), r)) \\\n", "    .cache()\n", "    \n", "# Fill in the wallet and amount for inputs\n", "\n", "complete_input_rdd = input_rdd \\\n", "    .join(output_rdd) \\\n", "    .map(lambda (_, (input_dict, prev_output_dict)): {\n", "        'txn_hash': input_dict['txn_hash'],\n", "        'wallet_addr': prev_output_dict['wallet_addr'],\n", "        'amount': prev_output_dict['amount']\n", "    })\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Combine input and output" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Combine the input wallets per transaction hash. 
Produce txn_hash -> [(wallet, amount)]\n", "\n", "complete_input_rdd = complete_input_rdd \\\n", "    .map(lambda r: (r['txn_hash'], [(r['wallet_addr'], r['amount'])])) \\\n", "    .reduceByKey(lambda x, y: x + y)\n", "    \n", "# Combine the output wallets per transaction hash. Produce txn_hash -> [(wallet, amount)]\n", "\n", "output_rdd = output_rdd \\\n", "    .map(lambda (_, r): (r['txn_hash'], [(r['wallet_addr'], r['amount'])])) \\\n", "    .reduceByKey(lambda x, y: x + y)\n", "\n", "# Combine the input and output wallets\n", "\n", "combined_rdd = complete_input_rdd.join(output_rdd)\n", "\n", "combined_rdd.map(json.dumps).saveAsTextFile(RESULT_PATH)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Concat text files into a single file" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import subprocess\n", "\n", "subprocess.call('cat {}part* > {}'.format(RESULT_PATH, RESULT_MERGED_PATH), shell=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Timestamping transactions\n", "\n", "Add mempool and blockchain timestamps to each blockchain transaction. Also include the USD price.\n", "\n", "Output path:\n", "
\n", "preprocess_txn_timestamps_merged.json\n", "\n", "\n", "Output schema:\n", "
\n", "{'first_seen_ts': 1482908821,\n", " 'input_list': [{'btc_value': 0.03098532,\n", " 'usd_value': 29.217590592916363,\n", " 'wallet_addr': '3LEv6RHmjwHrc3GPLVLKNevBLLvkJgwxG7'}],\n", " 'output_list': [{'btc_value': 0.00106,\n", " 'usd_value': 0.9995264218181817,\n", " 'wallet_addr': '3Q77yyZr651Vrgu78QyjcTj5QJuCnzNwYG'},\n", " {'btc_value': 0.02967299,\n", " 'usd_value': 27.980129735232726,\n", " 'wallet_addr': '35dBmN9oEh4TyQLYwopEm6BxKfPCyWQL7e'}],\n", " 'txn_hash': '871f3c22a690d673d97c522e1711df5907210c167dc9aaf4e20d09a0d62b143b'}\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Input data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "MIN_TIMESTAMP = 1479513600 # 2016-11-19\n", "MAX_TIMESTAMP = 1484956799 # 2017-01-20\n", "\n", "# Mempool \n", "# {'snapshot_ts': 1477347558,\n", "# 'txn_hash': '64188591762a999f42ab11c95b6f04c0846dbbe1203d7cc208c4a88ccd88b4ad'}\n", "MEMPOOL_PATH = 'vm137_mempool.json'\n", "\n", "# GoCoin exchange rate\n", "# {'prices': {'BTC': {'USD': '629.24361000'},\n", "# 'LTC': {'USD': '3.80000000'},\n", "# 'XDG': {'USD': '0.00023573'}},\n", "# 'timestamp': '2016-10-12T16:31:24.432Z'}\n", "EXCHANGE_RATE_PATH = 'gocoin_exchange_rate.json'\n", "\n", "# CSV: \n", "# 9b554ff8fc7f82670d8ac0fa5e154d55e70c1166266a78e2ed1526a5f4f506d1,1479541397\n", "BLOCKCHAIN_TXN_TIMESTAMP_PATH = 'raw_blockchain_txn_timestamps.csv'\n", "\n", "# ['3fa5b6ff32e05272dd2a4070ad3dfde3ef4c763c2c5ec348e2248cb47a46fd7e', # txn_hash\n", "# [[['1EHkaH8MV5uBYcWXBLzMBZbWCcu71hwJ7k', 0.02680139], # input wallet, input amount\n", "# ['18qBGi2FqzcBf9RDf8nzag4BiNdu41ma6y', 0.01492975]], # input wallet, input amount\n", "# [['1MabcemUVnDrLdb8j1ri2KMUuaoTzuYSRg', 0.01439704], # output wallet, output amount\n", "# ['1G2NWdrEyCDiKNqTx7Zr6QFeQ9hqVKRFmD', 0.027091]]]] # output wallet, output amount\n", "BLOCKCHAIN_PATH = 'preprocess_parse_blockchain_merged.json'" ] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "### Output data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "RESULT_PATH = 'preprocess_txn_timestamps.json/'\n", "\n", "RESULT_MERGED_PATH = 'preprocess_txn_timestamps_merged.json'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get USD/BTC price\n", "\n", "Produce a dictionary that maps unix time to price" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import json\n", "import pandas\n", "from dateutil import parser\n", "\n", "EPOCH_START = parser.parse('1970-01-01T00:00Z')\n", "\n", "df = []\n", "\n", "with open(EXCHANGE_RATE_PATH) as fp:\n", " for line in fp:\n", "\n", " try:\n", " row = json.loads(line)\n", " except:\n", " continue\n", "\n", " # Convert UTC timestamp into unix time\n", " timestamp_str = row['timestamp']\n", " unix_time = int((parser.parse(timestamp_str) - EPOCH_START).total_seconds())\n", "\n", " # Transform unix time to 10-minute times\n", " unix_time -= unix_time % 600\n", "\n", " # Extract BTC/USD price only\n", " price = float(row['prices']['BTC']['USD'])\n", "\n", " df.append((unix_time, price))\n", "\n", "df = pandas.DataFrame(df, columns=['unix_time', 'price']).sort_values(by='unix_time')\n", "\n", "# Find mean price within each 10-minute interval\n", "df = df.groupby('unix_time')['price'].mean().to_frame('price')\n", "\n", "# Produce a dictionary that maps unix time (10-minute granuarity) to price\n", "price_dict = dict()\n", "for (unix_time, row) in df.iterrows():\n", " price_dict[unix_time] = row['price']" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def get_btc_price(unix_time):\n", " \"\"\"Returns the USD price of bitcoin at a particular time.\"\"\"\n", " unix_time -= unix_time % 600\n", " return price_dict[unix_time]" ] }, { "cell_type": "markdown", "metadata": {}, 
"source": [ "### Combine all the prices" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import json\n", "\n", "# Produce txn_hash -> mempool_ts\n", "\n", "mempool_ts_rdd = sc \\\n", " .textFile(MEMPOOL_PATH) \\\n", " .map(json.loads) \\\n", " .map(lambda r: (r['txn_hash'], r['snapshot_ts'])) \\\n", " .reduceByKey(lambda t1, t2: min(t1, t2))\n", "\n", "def get_blockchain_ts(line):\n", " txn_hash, blockchain_ts = line.split(',')\n", " return (txn_hash, int(blockchain_ts))\n", " \n", "# Produce txn_hash -> blockchain_ts\n", "blockchain_ts_rdd = sc \\\n", " .textFile(BLOCKCHAIN_TXN_TIMESTAMP_PATH) \\\n", " .map(get_blockchain_ts)\n", "\n", "# Combine mempool and blockchain timestamps\n", "ts_rdd = blockchain_ts_rdd \\\n", " .leftOuterJoin(mempool_ts_rdd) \\\n", " .mapValues(lambda (blockchain_ts, mempool_ts): {\n", " 'blockchain_ts': blockchain_ts, \n", " 'mempool_ts': blockchain_ts if mempool_ts is None else mempool_ts,\n", " 'missing_mempool_ts': mempool_ts is None\n", " }) \\\n", " .cache()\n", "\n", "print 'Total transactions:', ts_rdd.count()\n", "\n", "missing_rdd = ts_rdd.filter(lambda (_, ts_dict): ts_dict['missing_mempool_ts'])\n", "\n", "print 'Missing transactions:', missing_rdd.count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import subprocess\n", "\n", "# Produce txn_hash -> (input_list, output_list)\n", "blockchain_rdd = sc \\\n", " .textFile(BLOCKCHAIN_PATH) \\\n", " .map(json.loads)\n", " \n", " \n", "def process_txn(joined_row):\n", " \n", " txn_hash, ((input_list, output_list), ts_dict) = joined_row\n", " \n", " # Get the first seen timestamp = min (mempool time, blockchain time)\n", " \n", " first_seen_ts = min(ts_dict['mempool_ts'], ts_dict['blockchain_ts'])\n", " btc_price = get_btc_price(first_seen_ts)\n", " \n", " out_dict = {\n", " 'txn_hash': txn_hash,\n", " 'first_seen_ts': first_seen_ts,\n", " 
'input_list': [],\n", "        'output_list': []\n", "    }\n", "    \n", "    # Get the USD price for all inputs and outputs\n", "    \n", "    for (wallet_addr, btc_value) in input_list:\n", "        out_dict['input_list'].append({\n", "            'wallet_addr': wallet_addr,\n", "            'btc_value': btc_value,\n", "            'usd_value': btc_value * btc_price\n", "        })\n", "\n", "    for (wallet_addr, btc_value) in output_list:\n", "        out_dict['output_list'].append({\n", "            'wallet_addr': wallet_addr,\n", "            'btc_value': btc_value,\n", "            'usd_value': btc_value * btc_price\n", "        })\n", "    \n", "    return out_dict\n", "\n", "\n", "subprocess.call(['rm', '-rf', RESULT_PATH])\n", "    \n", "# saveAsTextFile is an action that returns nothing, so no assignment here\n", "blockchain_rdd \\\n", "    .join(ts_rdd) \\\n", "    .map(process_txn) \\\n", "    .map(json.dumps) \\\n", "    .saveAsTextFile(RESULT_PATH)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Merge result" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import subprocess\n", "\n", "subprocess.call('cat {}part* > {}'.format(RESULT_PATH, RESULT_MERGED_PATH), shell=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## GoCoin transactions\n", "\n", "Look for GoCoin wallets, either in Chainalysis or based on our own heuristics." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import pandas\n", "import chainalysis\n", "import async_chainalysis\n", "import json\n", "from operator import add\n", "import subprocess\n", "import os" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use Chainalysis to find _all_ GoCoin wallets." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# List of GoCoin as determined from Chainalysis and from previous Becky transactions (way back in 2016)\n", "GOCOIN_WALLET_LIST = [\n", " '17rDgFFftygfXNZaA8rJBjSdJs5Eq8T5qo',\n", " '163iH6EuzjNNW2WGAujYsLHaMWvVFqScJg',\n", " '1JgzeGRmwTT3Jh2KyuoEjDzhvy9GW4281X', \n", " '1C2TzKY8ffkCvGDi5DbNLfquvMp2BHjH9E',\n", " '1DZGZwqjEgPX7trAUNKeMcK6SVRFenCWWu',\n", " '13TYKg2wirvvyv3PWic4P3g82Ng5ZPKjpP'\n", "]\n", "\n", "# Find all wallet addresses that belong to the GoCoin cluster\n", "\n", "gocoin_wallet_set = set()\n", "\n", "for gocoin_root_wallet in GOCOIN_WALLET_LIST:\n", " print 'Getting', gocoin_root_wallet\n", " df = chainalysis.get_cluster_addresses(gocoin_root_wallet)\n", " gocoin_wallet_set |= set(df['addr'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Look at strict gocoin wallets that appeared within our measurement period." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "BLOCKCHAIN_TIMESTAMP_PATH = 'preprocess_txn_timestamps_merged.json'\n", "\n", "def _flatten_wallets(txn):\n", " \n", " wallet_set = set()\n", " \n", " for txn_list in [txn['input_list'], txn['output_list']]:\n", " for r in txn_list:\n", " wallet_set.add((r['wallet_addr'], 1))\n", " \n", " return wallet_set\n", "\n", "\n", "blockchain_rdd = sc \\\n", " .textFile(BLOCKCHAIN_TIMESTAMP_PATH) \\\n", " .map(json.loads) \\\n", " .cache()\n", "\n", "transient_wallet_rdd = blockchain_rdd \\\n", " .flatMap(_flatten_wallets) \\\n", " .reduceByKey(add) \\\n", " .filter(lambda (wallet, count): count == 2) \\\n", " .cache()\n", " \n", "strict_gocoin_wallet_rdd = sc \\\n", " .parallelize(gocoin_wallet_set) \\\n", " .map(lambda wallet: (wallet, 1)) \\\n", " .join(transient_wallet_rdd) \\\n", " .map(lambda (wallet, _): (wallet, 1))\n", " " ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, 
"source": [ "### Find GoCoin-looking transactions\n", "\n", "- Output is a single wallet that starts with 3; used exactly twice.\n", "- Input is a list of more than 15 wallets, where at least 60% of the input BTC values have exactly 4 decimal places.\n", "- Each input wallet is used exaclty twice." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def is_gocoin_looking(txn):\n", " \n", " output_list = txn['output_list']\n", "\n", " if len(output_list) != 1:\n", " return False\n", " \n", " if not output_list[0]['wallet_addr'].startswith('3'):\n", " return False\n", " \n", " input_df = pandas.DataFrame(txn['input_list'])\n", " \n", " if len(input_df) <= 10:\n", " return False\n", " \n", " input_df['is_four_decimal'] = input_df['btc_value'].apply(\n", " lambda v: 1 if (v < 1 and len(str(v)) == 6) else 0\n", " )\n", " four_decimal_fraction = input_df['is_four_decimal'].sum() * 1.0 / len(input_df)\n", " if four_decimal_fraction <= 0.50:\n", " return False\n", " \n", " return True\n", "\n", "gocoin_heuristic_rdd = blockchain_rdd \\\n", " .filter(is_gocoin_looking) \\\n", " .flatMap(lambda txn: [r['wallet_addr'] for r in txn['input_list']]) \\\n", " .distinct() \\\n", " .map(lambda wallet: (wallet, 1)) \\\n", " .join(transient_wallet_rdd) \\\n", " .map(lambda (wallet, _): (wallet, 1))\n" ] }, { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "### Combine Chainalysis and Heuristics Results" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "gocoin_wallets_rdd = strict_gocoin_wallet_rdd \\\n", " .fullOuterJoin(gocoin_heuristic_rdd) \\\n", " .map(lambda (wallet, (strict, heuristic)): (wallet, 1 if strict == 1 else 0, 1 if heuristic == 1 else 0))\n", "\n", "gocoin_wallets_df = pandas.DataFrame(\n", " gocoin_wallets_rdd.collect(), \n", " columns=['wallet_addr', 'is_chainalysis', 'is_heuristics']\n", ")\n", "\n", 
"gocoin_wallets_df.set_index('wallet_addr').to_csv('gocoin_wallets.csv')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Analysis\n", "\n", "In this section, we match GoCoin transactions with Backpage ads." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import pandas\n", "import sys\n", "import re\n", "import json" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# CSV format: uuid, ad_id,location,ad_ts,total_cost\n", "BACKPAGE_PATH = 'backpage_ad_timestamps.csv'\n", "\n", "BLOCKCHAIN_HDFS_PATH = 'preprocess_txn_timestamps_merged.json'\n", "\n", "AUTHOR_LABEL_PATH = 'phone_email_labels.txt'\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parse author labels" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Should match 4188634_alabama label_6\n", "author_regex = re.compile(r'(\\d+)_([a-z]+) label_(\\d+)')\n", "\n", "author_df = []\n", "\n", "with open(AUTHOR_LABEL_PATH) as fp:\n", " for line in fp:\n", " match = author_regex.search(line)\n", " author_df += [(\n", " int(match.group(1)), match.group(2), int(match.group(3))\n", " )] \n", " \n", "author_df = pandas.DataFrame(author_df, columns=['ad_id', 'location', 'author_label']).set_index(['ad_id', 'location'])\n", "\n", "print len(author_df)\n", "author_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load Backpage data and combine with author labels." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "backpage_df = pandas.read_csv(BACKPAGE_PATH)\n", "\n", "backpage_df = backpage_df \\\n", " .set_index(['ad_id', 'location']) \\\n", " .join(author_df, how='left') \\\n", " .reset_index()\n", " \n", "# 2016-12-11 UTC = 1481414400\n", "backpage_df = backpage_df[backpage_df['action_ts'] >= 1481414400]\n", " \n", "backpage_df[pandas.notnull(backpage_df['author_label'])].head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare to be matched with blockchain later. Increase action_ts by a minute to match with mempool_ts." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "backpage_rdd = []\n", "\n", "for (_, row) in backpage_df.iterrows():\n", " \n", " action_ts = row['action_ts']\n", " \n", " # Round down to nearest minute, although this does not have any effect\n", " # since backpage timestamps are accurate to the minute already.\n", " action_ts = int(action_ts - action_ts % 60)\n", " \n", " backpage_rdd += [\n", " (action_ts, row.to_dict())\n", " ]\n", " \n", "# possible_ts -> backpage_dict\n", "backpage_rdd = sc.parallelize(backpage_rdd)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load Mempool data\n", "\n", "Load mempool txn with rounded-down timestamps. Filter by Becky-style transactions." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def _find_possible_ts(txn):\n", "\n", " mempool_ts = txn['first_seen_ts']\n", " mempool_ts = int(mempool_ts - mempool_ts % 60)\n", " \n", " return [\n", "# (mempool_ts + 60, txn), \n", " (mempool_ts, txn),\n", " (mempool_ts - 60, txn),\n", "# (mempool_ts - 120, txn), \n", " ]\n", "\n", "blockchain_rdd = sc \\\n", " .textFile(BLOCKCHAIN_HDFS_PATH) \\\n", " .map(json.loads) \\\n", "\n", "# Produces rounded_down_mempool_ts -> txn\n", "blockchain_rdd = blockchain_rdd \\\n", " .flatMap(_find_possible_ts)\n", "\n", "blockchain_stats_rdd = sc \\\n", " .textFile(BLOCKCHAIN_HDFS_PATH) \\\n", " .map(json.loads) \\\n", " .cache()\n", " \n", "blockchain_ts_rdd = blockchain_stats_rdd \\\n", " .map(lambda txn: txn['first_seen_ts']) \\\n", " .cache()\n", " \n", "print blockchain_ts_rdd.min(), blockchain_ts_rdd.max()\n", "print blockchain_stats_rdd.map(lambda txn: txn['txn_hash']).distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load GoCoin data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "gocoin_wallet_df = pandas.read_csv('gocoin_wallets.csv')" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "gocoin_rdd = sc \\\n", " .parallelize([r.to_dict() for (_, r) in gocoin_wallet_df.iterrows()]) \\\n", " .map(lambda r: (r['wallet_addr'], {\n", " 'is_chainalysis': int(r['is_chainalysis']),\n", " 'is_heuristics': int(r['is_heuristics'])\n", " }))\n", " \n", "gocoin_rdd.first()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Match timestamp and cost. Remove irrelevant outputs. Flatten output.\n", "\n", "This is the core matching algorithm." 
] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def _get_relevant_outputs(joined_row):\n", " \"\"\"Returns a list of matching outputs.\"\"\"\n", " \n", " (_, (txn_dict, backpage_dict)) = joined_row\n", " ret_list = []\n", "\n", " input_df = pandas.DataFrame(txn_dict['input_list'])\n", " input_wallets = ' '.join(set(input_df['wallet_addr']))\n", " input_wallet_count = len(input_wallets.split())\n", " \n", " action_cost = backpage_dict['action_cost']\n", " action_type = backpage_dict['action_type']\n", " \n", " for output_dict in txn_dict['output_list']:\n", "\n", " output_btc = output_dict['btc_value'] \n", " output_usd = output_dict['usd_value']\n", " \n", " # BTC value must be less than 1 and have exactly 3 or 4 decimal places\n", " if output_btc >= 1:\n", " continue\n", " if len(str(output_btc)) not in (5, 6):\n", " continue\n", " \n", " difference = (action_cost - output_usd) * 1.0 / output_usd\n", " \n", " # Ignore output USD amounts beyond the specified range\n", " if action_type == 'default':\n", " if not (-0.02 <= difference <= 0.02):\n", " continue\n", " elif action_type == 'sponsored':\n", " if not (-0.05 <= difference <= 0.05):\n", " continue\n", " else:\n", " raise RuntimeError('Invalid action_type: {}'.format(action_type))\n", " \n", " # Flatten for each output\n", " ret_dict = {\n", " 'txn_hash': txn_dict['txn_hash'],\n", " 'input_wallets': input_wallets,\n", " 'input_wallet_count': input_wallet_count,\n", " 'output_wallet': output_dict['wallet_addr'],\n", " 'output_btc': output_dict['btc_value'],\n", " 'output_usd': output_usd\n", " }\n", " ret_dict.update(backpage_dict)\n", " ret_list.append(ret_dict)\n", " \n", " return ret_list\n", "\n", "def merge_dict(x, y):\n", " \n", " ret = {}\n", " ret.update(x)\n", " ret.update(y)\n", " \n", " return ret\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ 
"basic_match_rdd_1 = blockchain_rdd \\\n", " .join(backpage_rdd)\n", " \n", "basic_match_rdd_2 = basic_match_rdd_1 \\\n", " .flatMap(_get_relevant_outputs)\n", " \n", "basic_match_rdd_3 = basic_match_rdd_2 \\\n", " .map(lambda r: (r['output_wallet'], r)) \\\n", " .join(gocoin_rdd) \\\n", " .map(lambda (_, (x, y)): merge_dict(x, y))\n", "\n", "basic_match_rdd = basic_match_rdd_3" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "basic_match_df = pandas.DataFrame(basic_match_rdd.collect())" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "basic_match_df \\\n", " .sort_values('author_label') \\\n", " .to_csv('match_all_txn_basic_match_df.csv') " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "