From ddc890d22a2285a251fdd36eebb89553a4c0faa7 Mon Sep 17 00:00:00 2001 From: "pierre.paci" Date: Thu, 4 Jan 2018 15:28:17 +0100 Subject: [PATCH 1/2] table is now safe with BrokenPipe --- happybase/table.py | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/happybase/table.py b/happybase/table.py index d0469e1..1201567 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -4,14 +4,13 @@ import logging from numbers import Integral -from struct import Struct - -from six import iteritems from Hbase_thrift import TScan +from six import iteritems +from struct import Struct -from .util import thrift_type_to_dict, bytes_increment, OrderedDict from .batch import Batch +from .util import OrderedDict, bytes_increment, thrift_type_to_dict logger = logging.getLogger(__name__) @@ -38,12 +37,25 @@ def make_ordered_row(sorted_columns, include_timestamp): return od +def safe_call(function): + def safe(self, *args, **kwargs): + try: + return function(self, *args, **kwargs) + except BrokenPipeError: + logger.debug("BrokenPipeError: refresh thrift connection") + self.connection._refresh_thrift_client() + return function(self, *args, **kwargs) + + return safe + + class Table(object): """HBase table abstraction class. This class cannot be instantiated directly; use :py:meth:`Connection.table` instead. """ + def __init__(self, name, connection): self.name = name self.connection = connection @@ -55,6 +67,7 @@ def __repr__(self): self.name, ) + @safe_call def families(self): """Retrieve the column families for this table. @@ -68,11 +81,13 @@ def families(self): families[name] = thrift_type_to_dict(descriptor) return families + @safe_call def _column_family_names(self): """Retrieve the column family names for this table (internal use)""" names = self.connection.client.getColumnDescriptors(self.name).keys() return [name.rstrip(b':') for name in names] + @safe_call def regions(self): """Retrieve the regions for this table. @@ -86,6 +101,7 @@ def regions(self): # Data retrieval # + @safe_call def row(self, row, columns=None, timestamp=None, include_timestamp=False): """Retrieve a single row of data. @@ -131,6 +147,7 @@ def row(self, row, columns=None, timestamp=None, include_timestamp=False): return make_row(rows[0].columns, include_timestamp) + @safe_call def rows(self, rows, columns=None, timestamp=None, include_timestamp=False): """Retrieve multiple rows of data. @@ -176,6 +193,7 @@ def rows(self, rows, columns=None, timestamp=None, return [(r.row, make_row(r.columns, include_timestamp)) for r in results] + @safe_call def cells(self, row, column, versions=None, timestamp=None, include_timestamp=False): """Retrieve multiple versions of a single cell from the table. @@ -219,6 +237,7 @@ def cells(self, row, column, versions=None, timestamp=None, for c in cells ] + @safe_call def scan(self, row_start=None, row_stop=None, row_prefix=None, columns=None, filter=None, timestamp=None, include_timestamp=False, batch_size=1000, scan_batching=None, @@ -439,7 +458,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None, # # Data manipulation # - + @safe_call def put(self, row, data, timestamp=None, wal=True): """Store data in the table. @@ -463,6 +482,7 @@ def put(self, row, data, timestamp=None, wal=True): with self.batch(timestamp=timestamp, wal=wal) as batch: batch.put(row, data) + @safe_call def delete(self, row, columns=None, timestamp=None, wal=True): """Delete data from the table. @@ -483,6 +503,7 @@ def delete(self, row, columns=None, timestamp=None, wal=True): with self.batch(timestamp=timestamp, wal=wal) as batch: batch.delete(row, columns) + @safe_call def batch(self, timestamp=None, batch_size=None, transaction=False, wal=True): """Create a new batch operation for this table. @@ -529,6 +550,7 @@ def batch(self, timestamp=None, batch_size=None, transaction=False, # Atomic counters # + @safe_call def counter_get(self, row, column): """Retrieve the current value of a counter column. @@ -550,6 +572,7 @@ def counter_get(self, row, column): # is correctly initialised if didn't exist yet. return self.counter_inc(row, column, value=0) + @safe_call def counter_set(self, row, column, value=0): """Set a counter column to a specific value. @@ -567,6 +590,7 @@ def counter_set(self, row, column, value=0): """ self.put(row, {column: pack_i64(value)}) + @safe_call def counter_inc(self, row, column, value=1): """Atomically increment (or decrements) a counter column. @@ -586,6 +610,7 @@ def counter_inc(self, row, column, value=1): return self.connection.client.atomicIncrement( self.name, row, column, value) + @safe_call def counter_dec(self, row, column, value=1): """Atomically decrement (or increments) a counter column. From d848160b6cbf0899d28e9dd32ecb9bc64dbb4791 Mon Sep 17 00:00:00 2001 From: "pierre.paci" Date: Thu, 4 Jan 2018 16:23:19 +0100 Subject: [PATCH 2/2] safe call correctly open connection and also repair TTransportException --- happybase/table.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/happybase/table.py b/happybase/table.py index 1201567..a763a1e 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -8,6 +8,7 @@ from Hbase_thrift import TScan from six import iteritems from struct import Struct +from thriftpy.transport import TTransportException from .batch import Batch from .util import OrderedDict, bytes_increment, thrift_type_to_dict @@ -41,9 +42,10 @@ def safe_call(function): def safe(self, *args, **kwargs): try: return function(self, *args, **kwargs) - except BrokenPipeError: - logger.debug("BrokenPipeError: refresh thrift connection") + except (BrokenPipeError, TTransportException): + logger.debug("Network error: refresh thrift connection") self.connection._refresh_thrift_client() + self.connection.open() return function(self, *args, **kwargs) return safe