0%

大家也许有过这种经历,关注的某只股票突发利好消息预判会连续涨停,赶紧打开炒股软件准备下单,发现这只股票已经涨停了,而且是几十万手的大单封死,根本没机会下手,随后几天只能看着股票一个涨停接一个涨停的一路上涨,后悔下手太慢。这种情况还有办法上车吗,散户一般能想到的办法都没戏,但是可以试试下面这种办法,吃不到肉但也许能喝口汤。

ETF套利介绍

在介绍之前我们先了解下什么是ETF套利?ETF指的是交易型开放式指数基金,是一种在交易所上市交易,并且份额可以变动的一种开放式基金,它的手续与股票是完全一样的。由于一级市场和二级市场同时存在,不可避免会出现价格无法同步的显现,这种情况下就给一些机构跨市场套利提供了条件,而套利交易会让套利机会消失,让两个市场的价格差异得到控制,从而保证一级和二级市场价格的一致性。因为一级市场只能是机构投资者参与,所以我们一般的投资者是没办法进行ETF套利的。

散户该如何操作呢

那我们怎么参与涨停股票的交易呢,办法很简单,可以找出持有我们要买入的涨停股的ETF基金,再看看基金持有的股票数量占基金的比重,找持有股票占比大的基金买入。一般基金持仓比较分散,每只股票持有数量从百分之零点几到百分之十几不等,而且优质股票往往被上百只基金同时持有,如何快速找到持有股票的基金和比重呢?

操作实战

以天齐锂业为例,我们从东方财富网上可以找到机构持仓明细 http://data.eastmoney.com/zlsj/detail/2019-12-31-0-002466.html,可以看到有多种类型的基金,我们选择ETF基金,比如最近比较火的华夏中证5G通信主题ETF

image-20200301015233403

进入到该基金的页面 http://fundact.eastmoney.com/fundinfo/515050.html?fund=515050,可以查看该基金持有的前十大股票,第一名就是中兴通信,占比9.90%

image-20200301015411242

了解以上的数据源之后,我们可以写个python程序快速的找到某只股票的基金持仓及占比情况,源码如下:

import requests
import execjs
from bs4 import BeautifulSoup

REQUEST_HEADER = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36'}


def get_stock_fundlist(symbol, date='2019-12-31'):
    """
        查询股票被持有的基金及持仓占比
    :param symbol: 股票代码,例如: SZ000001
    :param date: 报告日期,例如:2019-12-31,2019-03-31
    :return:
    """
    stock_url = 'http://data.eastmoney.com/zlsj/detail.aspx?type=ajax&sr=-1&p=1&ps=1000&stat=0&code=%s&date=%s&rt=52763434'
    fund_url = 'http://fund.eastmoney.com/f10/FundArchivesDatas.aspx?type=jjcc&code=%s&topline=15'

    web_source = requests.get(stock_url % (symbol[2:], date), headers=REQUEST_HEADER, timeout=5)
    js_obj = execjs.compile(web_source.content.decode('gbk'))
    results = js_obj.eval('jsname')
    if 'data' not in results:
        return
    etf_list = [record['SHCode'] for record in results['data'] if record['SHCode'].startswith('5')]
    results = {}
    for code in etf_list[:]:
        web_source = requests.get(fund_url % code, headers=REQUEST_HEADER, timeout=5)
        html_source = web_source.content.decode()
        html_source = html_source.split("\"")[1]
        soup = BeautifulSoup(html_source, 'lxml')
        items = soup.select(".tzxq")[0].select('tr')
        for item in items[1:]:
            if symbol[2:] not in item.text:
                continue
            fields = item.select('td')
            record = [field.text.strip() for field in fields if
                      field.text.strip() != '' and '变动' not in field.text]
            results[code] = record[3][:-1]
            break
    return sorted(results.items(), key=lambda kv: kv[1], reverse=True)


if __name__ == '__main__':
    print(get_stock_fundlist('SZ000063', '2019-12-31'))

执行代码输出结果如下:

[('515050', '9.90'), ('570007', '6.87'), ('519668', '6.52'), ('501062', '5.37'), ('550002', '5.13'), ('515000', '5.10'), ('570006', '5.08'), ('501028', '4.81'), ('502013', '4.33'), ('501015', '4.26'), ('550015', '4.22'), ('515580', '4.19'), ('501026', '4.19'), ('512970', '3.94'), ('519929', '3.69'), ('515200', '3.32'), ('550001', '3.06'), ('550008', '3.05'), ('501081', '3.05'), ('550009', '2.99'), ('512220', '2.95'), ('501076', '2.26'), ('519013', '2.09'), ('515880', '10.75'), ('510080', '1.60'), ('590007', '0.99'), ('519676', '0.71')]

可以看到所有基金中持有中兴通讯最多份额的基金是515050,该基金持仓里中兴通讯占基金的比例为9.9%,意味着中兴通讯每涨停一天对基金会有0.99%的贡献。

当然,这个办法有很多的局限性,比如基金持有的股票多数是优质股,这种机会比较难碰到,而且指数型基金往往持仓非常分散,也不适用这种办法。如果基金中其他股票下跌也会造成基金下跌。

515050 5GETF这种主题基金比较适合这种个股利好涨停的,遇到针对整个行业的利好消息,这种主题型的ETF往往同时持有多只股票出现大幅上涨,2月24日当天,515050 这只基金除了中兴通讯涨停外,信维通信和沪电股份也出现了涨停,这三只股票基金持仓占比高达17.18%,如果之后出现了连续涨停的走势,对基金影响将非常明显。

除了515050 5GETF还有其他几个比较适合的,比如159995 芯片ETF,515700 新能车,512760 半导体50等,可以都关注着,没准哪天掉下来个利好呢。

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

今天我们通过一个例子来介绍python爬取数据的一般步骤,用到的工具包括python的经典模块requests和BeautifulSoup,另外结合刚学习的任务流工具TaskFlow来完成代码开发。

我们先来看一下要爬取的数据,网址是http://data.10jqka.com.cn/funds/gnzjl/,通过chrome的开发者工具分析我们可以比较容易找到后台数据加载网址为

http://data.10jqka.com.cn/funds/gnzjl/field/tradezdf/order/desc/page/{page_num}/ajax/1/free/1/

其中page_num的位置为要查询第几页的数据,在网页上看到概念一共有6页数据,所以page_num取值为1-6

WechatIMG109

这里有个小技巧,可以先点击图片左上角的清空按钮,把已经加载的网址先清理掉,然后在原始网页上点第二页,就能看到图片左下角新加载的网址,点开右边“Preview” 看到资金流数据相关的内容,就能确定这个网址是用来加载数据的。

在chrome浏览器中输入 http://data.10jqka.com.cn/funds/gnzjl/field/tradezdf/order/desc/page/1/ajax/1/free/1/,并打开chrome开发者工具,在网页源码中找到数据所在table标签为

<table class="m-table J-ajax-table">
	...
</table>

抓取数据的完整源码如下

import time

