用python pyes操作elasticsearch api接口遇到的问题

尼玛,pyes一直都有问题,算了,直接用 Elasticsearch 官方推荐的 Python 客户端(elasticsearch-py)吧

import random
import string
from datetime import datetime

from elasticsearch import Elasticsearch


def random_str(length):
    """Return a random string of ``length`` ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))


# Client constructed with no arguments, i.e. the library's default settings.
es = Elasticsearch()

for i in range(3000):
    nima = random_str(50)
    # NOTE(review): the document id is fixed at 1, so every iteration targets
    # the same document (presumably overwriting it) -- use ``id=i`` if 3000
    # distinct documents are wanted.
    res = es.index(index="ceshi", doc_type='rui', id=1,
                   body={"woca": nima, 'ltime': datetime.now()})
    print(nima)

In [10]: conn = pyes.ES(['127.0.0.1:9200'])
KeyboardInterrupt

In [10]: import pyes

In [11]: conn = pyes.ES(['127.0.0.1:9200'])

In [12]: conn.create_index('test-index')
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)

/root/<ipython console> in <module>()

AttributeError: 'ES' object has no attribute 'create_index'

In [13]: 

还有一个模块推荐给大家,叫esclient

import esclient
import unittest

class TestESClient(unittest.TestCase):
    """Exercise the API methods implemented in the esclient library.

    Every test runs against the client created by ``esclient.ESClient()``
    (presumably a reachable Elasticsearch node -- confirm the default host).
    ``setUp`` creates two indexes and two ``person`` documents before each
    test; ``tearDown`` deletes the indexes again afterwards.
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared ESClient and drop any leftover test indexes."""
        cls.es = esclient.ESClient()

        # Delete the test indexes, if any, so that create_index in setUp does
        # not fail because they already exist. The return values are ignored
        # on purpose: the indexes may simply not be there.
        print("Deleting test indexes, if any")
        cls.es.delete_index("contacts_esclient_test")
        cls.es.delete_index("contacts_esclient_test2")

    @staticmethod
    def _doc_found(result):
        """Return the existence flag from a ``get`` response.

        The flag is read from ``'exists'`` when that key is present and from
        ``'found'`` otherwise (presumably the key name differs between
        Elasticsearch versions -- verify).
        """
        try:
            return result['exists']
        except KeyError:
            return result['found']

    def setUp(self):
        """Create both test indexes and index two documents before each test."""
        body = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0
            }
        }
        # Creating an index must succeed the first time and be refused the
        # second time, because the index then already exists.
        self.assertTrue(self.es.create_index('contacts_esclient_test', body))
        self.assertFalse(self.es.create_index('contacts_esclient_test', body))

        self.assertTrue(self.es.create_index('contacts_esclient_test2', body))
        self.assertFalse(self.es.create_index('contacts_esclient_test2', body))

        # Index some test data.
        data = {"name": "Joe Tester", "age": 21, "sex": "male"}
        self.assertTrue(self.es.index("contacts_esclient_test", "person",
                                      body=data, docid=1))
        data = {"name": "Joe Schmoe", "age": 17, "sex": "male"}
        self.assertTrue(self.es.index("contacts_esclient_test", "person",
                                      body=data, docid=2))

        # Refresh so the documents are visible to the search/count tests.
        self.assertTrue(self.es.refresh('contacts_esclient_test'))

    def tearDown(self):
        """Delete both test indexes after each test."""
        self.assertTrue(self.es.delete_index("contacts_esclient_test"))
        self.assertTrue(self.es.delete_index("contacts_esclient_test2"))

    def test_open_close_index(self):
        """Close an index and open it again."""
        self.assertTrue(self.es.close_index('contacts_esclient_test'))
        self.assertTrue(self.es.open_index('contacts_esclient_test'))

    def test_index_api(self):
        """Index a document, re-index it with op_type='create', then get it."""
        data = {"name": "Jane Tester", "age": 23, "sex": "female"}
        self.assertTrue(self.es.index("contacts_esclient_test", "person",
                                      body=data, docid=3))
        # Again, now with op_type='create', meaning: only index when the
        # document id does not exist yet.
        # NOTE(review): id 3 already exists at this point; confirm that
        # esclient is expected to report success here.
        self.assertTrue(self.es.index("contacts_esclient_test", "person",
                                      body=data, docid="3", op_type="create"))

        # Ensure that the document has really been indexed.
        result = self.es.get('contacts_esclient_test', 'person', 3)
        self.assertTrue(self._doc_found(result))

    def test_get_api(self):
        """Fetch a document indexed by setUp."""
        result = self.es.get('contacts_esclient_test', 'person', 1)
        self.assertTrue(self._doc_found(result))

    def test_mget_api(self):
        """Fetch both setUp documents in one mget call."""
        result = self.es.mget('contacts_esclient_test', 'person',
                              ids=[1, 2], fields=['name', 'age'])

        for doc in result['docs']:
            self.assertIn(doc['_id'], ('1', '2'))

    def test_search_queryargs_api(self):
        """Search using query-string arguments."""
        query_string_args = {
            "q": "name:Joe",
            "sort": "age",
            "timeout": 10,
            "fields": "id,name,age"
        }
        result = self.es.search(query_string_args=query_string_args,
                                indexes=['contacts_esclient_test'])
        # Both documents created in setUp are named "Joe ...".
        self.assertEqual(result['hits']['total'], 2)

    def test_search_body_api(self):
        """Search using a query passed as request body."""
        query_body = {
            "query": {
                "term": {"name": "joe"}
            }
        }
        result = self.es.search(query_body=query_body,
                                indexes=['contacts_esclient_test'])
        self.assertEqual(result['hits']['total'], 2)

    def test_scan_scroll_api(self):
        """Scan the index and scroll until no more hits are returned."""
        query_body = {
            "query": {
                "match_all": {}
            }
        }
        scroll_id = self.es.scan(query_body=query_body,
                                 indexes=['contacts_esclient_test'], size=1)

        total_docs = 0
        while True:
            res = self.es.scroll(scroll_id)
            count = len(res['hits']['hits'])
            # An empty page marks the end of the scroll.
            if count == 0:
                break
            total_docs += count

        self.assertEqual(total_docs, 2)

    @unittest.skip("demonstrating skipping")
    def test_deletebyquery_querystring_api(self):
        """Delete documents with a query using querystring option"""
        query_string_args = {
            "q": "name:Joe",
            "sort": "age",
            "timeout": 10,
            "fields": "id,name,age"
        }
        result = self.es.delete_by_query(query_string_args=query_string_args,
                                         indexes=['contacts_esclient_test'])
        self.assertTrue(result['ok'])
        self.assertTrue(self.es.refresh('contacts_esclient_test'))
        # Both "Joe" documents (ids 1 and 2) match the query and must be gone.
        for docid in (1, 2):
            result = self.es.get('contacts_esclient_test', 'person', docid)
            self.assertFalse(self._doc_found(result))

    @unittest.skip("demonstrating skipping")
    def test_deletebyquery_body_api(self):
        """Delete documents with a query in a HTTP body"""
        query_body = {"term": {"name": "joe"}}
        result = self.es.delete_by_query(query_body=query_body,
                                         indexes=['contacts_esclient_test'],
                                         doctypes=['person'])
        self.assertTrue(result['ok'])
        self.assertTrue(self.es.refresh('contacts_esclient_test'))
        # Both "Joe" documents (ids 1 and 2) match the query and must be gone.
        for docid in (1, 2):
            result = self.es.get('contacts_esclient_test', 'person', docid)
            self.assertFalse(self._doc_found(result))

    def test_count_api(self):
        """Count the documents in the test index."""
        result = self.es.count(indexes=['contacts_esclient_test'])
        # We can be sure there are at least two docs indexed.
        self.assertGreater(result['count'], 1)

    def test_delete_api(self):
        """Delete a document"""
        self.es.delete('contacts_esclient_test', 'person', 1)
        result = self.es.get('contacts_esclient_test', 'person', 1)
        self.assertFalse(self._doc_found(result))

    def test_create_delete_alias_api(self):
        """Create an alias spanning both test indexes and delete it again."""
        self.es.create_alias('contacts_alias', ['contacts_esclient_test',
                                                'contacts_esclient_test2'])
        self.es.delete_alias('contacts_alias', ['contacts_esclient_test',
                                                'contacts_esclient_test2'])

    @unittest.skip("needs fixing")
    def test_status(self):
        """Request the status of the test index."""
        result = self.es.status(indexes=['contacts_esclient_test'])
        self.assertTrue(result['ok'])

    def test_flush(self):
        """Flush the test index with refresh enabled."""
        self.assertTrue(self.es.flush(['contacts_esclient_test'], refresh=True))

    def test_get_mapping(self):
        """Fetch the mapping and check that it mentions the test index."""
        m = self.es.get_mapping(indexes=['contacts_esclient_test'])
        self.assertIn("contacts_esclient_test", m)

    @unittest.skip("needs to be fixed")
    def test_put_mapping(self):
        """Put a mapping on both test indexes."""
        # NOTE(review): the mapping is keyed 'persons' while the doctype is
        # 'person' -- possibly the reason this test needs fixing; confirm.
        mapping = {'persons': {'properties': {'name': {'type': 'string'}}}}
        result = self.es.put_mapping(doctype='person', mapping=mapping,
                                     indexes=['contacts_esclient_test',
                                              'contacts_esclient_test2'])
        self.assertTrue(result['ok'])

    def test_index_exists(self):
        """Check that an index created in setUp is reported as existing."""
        result = self.es.index_exists("contacts_esclient_test")
        self.assertTrue(result)

    def test_bulk(self):
        """Queue bulk index/delete operations and verify their effect."""
        self.es.bulk_index('contacts_esclient_test', 'bulk', {'test': 'test'}, 1)
        self.es.bulk_index('contacts_esclient_test', 'bulk', {'test': 'test'}, 2)
        self.assertTrue(self.es.bulk_push())

        # Document 2 was part of the first batch, so it must exist now.
        result = self.es.get('contacts_esclient_test', 'bulk', 2)
        self.assertTrue(self._doc_found(result))

        self.es.bulk_index('contacts_esclient_test', 'bulk', {'test': 'test'}, 3)
        self.es.bulk_delete('contacts_esclient_test', 'bulk', 2)
        self.assertTrue(self.es.bulk_push())

        # The second batch deleted document 2 ...
        result = self.es.get('contacts_esclient_test', 'bulk', 2)
        self.assertFalse(self._doc_found(result))

        # ... and added document 3.
        result = self.es.get('contacts_esclient_test', 'bulk', 3)
        self.assertTrue(self._doc_found(result))

# Run the tests through unittest's runner when this file is executed directly.
if __name__ == '__main__':
    unittest.main()


如果大家觉得这篇文章对你有些帮助,想打赏的话,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

3 Responses

  1. 黄恒_ 2014年7月22日 / 下午4:45

    你用的pyes是新版本的,老版本的conn才有create_index,现在那些方法都放到模块conn.indices里面了。使用conn.indices.create_index("index_name")

  2. 黄恒_ 2014年7月22日 / 下午4:45

    你用的pyes是新版本的,老版本的conn才有create_index,现在那些方法都放到模块conn.indices里面了。使用conn.indices.create_index("index_name")

发表评论

邮箱地址不会被公开。 必填项已用*标注