import requests
from bs4 import BeautifulSoup
from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow.task import Task

REQUEST_HEADER = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36'}


class MoneyFlowDownload(Task):
    """
    下载资金流数据
    数据源地址:http://data.10jqka.com.cn/funds/gnzjl/

    """
    BASE_URl = {
        "concept": 'http://data.10jqka.com.cn/funds/gnzjl/field/tradezdf/order/desc/page/%s/ajax/1/free/1/',
    }

    def execute(self, bizdate, *args, **kwargs):

        for name, base_url in self.BASE_URl.items():
            # 爬取数据的存储路径
            dt_path = '/data/%s_%s.csv' % (bizdate, name)

            with open(dt_path, "a+") as f:
                # 记录数据文件的当前位置
                pos = f.tell()
                f.seek(0)
                lines = f.readlines()
                # 读取文件中的全部数据并将第一列存储下来作为去重依据,防止爬虫意外中断后重启程序时,重复写入相同
                crawled_list = list(map(lambda line: line.split(",")[0], lines))
                f.seek(pos)
                # 循环500次,从第一页开始爬取数据,当页面没有数据时终端退出循环
                for i in range(1, 500):
                    print("start crawl %s, %s" % (name, base_url % i))
                    web_source = requests.get(base_url % i, headers=REQUEST_HEADER)
                    soup = BeautifulSoup(web_source.content.decode("gbk"), 'lxml')
                    table = soup.select('.J-ajax-table')[0]
                    tbody = table.select('tbody tr')
                    # 当tbody为空时,则说明当前页已经没有数据了,此时终止循环
                    if len(tbody) == 0:
                        break
                    for tr in tbody:
                        fields = tr.select('td')
                        # 将每行记录第一列去掉,第一列为序号,没有存储必要
                        record = [field.text.strip() for field in fields[1:]]
                        # 如果记录还没有写入文件中,则执行写入操作,否则跳过这行写入
                        if record[0] not in crawled_list:
                            f.writelines([','.join(record) + '\n'])
                    # 同花顺网站有反爬虫的机制,爬取速度过快很可能被封
                    time.sleep(1)


if __name__ == '__main__':
    bizdate = '20200214'
    tasks = [
        MoneyFlowDownload('moneyflow data download')
    ]
    flow = linear_flow.Flow('ths data download').add(*tasks)
    e = engines.load(flow, store={'bizdate': bizdate})
    e.run()

执行程序后,在dt_path位置已经存储了概念的资金流数据,文件名为20200214_concept.csv,内容大致如下:

钛白粉,1008.88,6.29%,7.68,6.21,1.47,7,金浦钛业,10.04%,2.96
磷化工,916.833,2.42%,37.53,34.78,2.75,28,六国化工,9.97%,4.08
光刻胶,1435.68,2.40%,43.51,44.31,-0.80,20,晶瑞股份,10.01%,42.99

此时就完成了同花顺概念分类的资金流数据的爬取,之后可以每天定时启动任务抓取数据进行分析。

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

TaskFlow 是一个以高度可用,易于理解和声明试方式来执行[作业,任务,流程]的库,可与OpenStack和其他项目一起使用。

基本概念

taskflow库在oslo项目中是一个实现比较复杂的项目,要弄清楚其实现原理,首先需要对其中的相关概念有所了解。所以,本文首先总结了taskflow中常用的一些基本概念,这些概念主要包括如下几个:

Atom

Atom类是taskflow的最小单位,taskflow中其他类,包括Task等都需要继承这个类。一个Atom对象是一个命名对象,通过操作输入数据以执行一些促进整个流程发展的动作,或者产生一个处理结果等。它是一个抽象类,提供了两个抽象方法:execute()用于执行一个动作,revert()用于根据execute()执行结果和失败信息还原到任务执行之前的状态;除此之外,还分别为这两个方法提供了pre_execute()/post_execute()、pre_revert()/post_revert()方法用于定义在执行execute或revert操作前后执行的操作。

Task

Task类是一个拥有执行和回滚操作的最小工作单元,表示一个任务流中的某一个任务。它是一个继承自Atom类的表示一个任务的父类,开发者可以执行定义一个继承自Task类的任务类,并重写execute()和revert()方法分别表示执行和回滚的操作。

Task outline.

Task的两种类型:

  • Task: 对于继承和创建自己的子类很有用。
  • FunctorTask: 对于将现有function包装到任务对象中很有用,但是不能应用在engine中

Retry

Retry类也是一个继承自Atom的抽象类,它主要定义了当有错误发生时,如何进行重试操作。其也包含也不同的类型,将会在接下来的部分进行详细介绍。继承重试的子类必须提供on_failure()函数来对故障进行处理。

为避免重复创建常见的重试模式,提供了以下常见的重试子类:

  • AlwaysRevert: 始终还原subflow.

  • AlwaysRevertAll: 始终还原整个flow

  • Times: 对subflow重试指定次数

  • ForEach: 允许在每次发生故障时为subflow提供不同的值(使其有可能通过更改subflow输入来解决故障)

  • ParameterizedForEach: 和ForEach类似,但是从存储中获取值.

    Inheritance diagram of taskflow.atom, taskflow.task, taskflow.retry.Retry, taskflow.retry.AlwaysRevert, taskflow.retry.AlwaysRevertAll, taskflow.retry.Times, taskflow.retry.ForEach, taskflow.retry.ParameterizedForEach

关于重试的策略,taskflow通过一个枚举类型的Decision定义了三种策略:

  • REVERT:仅回滚失败Flow对象周围或关联的子流Flow对象。该策略在回滚子流Flow对象之前,会首先咨询其父Atom对象以确定父Atom对象是否使用不同的重试策略。该策略允许安全的嵌套具有不同重试策略的Flow对象。如果父Atom对象中没有定义重试策略,则默认只回滚关联子流Flow对象中的Atom对象。当然,你可以通过defer_revert参数改变默认行为,当其设置为True,表示REVERT策略将继承父Atom的策略,如果父Atom对象没有重试策略,则它也将被回滚。
  • REVERT_ALL:不管失败Flow对象的父Atom对象的策略如何,都将回滚整个流程。
  • RETRY:重试该失败的Flow/Task对象。

Flow

Flow类是一个用来关联所有相关Task类,并规定这些Task类的执行和回滚顺序的抽象类。而oslo中为Flow提供了三种实现方式:graph_flow表示图流,linear_flow表示线性流,unordered_flow表示无序流。关于这三种类型的流实现会在之后进行详细分析。

Inheritance diagram of taskflow.flow, taskflow.patterns.linear_flow, taskflow.patterns.unordered_flow, taskflow.patterns.graph_flow

  • linear_flow:线性流,该类型的Flow对象将按照Task/Flow加入的顺序来依次执行,按照加入的倒序依次回滚。
  • graph_flow:图流,该类型的Flow对象会按照给加入的Task/Flow显示指定的依赖关系或通过其间的provides/requires属性隐含的依赖关系执行和回滚。
  • unordered_flow:无序流,该类型的Flow对象所加入的Task/Flow会按照任意顺序执行或回滚。

要弄清楚这三种类型的Flow对象,首先需要了解oslo定义的Flow基类的构成。在oslo定义的Flow基类中,主要包含以下几个重要的属性和方法:

  • name:表示初始化Flow对象时,为其指定的名称,并不能唯一表示一个Flow对象。
  • retry:表示与该Flow对象关联的重试控制器。
  • provides:表示该Flow对象提供的一组符号名称。
  • requires:表示该Flow对象所需要的一组"unsatisfied"符号名称。
  • add(*items):该方法用于为该Flow对象添加一个或一组Task/Flow对象。
  • iter_links():迭代Flow对象的子节点之间的依赖关系链接。例如在迭代一个三元组(A, B, meta)时,就是迭代一个从子节点A(一个Atom对象或一个Subflow)指向子节点B(一个Atom对象或一个Subflow)的链接;换句话说,也就代表了子流B依赖于子流A,或者子流B需要子流A;而meta代表了这个依赖关系链接的元数据,是一个字典。
  • iter_nodes():迭代Flow对象中的所有节点。例如在迭代一个二元组(A, meta)时,A(一个Atom对象或一个Subflow)是当前Flow对象的子流或子任务;meta同样代表了这个链接的元数据,是一个字典。

Engine

Engine类是一个表示真正运行Atom对象的抽象类,它的实现类主要用于载入(load)一个Flow对象,然后驱动这个Flow对象的Task对象开始运行。Engine的实现也有多种不同的形式,这也会在接下来的部分进行详细介绍。

taskflow在具体实现Task/Flow管理时,首先定义了一个Engine抽象类,所有实现都需要继承这个抽象类。这个抽象类定义了如下重要属性和方法:

  • notifier:一个通知对象,它会分发与Engine对象中包含的Flow对象相关的事件通知。
  • atom_notifier:一个通知对象,它会分发与Engine对象中包含的Atom对象相关的事件通知。
  • options:相关数据结构传递给Engine对象的选项。
  • storage:Engine对象的存储单元。
  • statistics:Engine对象收集的运行时统计数据字典。当Engine没有运行时,这个值为空;在Engine正在运行时或已经运行之前,它可能会存储一些对正在运行或运行完成时有用的或包含信息的键值对。
  • compile():该方法可以将Engine对象中包含的Flow对象编译成Engine对象内部表示形式。这个内部表示形式就是Engine对象实际用于运行的流的形式。
  • reset():将Engine对象重置为PENDING状态。如果一个Flow以FAILURE、SUCCESS、REVERTED状态结束运行(即调用Engine对象的run()方法之后),或由于某种状态使得其处于某种中间状态,此时可以调用reset()方法进行重置,然后进行重试操作。
  • prepare():在Engine对象编译完所有包含的Flow对象之后,且在Flow运行之前执行该方法,为流程的执行进行一些准备操作。
  • validate():在Engine对象编译完所有包含的Flow对象之后,且在Flow运行之前执行该方法,为流程的执行进行一些验证操作。
  • run():运行Engine对象中的Flow流程。
  • suspend():该方法尝试暂停Engine对象。如果一个Engine对象正在执行某个Atom对象,则执行该方法会将这个Atom对象之后的所有正要运行的工作都暂停,并将这个Engine对象的状态变为暂停状态,以便之后进行恢复操作。

taskflow在具体实现Engine时,都需要给上述属性和方法重新赋值或进行覆写操作,以实现一个完整的管理流程Flow/Task对象的Engine类。在taskflow中,目前实现了三种策略的Engine类,而在这三种策略中,有两种是面向行为的action_egine类:SerialActionEngine、ParallelActionEngine;另一种是面向多进程的worker_base类:WorkerBaseActionEngine。这三种类型的Engine类的异同点如下所示:

  • SerialActionEngine:这是一个以串行方式运行任务的Engine类,也就是说所有的任务都会在调用engine.run()方法的线程中顺序执行。
  • ParallelActionEngine:这是一个以并行方式运行任务的Engine类,即可以在多个线程中运行Engine对象中的任务。在这种策略中,taskflow定义了对应的多个ParallelThreadTaskExecutor创建运行任务的线程
  • WorkerBaseActionEngine:这是一个可以将任务调度到不同worker(即进程)中执行的Engine类。
Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

最近工作上遇到一个问题,某券商提供的文件单接口需要用其提供的c语言工具调用,由于我们的交易框架是python开发的,所以需要用到cython来调用c开发的工具包,学习了cython的官方入门教程顺便记录如下。

cython介绍

Cython是针对Python编程语言和扩展的Cython编程语言(基于Pyrex)的优化静态编译器。
它使为Python编写C扩展与Python本身一样容易。

编写样例代码

首先,创建一个后缀为pyx的文件cython_example.pyx,编写cython函数

def say_hello_to(name):
    print("Hello %s!" % name)

然后编写相应的编译脚本文件:

from distutils.core import setup
from Cython.Build import cythonize

setup(name='Hello world app',
      ext_modules=cythonize("cython_example.pyx"))

编译代码

python setup.py build_ext --inplace

报如下错误:

running build_ext
building cython_example extension
error: Unable to find vcvarsall.bat

Which Microsoft Visual C++ compiler to use with a specific Python version ?

visual c++ python
14.x 3.5,3.6,3.7,3.8
10.0 3.3,3.4
9.0 2.6, 2.7, 3.0, 3.1, 3.2

我使用的是python3.6,所以下载 Visual C++ 2015 Build Tools(包含Visual C++ 14.0)

下载地址:

http://go.microsoft.com/fwlink/?LinkId=691126&fixForIE=.exe.

安装之后重新执行编译命令输出如下最后两行说明编译通过:

Generating code
Finished generating code

会在当前目录下生成build目录和cython_example.cp36-win32.pyd

接下来做个测试,编写一个test.py,内容如下

import cython_example

cython_example.say_hello_to('eryk')

执行命令后输出如下结果说明调用cython函数成功了

Hello eryk!

参考

http://docs.cython.org/en/latest/src/quickstart/build.html

https://wiki.python.org/moin/WindowsCompilers

https://stackoverflow.com/questions/29846087/microsoft-visual-c-14-0-is-required-unable-to-find-vcvarsall-bat

我的微信公众号:pyquant

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

1. 背景

最近公司在某券商募集了一笔资金,需要对接其提供的程序化接口方便以后进行交易,对方只提供了c++版本程序,我们系统是python开发的,所以需要用python调用c++,大致了解下了python调用c++的几种方式,下面根据网上的资料介绍下几种方式优缺点,最后给个mac环境下python调用c++的例子。

2. Python调用C/C++程序方法

  1. ctypes

    • 如果是 C 函数库,则直接 load 这个库,然后调用即可;
    • 如果是 C++ 函数库,则需要用extern关键字封装一个供 C 使用的函数,即把类隐藏到一些 C 风格的函数里,然后用 extern 标明这些函数,以方便外部调用。
  2. SWIG

    SWIG完整支持ANSI C,支持除嵌套类外的所有C++特性。SWIG是一个接口编译器,旨在为C/C++方便地提供脚本语言接口。SWIG不仅可以为C/C++程序生成 Python接口,目前可以生成CLISP,Java,Lua,PHP,Ruby,Tcl等19种语言的接口。SWIG被Subversion, wxPython, Xapian等项目使用。值得一提的是,Google也使用SWIG。

  3. SIP

    SIP是一种Python工具,用于自动生成Python与C、C++库的绑定。SIP最初是在1998年用PyQt开发的,用于Python与Qt GUI toolkit的绑定,但适用于生成任何C或C++库的绑定。

  4. Cython

    Cython是让Python脚本支持C语言扩展的编译器,Cython能够将Python+C混合编码的.pyx脚本转换为C代码,主要用于优化Python脚本性能或Python调用C函数库。由于Python固有的性能差的问题,用C扩展Python成为提高Python性能常用方法,Cython算是较为常见的一种扩展方式。

  5. Boost.Python

    Boost.Python是Boost提供的一个C++的模板库,用以支持Python和C++的无缝互操作。相对SWIG来说,这个库的优势是功能通过C++ API完成,不用学习写新的接口文件。对C++的支持更自然、完整。

ctypes SWIG SIP Cython Boost.Python
是否支持Python3 支持 支持 支持 支持 支持
对接难易程度 简单 中等 中等 困难 困难
是否需开发封装代码 c++需要,c不需要 需要 需要 需要 需要

3. 使用ctypes调用C/C++ 例子

3.1 安装 GNU 的 C/C++ 编译器

以mac为例,安装之后命令行输入 g++ -v 查看是否安装成功

 ~/ g++ -v
Configured with: --prefix=/Applications/Xcode.app/Contents/Developer/usr --with-gxx-include-dir=/Applications/Xcode.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.14.sdk/usr/include/c++/4.2.1
Apple LLVM version 10.0.0 (clang-1000.11.45.5)
Target: x86_64-apple-darwin18.2.0
Thread model: posix
InstalledDir: /Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin

3.2 开发c++例子程序:

#include <iostream>     
class Test{     
    public:                  
        void print(){             
            std::cout << "Hello world!" << std::endl;
        }
};

extern "C" {     
    Test* Test_new(){         
        return new Test(); 
    }     
    void Test_print(Test* test){ 
        test->print(); 
    }
}

保存文件未test.cpp,编译代码生成动态链接库:

g++ -o test.so -shared -fPIC test.cpp

执行完成后会在当前目录下生辰test.so 文件

3.3 Python中调用so

from ctypes import cdll
lib = cdll.LoadLibrary('./test.so')

class Test(object):
    def __init__(self):
        self.obj = lib.Test_new()

    def print(self):
        lib.Test_print(self.obj)

test = Test()
test.print()

执行Python代码会输出 Hello world!,说明执行成功

参考

https://stackoverflow.com/questions/145270/calling-c-c-from-python

关于python对接c/c++各种方案的优缺点,这篇文章说的比较清楚: https://www.jb51.net/article/63623.htm

我的微信公众号:pyquant

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

定义

FIX协议是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化的进程,在各类参与者之间,包括投资经理、经纪人,买方、卖方建立起实时的电子化通讯协议。

FIX协议的目标是把各类证券金融业务需求流程格式化,使之成为一个个可用计算机语言描述的功能流程,并在每个业务功能接口上统一交换格式,方便各个功能模块的连接。

FIX协议各个版本对股票、期权和期货的支持程度,目前市场上使用FIX4.4的较多。

FIX通信模型

  • Initiator:发起者,建立通信连路,通过发送初始Logon消息发起会话的参与方。
  • Acceptor:接收方 FIX会话的接收方。负责执行第一层次的认证和通过传输Logon消息的确认正式声明连接请求被接受。
  • 原则:先发起者为Initiator ,接受者为Acceptor 。
  • 标准模式以网关为Acceptor,客户端为Initiator做为常用模式。

基本概念

FIX Connection

FIX连接 由3部分组成:logon登录,message exchange消息传输,和logout注销.

FIX Session

FIX会话由一个或多个FIX Connection FIX连接组成。一个FIX会话可以有多次登录。一个FIX会话定义为一个在连接双方间的的带有连续序列号的有序消息双向传输流。 单个FIX会话能够跨越多个连续(不是并行的)的物理连接。在一个维持的,单独的FIX会话中,参与方能够多次连接和断开连接。连接的参与方必须根据单个系统及时间区域需求,公共协商会话的开始和结束。无论什么原因,重新设置接收和发送序列号为1,意味着一个新的FIX会话的开始。

建议一个新的FIX会话在每24小时期间建立一次。可以维持24小时的连接和通过设置在Logon消息中的ResetSeqNumFlag建立一套新的序列号。

Sequence Num

所有的FIX消息都由一个唯一的序列号进行标示。序列号在每一个FIX会话开始时被初始化为1,并在整个会话期间递增。监控序列号可以使会话参与者识别和处理丢失的消息,当在一个FIX会话中重新连接时能够优雅地进行应用程序同步。每个会话将建立一组互不依赖的接受和发送序列。会话参与者将维护一个赋予发送消息的序列和一个监控接受消息的消息块间隙序列号。

心跳 Heartbeats

在消息交互期间,FIX应用程序将周期性产生Heartbeat心跳消息。该心跳消息可以监控通信链路状态及识别接受序列号间隙。发送Heartbeat的周期间隔由会话发起者使用在Logon消息中HeartBtInt域进行定义。Heartbeat心跳消息的时间间隔应当在每一个消息发送后复位,即发送一个消息后,在间隔给定的时间内无其它消息发送则发送一个Heartbeat心跳消息。HeartBtInt的值应当被会话双方认同,由会话发起方定义并由会话接收者通过Logon消息进行确认。同一个HeartBtInt被会话双方——登录的发起者和登录的接受者共同使用。

Ordered Message Processing

FIX协议假设消息在所有参与者间完全按照顺序进行传输。协议的实现者在设计消息间隙填充处理时应当考虑这个假设。有两种方式处理消息间隙。每一个都要求所有的消息时最后一个接收消息的后续消息或在维护一个所有新消息有序序列时,请求特定丢失消息。比如:接收方丢失了5个消息块中的第二个,程序能忽略第3到第5个消息,产生一个对消息2到消息5的重传请求,或者从消息2到无穷大消息编号的重传请求。另外的方式是暂时存储消息3到消息5,仅要求重传消息2。对于这两种方式,消息3到消息5都不应该先于消息2进行处理。

Possible Duplicate

当一个FIX引擎对一个消息是否成功地被指定的目标接收或者当对一个重传请求进行响应时,将会产生一个可能的消息复制。这个消息将用同样的序列进行重新传送,此时在头部的PossDupFlag域将会被设置为‘Y’。接收端程序负责处理该重发消息,可以作为一个新消息进行处理,或者根据实际情况忽略该消息。所有重传请求的响应消息都将包含其值为‘Y’的PossDupFlag域。没有PossDupFlag域或者PossDupFlag域为‘N’的消息应被当作初始传送消息。注意,一个PossDupFlag值为‘Y’的重传消息需要重新计算其CheckSum值。一个可能的复制消息里发生变化的域包括:CheckSum,OrigSendingTime,SendingTime,BodyLength和PossDupFlag。加密相关域(SecureDataLen和SecureData)也必须被重新构造。

Possible Resends

模糊的应用层消息可能随同PossResend标志被重传。当一个指令没有在规定时间长度内进行确认或者终端用户挂起该指令没有进行传送时这种方法非常有用。接收程序必须识别此标志,并质疑其内部域以确定该指令是否在之前已经被接收过。注意,可能的重传消息将包含与原始消息相同的数据体,但包含PossResend标志和一个新的序列号。此外,CheckSum和与加密相关的域值需要重构。

数据完整校验 Data Integrity

消息数据内容的完整性可以参用两种方式来验证:消息长度和效验码检查。程序通过计算BodyLength域到(并包含)在CheckSum标记(“10=”)后的分界符的字符数与在BodyLength中标示的消息长度进行比较来完成完整性效验。ChekSum完整性检查,通过计算从域“8=”中“8”开始,包括紧跟在CheckSum标记域的分界符每个字符的2进制和同CheckSum进行比较得到。

消息确认 Message Acknowledgements

消息数据内容的完整性可以参用两种方式来验证:消息长度和效验码检查。程序通过计算BodyLength域到(并包含)在CheckSum标记(“10=”)后的分界符的字符数与在BodyLength中标示的消息长度进行比较来完成完整性效验。ChekSum完整性检查,通过计算从域“8=”中“8”开始,包括紧跟在CheckSum标记域的分界符每个字符的2进制和同CheckSum进行比较得到。

加密 Encryption

敏感数据在公众网络上的传输建议采用数据加密技术来掩饰应用消息。
加密算法由连接双方共同协商。
一个消息的任何一个域可以被加密并放在SecureData域中。然而,一些显示的标志域必须采用明文进行传输。为确保完整性,明文域可以在SecureData域中重复。
当使用加密时,建议但不是必须,所有的消息体都进行加密。如果一个消息中的重复组数据中的部分数据要加密,这个重复组必须全部进行加密。
预先协商好的加密算法在Logon消息中进行声明。

自定义域

  • FIX为给用户提供最大的灵活性,FIX协议允许用户自定义域。这些域在认同的参与者之间实现、应用,并且应注意避免冲突。
  • Tag数在5000 到9999保留用于用户自定义域。这些tag值用于企业联盟的信息交换。可以通过FIX网站进行注册。
  • 10000以上保留用于单一企业内部使用。不用注册。

消息类型

初始化过程之后,正常的消息交换将开始。所有有效的消息格式的细节将在“Adminitrative Message ”管理消息和“Application Messages”应用消息部分介绍。

1. 管理信息

它是为了信息交换过程更加顺畅一致而使用的控制,包括:登录、心跳、检验请求、重新发送请求、拒绝(交换过程)顺序重设及注销等。

2. 应用消息

也就是交易的数据,它包括:

  • 公告 宣布已完成的交易信息。
  • 重要提示 告知由经纪人买卖的证券是由私人股份有限公司所有,还是由代理持有,以及持有量。
  • 消息 是经纪人和机构之间传送的一般自由格式信息,带有识别信息紧急性和商号主题词分类标志。
  • 电子邮件 其格式和用途与消息信息相同,但更倾向于双方非公开的用途。
  • 报价请求 有些市场,要求经纪人在每次订单前提出报价。
  • 报价与多宗报价 回应报价请求的信息,并用于发表主动的报价。
  • 请求对多宗报价的确认 使用报价回应水平标记,有选择地支持对报价的确认。
  • 报价撤销 报价发起人用于撤销报价。
  • 报价状况请求 机构用来生成执行报告。
  • 报价确认 针对报价、多宗报价、报价撤销和报价请求,作出回应。
  • 行情数据请求 通过此请求得到所指定的证券和外汇交易报价的行情数据。
  • 行情数据—快照/完全刷新 该信息用于发送双方的订单登记簿、报价清单、交易清单、指数值、开盘价、收盘价、成交单价、最高价、最低价和变动加权平均价等。
  • 行情数据—添加刷新 用于添加刷新请求。
  • 行情数据请求拒绝 用于经纪人因交易或技术上的原因不承兑行情数据请求的情况。
  • 证券定义请求 用于某一指定证券与第二方交易。
  • 证券定义 接受或拒绝证券定义信息中请求的证券,发回证券及类型清单。
  • 证券状况请求 用于提出有关证券状况的请求。
  • 证券状况 提供有关证券状况改变的报告。
  • 交易盘状况请求 请求有关市面状况的信息。
  • 交易盘状况 提供有关市场状况的信息。
  • 新订单—单一 机构向经纪人提供有关证券或外汇的订单。
  • 执行报告 确认收到订单或订单改变信息,传递订单状况或订单成交信息,报告交易的费用。
  • 未知交易 通知交易方,收到的订单已被执行。
  • 订单撤销/替换请求 改变订单的参数。
  • 订单撤销拒绝 是经纪人在不能承兑所收到的撤销请求信息时发出的信息。
  • 订单状况请求 机构要求经纪人生成并发挥有关订单状况的信息。
  • 划拨 指定如何将一个订单或一组订单细分为一个或多个账户。
  • 划拨确认 确认收到机构发送的划拨信息及状态。
  • 结算指令 经纪人或机构交易结算的指令。
  • 出价请求 在“非公开”市场与“公开”市场,因市场规则不同,该信息的用法也不同
  • 出价回应 因两个市场规则不同,有不同的用法。
  • 新订单—清单 因两种市场规则的不同而不同。
  • 敲定价 交换本金交易的敲定价。
  • 状况清单 卖方以主动方式发送回应状况清单请求信息。
  • 清单执行 机构用于指示经纪人开始执行已被提交的证券订单信息。
  • 清单撤销执请求 用于机构希望在执行交易盘之前或之中,撤销已被提交的证券订单消息。
  • 状况清单请求 用于机构指示经纪人生成有关某一状况清单的信息。
  • 清单订单信息的分解 使用与其它FIX信息相同的方法,支持程序交易中的信息分解。
  • 交易信息拒绝 拒绝因遵循了交易盘规则而不能以其它方式进行拒绝的应用层面的信息。

消息格式

数据类型

整数int,浮点数float,单个字符char,布尔Boolean,字符串String,数据data

每条FIX信息都是由一系列带有〈标记〉=〈值〉的域组成。每个标记代表不同的含义,可以是信息的类型,目标商务名称,证券买入价等。FIX协议规定了0~5000的标记含义(fix信息字典),5000以上可由使用者自己定义,以适用特定的应用。

参考

https://www.fix-events.com/Archives/asianfix2008/cn/pdf/AlanDean_Chi.pdf

https://l297.oschina.io/15034517662312.html

https://juejin.im/post/5bf7c4ae51882528c4467649

https://www.huiyep.com/knowledge/155062.html

https://github.com/quickfix/quickfix/blob/master/examples/executor/python/executor.py

我的微信公众号:pyquant

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

第一章 系统的建立

1. 鉴定可交易的市场

  • 流动性
    流动性的最佳度量是交易量和未平仓合约数,观察一段时间的平均值优于观察单日
  • 历史波动性
    历史上趋势变化范围宽的市场优于平静的趋势变化范围窄的市场
  • 准确的基本面和技术面数据
  • 避免新市场

2. 鉴别走势

  • 两条移动平均线,例如: 3和12,9和18,观察不到水平状态
    • 3 > 12,上升趋势
    • 3 < 12,下降趋势
  • 三条移动平均线,例如: 4、9和18
    • 4 > 9 > 18, 上升趋势
    • 4 < 9 < 18, 下降趋势
    • 其他,水平状态
  • 移动平均线和其他技术指标组合
    当指标发出的信号不一致时,认为市场无趋势或呈横向趋势
  • 简单化

不建议运用翻转策略,因为这不能鉴定横向趋势的市场,并且总是存在于从多头到空头或者空头到多头的市场。这些反转系统总是倾向于在横向趋势的市场中出现锯齿状波动,毫无成功的可能,除非市场趋势持续如此。

3. 市场择时

依赖单一的”神奇指标“不现实,要设计一个灵活的动态系统,在真实的不断变化的市场条件也能管用。

市场的三种走势

  • 长期走势:数周或数月
  • 中期走势:最近几天
  • 短期价格变化:前一天和当天

短期信号最先出现,其次是中期信号,最后是长期信号,当我们鉴别出了长期走势时,第一个中期和短期信号已经出现过了,因子,我们要利用会在长期走势中反复出现的中期和短期信号。

耐心

等到市场时机成熟时进入,增加了每笔交易的利润,减少了必要的资本,减少了交易数量,大大提高了净利润额

4. 建立止损

止损过近或过远

  • 近止损,损失小、风险低、过早离场
  • 远止损,胜率高、单笔损失大、风险高

理想的止损

建立可接受的止损程序可以通过将止损设立在随机价格波动范围外一点点的位置。

  1. 一种可行的办法是算出价格移动平均线的标准差,在偏离移动平均线的最高点处建立止损。(布林带)
  2. 把每日价格变化的平均值作为建立止损的最小距离,这样可以避免大部分会产生小型的锯齿状波动的价格波动。
    例如:建立5或10日移动平均线,将原始止损点设置于移动平均线之间的区间相等的最小距离处。

止损方法要保持一致

5. 退市择时

  • 跟踪止损法
  • 相对强弱指标(RSI)

退市策略绩效可使用随机入市的方法验证效果

6. 再入市择时

退市策略是更为重要,所以使用可能的最佳的退市策略,然后调节退市后会被触发的再入市指标的敏感度。

7. 监控系统

  • 每周期交易频率,比如每月1.5笔
  • 盈利交易笔数与亏损交易笔数的百分比
  • 最糟糕的周期中盈利笔数
  • 最佳周期中盈利笔数
  • 最长连续亏损
  • 连续盈利交易
  • 每笔盈利交易的平均收益
  • 平均亏损交易额
  • 最大回吐
  • 最大回吐恢复时长

第二章 技术研究

方向性运动指标DMI和平均方向性运动指数ADX

* 下降的ADX预示着市场不呈现任何趋势,应该采取逆势策略,而不是顺势策略。
* ADX滞后性

布林带、包络线和通道

通道突破作为确认方法

商品通道指数CCI

背离

  • 趋势性市场和非趋势性市场
    非趋势性市场偏离交易朝两个方向都可以进行,而在趋势性市场逆势的偏离信号一般应该被忽略(视图抓住大的顶部和底部除外)
  • 连续偏离
    连续三次偏离
  • 相关市场偏离

动力指标和变化率

动力指标精确测量了市场的运转速度,从某种程度上说,测量了一种趋势完好存在的程度。

计算过程:用当日的收盘价减去n天前的收盘价,结果是一个位于零点或零线附近的正数或负数。

公式:

M = Pt - Pt-n

M是动力指标,Pt是当日的收盘价,Pt-n是Pt时间段(通常是天)前的收盘价。

变化率的公式是:

ROC=100(Pt/Pt-n)

动力线和零线交叉次数随着动力线指标计算中使用的时间周期的不同而变化。时间周期短,动力线与零线交叉得约频繁,指标发出信号的速度越快。

动力线最有效的用途之一是界定长期趋势。25周期的动力周线图是个非常可靠的长期走势指标。当动力线迅速离开零线时,顺势交易会带来丰厚的回报。

其他指标的动力指标量化倾斜度

移动平均线

  • 简单移动平均线

  • 加权移动平均线

  • 指数移动平均线

  • 三条移动平均线: 4-9-18均线法

  • 四条移动平均线

  • 移位后的移动平均线

  • 发现过滤器

MACD

抛物线指标SAR

最大的价值是用来建立止损点

相对强弱指标RSI

RSI > 75 or RSI < 25,推迟入市

慢速随机指标KDJ

随机指标最不适合用于只有持续的微小变动的趋势性市场

  • 左右交点

波动性

  • 平均真实区间(ATR)
  • 突破或价格峰值超出近期区间或平均真实区间是十分重要的信号,应作为入市点。

第三章 系统测试

避免最优化

从统计学的角度而言,少于30笔交易,产生的结果就不可靠。交易笔数大于30的程度越多

积累的前进测试,滚动前进最优化处理

衡量绩效

夏普比率(Sharpe Ratio)

其定义为年度化的收益(一种盈利性度量形式)与无风险收益率的差除以年度化的收益标准差(一种波动度量形式)。

英镑比率(Sterling Ratio)

英镑比率=年平均收益比率/[(1×3年平均最大亏损) + 10]

卡尔玛比率(Calmar Ratio)

几何平均数

几何平均数衡量的是你的交易系统的增长因子。几何平均数越高,你的系统再投资时创造高收益的潜力就越大。

净盈利

测试样例中的交易数

总交易数必须大于30,这是为了确保统计上的重要结果的可靠性。

最大盈利及最大的亏损交易

倘若最大的盈利交易不合理地歪曲了净盈利额,它就很重要。许多保守的系统测试员会剔除每种商品中的最大盈利交易,然后再次评价结果。

最大的连续盈利交易和亏损交易

峰到谷式的亏损

计算最大净资产额最为准确的方法是取日总净资产额的最高值与随后的日总净资产额的最低值之差值。

盈利交易百分比

大部分成功的顺势交易商拥有35%到45%的盈利交易。要达到55%以上是很困难的

平均盈利与平均亏损比率

总收益的最大亏损

波动性和破产的概率

测试入市、退市及止损方法

如果你的风险控制由两种类型的止损方法组成,一个是简单的定额止损法,另一个是将止损位设置为近期高价或低价,如果你知道每种止损价位被触及的频率,这对设计和测试过程会有帮助的。

测试入市方法

在整体方案中,退市方法确实要比入市方法重要,毕竟,交易的结果最终由退市决定,当入市做对了后,找到良好的退市方法就要容易很多。

有效测试一个交易系统的任何单个的要素的最佳方法就是尽量将它分离开来。建立你自己的交易系统,然后删去通常的退出方法。用一种可以在进入每笔交易特定天数后自动退出市场的方法取代原先的退市方法。

测试退市策略

选择一种结果合理的入市方法作为反转系统,然后用相同的数据和入市方法对每种退市策略一一进行测试。

从理论上讲,比入市方法更敏感的退市方法应该捕捉到每个市场变动的更多的信息。以下是对我们测试的退市策略的一个描述。

跟踪止损

初始风险是指进入市场点与控制风险的止损点之间的差距。净资产额风险是指开立的头寸的市场价格与你的退市策略暗示的价格之间的差值。

测试止损策略

初始风险止损

初始风险止损可定义为一种以某种方式限制一笔交易从入市点就可能累积的亏损的止损。当一笔交易对我们不利时,初始风险止损通常先于任何其它类型的退市策略被触发。它是你最基本的“止损”指令。

  • 定额止损
  • 支撑/阻力
  • 无利退出, 如果交易在一定天数后无利可获则退出

保本止损

保本止损被定义为一旦交易达到一定的利润额,在入市点建立的止损。这种止损的目的明显是要防止合理的利润变为亏损。

跟踪止损

跟踪止损是指达到一些合乎逻辑的价格点后,不断计算得出的止损价。跟踪止损可以用作初始风险止损或取利退市策略,或者兼任两者功能。

  • 从收盘价开始跟踪的定额止损
    这种止损是从交易方向的最高收盘价或最低收盘价开始计算止损点。
  • 从高价或低价开始跟踪的定额止损
    一旦达到某一盈利高度,就应该跟踪止损保护一定数目的收益。交易的损失将会被限制在交易所达到的高价和低价以及跟踪止损的高价和低价的差额范围内。这种止损可即刻跟踪,并且可以用作初始风险止损,也可以用作跟踪止损。

创建一个简单的交易系统

交易系统的目标

年回报率20% ~ 30%,盈利交易百分比大于40%,平均盈利与平均亏损的比率至少为2:1

风险控制

  • 初始风险
    即入市点与保护性止损点之间的差距
  • 资产风险
    即投入市场的净资产额与跟踪止损价位之间的差额。这两类风险还可以进一步分为单笔交易中的初始风险和资产风险,以及跨投资组合同时交易的初始风险和资产风险。

第四章 日交易

交易成本

手续费和点差

市场选择

当日high和low价差的绝对值高,或者更高的时间比率。持续的高波动性是日内交易市场或标的选择的重要指标。

考虑最小变动价位的大小

流动性及最小价差的大小也是选择日交易市场时应该考虑的因素

日交易商不断地面临着从相对小的价格变化幅度中获取最大利润的问题。这种情况自然使得交易商采取回调时买进、反弹时卖出的策略,而不是试图去顺应走势。大部分的顺势策略对于日交易而言往往太慢。逆势策略提供了从一个小的价格变化幅度中获取最大利润的可能性。然而,逆势策略往往不如顺势策略那么可靠,因为快速地识别出价格的转轨点比简单的顺势交易要难得多。

成功的日交易商试图在上升趋势阶段中的回调时买进,在下降趋势阶段在反弹时卖出。想持续赚钱的日交易商必须善于顺应趋势,又要善于发现短期的转折点。许多交易商亏钱是因为他们两者都不擅长。看过一些可能的日交易策略的例子后,请记住两个步骤:首先找出中期走势,然后找出短期转折点。要想获得盈利的日交易,两步都要进行得快而准。

我的微信公众号:pyquant

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

6月11号在火币OTC上卖了一点BTC给朋友还钱,结果今天朋友告诉我他的银行卡被冻结了,我看了下自己的银行卡,发现我的银行卡也被冻结了,只能转入不能转出。

给银行打电话询问,答复是司法冻结的,银行看不到原因和无权解冻。银行告诉我冻结的是哪个公安局和案件号(这两个信息非常重要)。之后打当地114查询到公安局的电话号码。

我又联系火币网,客服给了我一个链接,一看就懂了,链接和截图附在文后。。。看来有不少人被坑过,大概意思是司法冻结的原因是可能涉及到赃款,看严重程度不同账号被冻结的时间长度也不同,短的会被冻结48-72小时,长的可以冻结半年。之后给公安局打电话询问了我的账户的情况,大致意思是说给我转账的银行卡涉及电信诈骗,所以我的卡被冻结48小时,之后就没事了。

小建议:

OTC交易尽量选择交易量大的ID进行交易

关于银行卡冻结及处理办法 (火币官网链接需要翻墙),重要内容截图如下

我的微信公众号:pyquant

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

这篇文章原文出自kaggle,我大致翻译翻一下,文中给出了reduce_mem_usage方法可以用来自动缩减dataframe占用空间

这篇notebook展示了通过使用更合理的数据类型来减少dataframe的内存使用量

方法如下:

  1. 迭代每一个column
  2. 检查column是否为数字型
  3. 检查column是否可以用integer表示
  4. 找出column下的最大值和最小值
  5. 选择适用于数据范围的最合适的数据类型

通过以上步骤处理后将一份测试数据从1.3GB减少到466MB

源码如下:

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

def reduce_mem_usage(props):
    start_mem_usg = props.memory_usage().sum() / 1024**2 
    print("Memory usage of properties dataframe is :",start_mem_usg," MB")
    NAlist = [] # Keeps track of columns that have missing values filled in. 
    for col in props.columns:
        if props[col].dtype != object:  # Exclude strings
            
            # Print current column type
            print("******************************")
            print("Column: ",col)
            print("dtype before: ",props[col].dtype)
            
            # make variables for Int, max and min
            IsInt = False
            mx = props[col].max()
            mn = props[col].min()
            
            # Integer does not support NA, therefore, NA needs to be filled
            if not np.isfinite(props[col]).all(): 
                NAlist.append(col)
                props[col].fillna(mn-1,inplace=True)  
                   
            # test if column can be converted to an integer
            asint = props[col].fillna(0).astype(np.int64)
            result = (props[col] - asint)
            result = result.sum()
            if result > -0.01 and result < 0.01:
                IsInt = True

            
            # Make Integer/unsigned Integer datatypes
            if IsInt:
                if mn >= 0:
                    if mx < 255:
                        props[col] = props[col].astype(np.uint8)
                    elif mx < 65535:
                        props[col] = props[col].astype(np.uint16)
                    elif mx < 4294967295:
                        props[col] = props[col].astype(np.uint32)
                    else:
                        props[col] = props[col].astype(np.uint64)
                else:
                    if mn > np.iinfo(np.int8).min and mx < np.iinfo(np.int8).max:
                        props[col] = props[col].astype(np.int8)
                    elif mn > np.iinfo(np.int16).min and mx < np.iinfo(np.int16).max:
                        props[col] = props[col].astype(np.int16)
                    elif mn > np.iinfo(np.int32).min and mx < np.iinfo(np.int32).max:
                        props[col] = props[col].astype(np.int32)
                    elif mn > np.iinfo(np.int64).min and mx < np.iinfo(np.int64).max:
                        props[col] = props[col].astype(np.int64)    
            
            # Make float datatypes 32 bit
            else:
                props[col] = props[col].astype(np.float32)
            
            # Print new column type
            print("dtype after: ",props[col].dtype)
            print("******************************")
    
    # Print final result
    print("___MEMORY USAGE AFTER COMPLETION:___")
    mem_usg = props.memory_usage().sum() / 1024**2 
    print("Memory usage is: ",mem_usg," MB")
    print("This is ",100*mem_usg/start_mem_usg,"% of the initial size")
    return props, NAlist

原文链接:

https://www.kaggle.com/arjanso/reducing-dataframe-memory-size-by-65


我的微信公众号:pyquant

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant

读写API

HDFStore支持使用read_hdf进行读取和使用to_hdf进行写入的top-level API,类似于read_csv和to_csv的工作方式。

默认情况下,HDFStore不会丢弃全部为na的行。可以通过设置dropna = True来更改此行为。

import pandas as pd
import h5py
df_tl = pd.DataFrame({'A': list(range(5)), 'B': list(range(5))})
df_tl.to_hdf('store_tl.h5', 'table', append=True)
pd.read_hdf('store_tl.h5', 'table', where=['index>2'])

HDF数据类型

  • Fixed Format
    • 一次写入,重复读取,不可追加
    • 不可以使用where查询,比如每次获取指定key的全部内容
    • 不支持dataframe有非唯一的column
    • 相比于table格式更快的读写速度
    • 使用put或者to_hdf时的默认类型,也可以通过format='fixed’或format='f’指定类型
  • Table Format
    • 支持append操作
    • 支持删除和查询类型操作
    • 执行put或者to_hdf操作时通过设置format='table’或format='t’指定table格式
    • pd.set_option(‘io.hdf.default_format’,‘table’) 设置默认hdf format类型

在第一次append/put操作之后,您无法更改数据列(也不能转换索引)(当然,您只需读取数据并创建新表!)。

警告HDFStore对于写入不是线程安全的。底层PyTables仅支持并发读取(通过线程或进程)。如果您需要同时进行读写,则需要在单个进程中在单个线程中序列化这些操作。否则你将破坏你的数据。

TODO 验证append是否会按照index排序的例子

Hierarchical Keys

key可以包含路径,例如’/food/apple’, 存储时会自动创建sub-stores(在pytables中是groups),可以省略路径开头的’/’,’/food/apple’和’food/apple’表示相同的key。

注意:删除操作会删除子路径下的所有内容,请小心使用

df = pd.DataFrame({'A': list(range(2)), 'B': list(range(2))})
store = pd.HDFStore('hk_test.h5')
store.append('/food/apple', df)
store.append('food/orange', df)
store.append('df',df)
print(store.keys())

for (path, subgroups, subkeys) in store.walk():
    for subgroup in subgroups:
        print('GROUP: {}/{}'.format(path, subgroup))
    for subkey in subkeys:
        key = '/'.join([path, subkey])
        print('KEY: {}'.format(key))
        print(store.get(key))

print(store['food/orange'])

查询

select和delete操作可以通过查询语句对数据子集进行操作,好处是可以在非常大的数据集上只检索一小部分数据。

查询表达式:

  • 支持使用index和columns查询dataframe
  • 支持使用major_axis、minor_axis和items查询Panel
  • 如果指定data_columns,则它将被作为附加索引器

有效的比较运算符是:

=, ==, !=, >, >=, <, <=

有效的布尔表达式包括:

  • | : 或操作
  • & : 与操作
  • ( 和 ) : 分组

例子:

  • ‘index >= date’
  • “columns = [‘A’, ‘D’]”
  • “columns in [‘A’, ‘D’]”
  • ‘columns = A’
  • ‘columns == A’
  • “~(columns = [‘A’, ‘B’])”
  • ‘index > df.index[3] & string = “bar”’
  • ‘(index > df.index[3] & index <= df.index[6]) | string = “bar”’
  • “ts >= Timestamp(‘2012-02-01’)”
  • “major_axis>=20130101”

不建议通过将字符串插入查询表达式来将字符串传递给查询。只需将感兴趣的字符串分配给变量,并在表达式中使用该变量。
例如:

string = "HolyMoly'"
store.select('df', 'index == string')
store.select('dfq', "index>pd.Timestamp('20130104') & columns=['A', 'B']")
store.select('dfq', where="A>0 or C>0")
store.select('df', "columns=['A', 'B']")

删除

store.remove('wp', 'major_axis > 20000102')

警告: 请注意HDF5不会自动回收h5文件中的空格。因此,反复删除(或删除节点)并再次添加,将趋于增加文件大小。

压缩

  • complevel指定压缩强度,complevel = 0和complevel = None禁用压缩,0 <complevel <10启用压缩。
  • complib 指定压缩库,默认使用zlib
    • zlib:默认的压缩库。压缩方面的经典之作可以实现良好的压缩率,但速度有些慢。
    • lzo:快速压缩和减压。
    • bzip2:良好的压缩率。
    • blosc:快速压缩和解压缩。
    • blosc:blosclz这是blosc的默认压缩器
    • blosc:lz4:紧凑,非常流行和快速的压缩机。
    • blosc:lz4hc:LZ4的调整版本,以牺牲速度为代价产生更好的压缩比。
    • blosc:snappy:在许多地方使用的流行压缩器。
    • blosc:zlib:经典;比以前慢一些,但实现了更好的压缩比。
    • blosc:zstd:非常平衡的编解码器;它提供了上述其他压缩比,并且速度相当快。
store_compressed = pd.HDFStore('store_compressed.h5', complevel=9,complib='blosc:blosclz')

store.append('df', df, complib='zlib', complevel=5)

ptrepack

重新生成压缩文件,重写文件将回收已删除的空间,也可以改变complevel

ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5

性能

  • fixed stores 读写速度快于 tables 格式,但tables支持追加、删除和查询操作
  • 可以设置chunksize=< int >来指定chunsize大小,这将会降低写入时的内存使用量
  • 设置expectedrows 可以优化读写性能
  • Duplicate rows将会被写入tables,在select时会被过滤掉

我的微信公众号:pyquant

Python量化交易实战
欢迎您扫码订阅我的微信公众号: pyquant